Merge branch 'master' into index-lifecycle

Tal Levy 2018-06-21 09:56:45 -07:00
commit c9eec7eadc
74 changed files with 852 additions and 758 deletions

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api-assistance]]
=== Migration Assistance API

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api-deprecation]]
=== Deprecation Info APIs

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api-upgrade]]
=== Migration Upgrade API

View File

@ -1,4 +1,5 @@
[role="xpack"]
[testenv="basic"]
[[migration-api]]
== Migration APIs
@ -8,6 +9,6 @@ The migration APIs simplify upgrading {xpack} indices from one version to another.
* <<migration-api-upgrade>>
* <<migration-api-deprecation>>
include::migration/assistance.asciidoc[]
include::migration/upgrade.asciidoc[]
include::migration/deprecation.asciidoc[]
include::apis/assistance.asciidoc[]
include::apis/upgrade.asciidoc[]
include::apis/deprecation.asciidoc[]

View File

@ -21,7 +21,7 @@ directly to configure and access {xpack} features.
include::info.asciidoc[]
include::{xes-repo-dir}/rest-api/graph/explore.asciidoc[]
include::{es-repo-dir}/licensing/index.asciidoc[]
include::{xes-repo-dir}/rest-api/migration.asciidoc[]
include::{es-repo-dir}/migration/migration.asciidoc[]
include::{xes-repo-dir}/rest-api/ml-api.asciidoc[]
include::{xes-repo-dir}/rest-api/rollup-api.asciidoc[]
include::{xes-repo-dir}/rest-api/security.asciidoc[]

View File

@ -21,6 +21,7 @@ package org.elasticsearch.nio;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@ -99,6 +100,11 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
Socket channel = createChannel(selector, rawChannel);
assert channel.getContext() != null : "channel context should have been set on channel";
return channel;
} catch (UncheckedIOException e) {
// This can happen if getRemoteAddress throws IOException.
IOException cause = e.getCause();
closeRawChannel(rawChannel, cause);
throw cause;
} catch (Exception e) {
closeRawChannel(rawChannel, e);
throw e;
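
The change above relies on a wrap-and-unwrap convention: the NioSocketChannel constructor (changed later in this commit) can no longer declare a checked exception, so it wraps the IOException thrown by getRemoteAddress in an UncheckedIOException, and ChannelFactory unwraps it here so callers still receive the original IOException after the raw channel has been closed. A minimal stand-alone sketch of that round trip, using illustrative names rather than the Elasticsearch classes:

import java.io.IOException;
import java.io.UncheckedIOException;

public class WrapUnwrapSketch {

    // Stand-in for a constructor that is not allowed to throw a checked IOException.
    static Object construct() {
        try {
            throw new IOException("getRemoteAddress failed");   // simulated failure
        } catch (IOException e) {
            throw new UncheckedIOException(e);                   // wrap to cross the no-throws boundary
        }
    }

    // Stand-in for the accept path above: unwrap and rethrow the original cause.
    static Object accept() throws IOException {
        try {
            return construct();
        } catch (UncheckedIOException e) {
            IOException cause = e.getCause();
            // the real code closes the raw channel with this cause before rethrowing
            throw cause;
        }
    }

    public static void main(String[] args) {
        try {
            accept();
        } catch (IOException e) {
            System.out.println("caller sees the original checked exception: " + e.getMessage());
        }
    }
}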

View File

@ -19,7 +19,6 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.NetworkChannel;
import java.util.function.BiConsumer;
@ -32,20 +31,10 @@ import java.util.function.BiConsumer;
*/
public abstract class NioChannel {
private final InetSocketAddress localAddress;
NioChannel(NetworkChannel socketChannel) throws IOException {
this.localAddress = (InetSocketAddress) socketChannel.getLocalAddress();
}
public boolean isOpen() {
return getContext().isOpen();
}
public InetSocketAddress getLocalAddress() {
return localAddress;
}
/**
* Adds a close listener to the channel. Multiple close listeners can be added. There is no guarantee
* about the order in which close listeners will be executed. If the channel is already closed, the
@ -64,6 +53,8 @@ public abstract class NioChannel {
getContext().closeChannel();
}
public abstract InetSocketAddress getLocalAddress();
public abstract NetworkChannel getRawChannel();
public abstract ChannelContext<?> getContext();

View File

@ -19,19 +19,20 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
public class NioServerSocketChannel extends NioChannel {
private final ServerSocketChannel socketChannel;
private final ServerSocketChannel serverSocketChannel;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
private volatile InetSocketAddress localAddress;
private ServerChannelContext context;
public NioServerSocketChannel(ServerSocketChannel socketChannel) throws IOException {
super(socketChannel);
this.socketChannel = socketChannel;
public NioServerSocketChannel(ServerSocketChannel serverSocketChannel) {
this.serverSocketChannel = serverSocketChannel;
attemptToSetLocalAddress();
}
/**
@ -48,9 +49,15 @@ public class NioServerSocketChannel extends NioChannel {
}
}
@Override
public InetSocketAddress getLocalAddress() {
attemptToSetLocalAddress();
return localAddress;
}
@Override
public ServerSocketChannel getRawChannel() {
return socketChannel;
return serverSocketChannel;
}
@Override
@ -64,4 +71,10 @@ public class NioServerSocketChannel extends NioChannel {
"localAddress=" + getLocalAddress() +
'}';
}
private void attemptToSetLocalAddress() {
if (localAddress == null) {
localAddress = (InetSocketAddress) serverSocketChannel.socket().getLocalSocketAddress();
}
}
}
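
getLocalAddress is now resolved lazily because a server channel has no local address until it has been bound; attemptToSetLocalAddress simply retries on every call until the bind has happened, and no locking is needed because the address, once available, never changes. A rough stand-alone sketch of that idea, with the binding step simulated rather than taken from the Elasticsearch code path:

import java.net.InetSocketAddress;

public class LazyLocalAddressSketch {

    private volatile InetSocketAddress localAddress;   // stays null until the channel is bound
    private volatile InetSocketAddress boundAddress;   // simulates the underlying socket's state

    void bind(int port) {
        boundAddress = new InetSocketAddress("127.0.0.1", port);   // simulated bind
    }

    InetSocketAddress getLocalAddress() {
        // Safe without a lock: once the socket is bound the address never changes,
        // so racing threads can only ever cache the same value.
        if (localAddress == null) {
            localAddress = boundAddress;
        }
        return localAddress;
    }

    public static void main(String[] args) {
        LazyLocalAddressSketch channel = new LazyLocalAddressSketch();
        System.out.println(channel.getLocalAddress());   // null, not bound yet
        channel.bind(9300);
        System.out.println(channel.getLocalAddress());   // /127.0.0.1:9300 after binding
    }
}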

View File

@ -20,6 +20,7 @@
package org.elasticsearch.nio;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
@ -27,15 +28,19 @@ import java.util.function.BiConsumer;
public class NioSocketChannel extends NioChannel {
private final InetSocketAddress remoteAddress;
private final AtomicBoolean contextSet = new AtomicBoolean(false);
private final SocketChannel socketChannel;
private final InetSocketAddress remoteAddress;
private volatile InetSocketAddress localAddress;
private SocketChannelContext context;
public NioSocketChannel(SocketChannel socketChannel) throws IOException {
super(socketChannel);
public NioSocketChannel(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
try {
this.remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public void setContext(SocketChannelContext context) {
@ -46,6 +51,14 @@ public class NioSocketChannel extends NioChannel {
}
}
@Override
public InetSocketAddress getLocalAddress() {
if (localAddress == null) {
localAddress = (InetSocketAddress) socketChannel.socket().getLocalSocketAddress();
}
return localAddress;
}
@Override
public SocketChannel getRawChannel() {
return socketChannel;

View File

@ -23,6 +23,7 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
@ -69,7 +70,9 @@ public class EventHandlerTests extends ESTestCase {
channel.setContext(context);
handler.handleRegistration(context);
NioServerSocketChannel serverChannel = new NioServerSocketChannel(mock(ServerSocketChannel.class));
ServerSocketChannel serverSocketChannel = mock(ServerSocketChannel.class);
when(serverSocketChannel.socket()).thenReturn(mock(ServerSocket.class));
NioServerSocketChannel serverChannel = new NioServerSocketChannel(serverSocketChannel);
serverContext = new DoNotRegisterServerContext(serverChannel, mock(NioSelector.class), mock(Consumer.class));
serverChannel.setContext(serverContext);

View File

@ -23,9 +23,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -42,16 +42,16 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M
private final ScriptService scriptService;
private final NamedXContentRegistry xContentRegistry;
private final TransportMultiSearchAction multiSearchAction;
private final NodeClient client;
@Inject
public TransportMultiSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, TransportMultiSearchAction multiSearchAction) {
NamedXContentRegistry xContentRegistry, NodeClient client) {
super(settings, MultiSearchTemplateAction.NAME, threadPool, transportService, actionFilters, MultiSearchTemplateRequest::new);
this.scriptService = scriptService;
this.xContentRegistry = xContentRegistry;
this.multiSearchAction = multiSearchAction;
this.client = client;
}
@Override
@ -81,7 +81,7 @@ public class TransportMultiSearchTemplateAction extends HandledTransportAction<M
}
}
multiSearchAction.execute(multiSearchRequest, ActionListener.wrap(r -> {
client.multiSearch(multiSearchRequest, ActionListener.wrap(r -> {
for (int i = 0; i < r.getResponses().length; i++) {
MultiSearchResponse.Item item = r.getResponses()[i];
int originalSlot = originalSlots.get(i);

View File

@ -22,9 +22,9 @@ package org.elasticsearch.script.mustache;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -50,20 +50,18 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
private static final String TEMPLATE_LANG = MustacheScriptEngine.NAME;
private final ScriptService scriptService;
private final TransportSearchAction searchAction;
private final NamedXContentRegistry xContentRegistry;
private final NodeClient client;
@Inject
public TransportSearchTemplateAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters,
ScriptService scriptService,
TransportSearchAction searchAction,
NamedXContentRegistry xContentRegistry) {
ActionFilters actionFilters, ScriptService scriptService, NamedXContentRegistry xContentRegistry,
NodeClient client) {
super(settings, SearchTemplateAction.NAME, threadPool, transportService, actionFilters,
(Supplier<SearchTemplateRequest>) SearchTemplateRequest::new);
this.scriptService = scriptService;
this.searchAction = searchAction;
this.xContentRegistry = xContentRegistry;
this.client = client;
}
@Override
@ -72,7 +70,7 @@ public class TransportSearchTemplateAction extends HandledTransportAction<Search
try {
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
if (searchRequest != null) {
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
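
Both template actions in this commit drop their injected Transport*Action dependency and call the node-local NodeClient instead, so the request still executes on the same node but goes through the regular client interface. A hedged sketch of the resulting call shape, assuming the 6.x ActionListener.wrap and Client.search signatures; the helper class and method are illustrative, not part of the commit:

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.node.NodeClient;

// Illustrative helper, not part of the commit: shows the NodeClient call shape.
class NodeClientSearchSketch {

    static void runSearch(NodeClient client, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        // Delegate to the node-local client instead of holding a TransportSearchAction reference.
        client.search(searchRequest, ActionListener.wrap(
            listener::onResponse,   // forward the response unchanged
            listener::onFailure     // forward any failure
        ));
    }
}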

View File

@ -25,7 +25,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
@ -743,9 +742,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
@Override
@SuppressWarnings("unchecked")
protected <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>> void doExecute(
Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
if (false == expectedHeaders.equals(threadPool().getThreadContext().getHeaders())) {
listener.onFailure(
new RuntimeException("Expected " + expectedHeaders + " but got " + threadPool().getThreadContext().getHeaders()));

View File

@ -29,8 +29,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;
import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY;
@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
@ -42,7 +40,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
try {
@ -77,12 +75,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
Netty4HttpChannel httpChannel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(httpChannel, new Exception(cause));
serverTransport.onException(channel, new Exception(cause));
} else {
serverTransport.onException(httpChannel, (Exception) cause);
serverTransport.onException(channel, (Exception) cause);
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
public class Netty4HttpServerChannel implements HttpServerChannel {
private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
Netty4HttpServerChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public void close() {
channel.close();
}
@Override
public String toString() {
return "Netty4HttpChannel{localAddress=" + getLocalAddress() + "}";
}
}
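
The new Netty4HttpServerChannel is a thin adapter from a Netty Channel's close future to the HttpServerChannel listener contract. The sketch below shows one way it could be exercised with Netty's EmbeddedChannel; it is test-style code that assumes it lives in the same package (the constructor is package-private) and is not part of the commit:

package org.elasticsearch.http.netty4;

import io.netty.channel.embedded.EmbeddedChannel;
import org.elasticsearch.action.ActionListener;

import java.util.concurrent.atomic.AtomicBoolean;

public class Netty4HttpServerChannelSketch {

    public static void main(String[] args) {
        EmbeddedChannel nettyChannel = new EmbeddedChannel();
        Netty4HttpServerChannel serverChannel = new Netty4HttpServerChannel(nettyChannel);

        AtomicBoolean closed = new AtomicBoolean();
        // The close listener is driven by the underlying Netty close future.
        serverChannel.addCloseListener(ActionListener.wrap(
            ignored -> closed.set(true),
            e -> { throw new AssertionError(e); }));

        System.out.println("open before close: " + serverChannel.isOpen());   // expected: true
        serverChannel.close();
        System.out.println("listener fired after close: " + closed.get());    // expected: true
    }
}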

View File

@ -23,6 +23,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -42,22 +43,19 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
@ -65,14 +63,9 @@ import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
@ -154,12 +147,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final int pipeliningMaxEvents;
private final boolean tcpNoDelay;
private final boolean tcpKeepAlive;
private final boolean reuseAddress;
private final ByteSizeValue tcpSendBufferSize;
private final ByteSizeValue tcpReceiveBufferSize;
private final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis;
@ -167,8 +154,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
protected volatile ServerBootstrap serverBootstrap;
protected final List<Channel> serverChannels = new ArrayList<>();
private final Netty4CorsConfig corsConfig;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
@ -184,11 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
@ -217,6 +197,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
@ -238,10 +219,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
this.boundAddress = createBoundHttpAddress();
if (logger.isInfoEnabled()) {
logger.info("{}", boundAddress);
}
bindServer();
success = true;
} finally {
if (success == false) {
@ -284,78 +262,29 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected TransportAddress bindAddress(final InetAddress hostAddress) {
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = port.iterate(portNumber -> {
try {
synchronized (serverChannels) {
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync();
serverChannels.add(future.channel());
boundSocket.set((InetSocketAddress) future.channel().localAddress());
}
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
});
if (!success) {
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
}
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new TransportAddress(boundSocket.get());
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception {
ChannelFuture future = serverBootstrap.bind(socketAddress).sync();
Channel channel = future.channel();
Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel);
channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel);
return httpServerChannel;
}
@Override
protected void doStop() {
synchronized (serverChannels) {
if (!serverChannels.isEmpty()) {
try {
Netty4Utils.closeChannels(serverChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
} finally {
serverChannels.clear();
}
}
}
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
protected void stopInternal() {
if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
serverBootstrap = null;
}
}
@Override
protected void doClose() {
}
@Override
public HttpStats stats() {
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}
@Override
protected void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Http read timeout {}", channel);
}
CloseableChannel.closeChannel(channel);;
CloseableChannel.closeChannel(channel);
} else {
super.onException(channel, cause);
}
@ -366,6 +295,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel");
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
@ -413,4 +343,24 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
}
@ChannelHandler.Sharable
private static class ServerChannelExceptionHandler extends ChannelHandlerAdapter {
private final Netty4HttpServerTransport transport;
private ServerChannelExceptionHandler(Netty4HttpServerTransport transport) {
this.transport = transport;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
transport.onServerException(httpServerChannel, new Exception(cause));
} else {
transport.onServerException(httpServerChannel, (Exception) cause);
}
}
}
}
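
Both the HTTP and TCP transports in this commit use the same Netty idiom: the Elasticsearch-side wrapper is attached to the raw Netty channel under an AttributeKey when the channel is set up, and handlers such as ServerChannelExceptionHandler later read it back via ctx.channel().attr(...). A stripped-down sketch of that attach-and-retrieve round trip; the key name and payload here are illustrative, not the keys defined above:

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.AttributeKey;

public class AttributeKeySketch {

    // A key name may only be registered once per JVM, hence the static final field.
    static final AttributeKey<String> WRAPPER_KEY = AttributeKey.newInstance("sketch-wrapper");

    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();

        // At channel-creation time: attach the higher-level wrapper object.
        channel.attr(WRAPPER_KEY).set("es-side wrapper for channel " + channel.id());

        // Later, e.g. inside exceptionCaught(ctx, cause): read it back from the same channel.
        String wrapper = channel.attr(WRAPPER_KEY).get();
        System.out.println(wrapper);
    }
}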

View File

@ -24,6 +24,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.Transports;
@ -36,11 +38,9 @@ import org.elasticsearch.transport.Transports;
final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private final Netty4Transport transport;
private final String profileName;
Netty4MessageChannelHandler(Netty4Transport transport, String profileName) {
Netty4MessageChannelHandler(Netty4Transport transport) {
this.transport = transport;
this.profileName = profileName;
}
@Override
@ -58,7 +58,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size
BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize);
Attribute<NettyTcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
transport.messageReceived(reference, channelAttribute.get());
} finally {
// Set the expected position of the buffer, no matter what happened
@ -69,7 +69,13 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
transport.exceptionCaught(ctx, cause);
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable newCause = unwrapped != null ? unwrapped : cause;
Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
if (newCause instanceof Error) {
transport.onException(tcpChannel, new Exception(newCause));
} else {
transport.onException(tcpChannel, (Exception) newCause);
}
}
}

View File

@ -30,13 +30,13 @@ import org.elasticsearch.transport.TransportException;
import java.net.InetSocketAddress;
public class NettyTcpChannel implements TcpChannel {
public class Netty4TcpChannel implements TcpChannel {
private final Channel channel;
private final String profile;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
NettyTcpChannel(Channel channel, String profile) {
Netty4TcpChannel(Channel channel, String profile) {
this.channel = channel;
this.profile = profile;
this.channel.closeFuture().addListener(f -> {
@ -118,7 +118,7 @@ public class NettyTcpChannel implements TcpChannel {
@Override
public String toString() {
return "NettyTcpChannel{" +
return "Netty4TcpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + channel.remoteAddress() +
'}';

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.TcpServerChannel;
import java.net.InetSocketAddress;
public class Netty4TcpServerChannel implements TcpServerChannel {
private final Channel channel;
private final String profile;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
Netty4TcpServerChannel(Channel channel, String profile) {
this.channel = channel;
this.profile = profile;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}
@Override
public String getProfile() {
return profile;
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
@Override
public void close() {
channel.close();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public String toString() {
return "Netty4TcpChannel{" +
"localAddress=" + getLocalAddress() +
'}';
}
}

View File

@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -37,8 +38,6 @@ import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
@ -196,6 +195,7 @@ public class Netty4Transport extends TcpTransport {
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(getServerChannelInitializer(name));
serverBootstrap.handler(new ServerChannelExceptionHandler());
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
@ -226,17 +226,11 @@ public class Netty4Transport extends TcpTransport {
return new ClientChannelInitializer();
}
static final AttributeKey<NettyTcpChannel> CHANNEL_KEY = AttributeKey.newInstance("es-channel");
protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable t = unwrapped != null ? unwrapped : cause;
Channel channel = ctx.channel();
onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
}
static final AttributeKey<Netty4TcpChannel> CHANNEL_KEY = AttributeKey.newInstance("es-channel");
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");
@Override
protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
ChannelFuture channelFuture = bootstrap.connect(address);
Channel channel = channelFuture.channel();
if (channel == null) {
@ -245,7 +239,7 @@ public class Netty4Transport extends TcpTransport {
}
addClosedExceptionLogger(channel);
NettyTcpChannel nettyChannel = new NettyTcpChannel(channel, "default");
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default");
channel.attr(CHANNEL_KEY).set(nettyChannel);
channelFuture.addListener(f -> {
@ -266,10 +260,10 @@ public class Netty4Transport extends TcpTransport {
}
@Override
protected NettyTcpChannel bind(String name, InetSocketAddress address) {
protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
NettyTcpChannel esChannel = new NettyTcpChannel(channel, name);
channel.attr(CHANNEL_KEY).set(esChannel);
Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel, name);
channel.attr(SERVER_CHANNEL_KEY).set(esChannel);
return esChannel;
}
@ -310,7 +304,7 @@ public class Netty4Transport extends TcpTransport {
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
}
@Override
@ -331,11 +325,11 @@ public class Netty4Transport extends TcpTransport {
@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch, name);
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name);
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
serverAcceptedChannel(nettyTcpChannel);
}
@ -353,4 +347,19 @@ public class Netty4Transport extends TcpTransport {
}
});
}
@ChannelHandler.Sharable
private class ServerChannelExceptionHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
onServerException(serverChannel, new Exception(cause));
} else {
onServerException(serverChannel, (Exception) cause);
}
}
}
}

View File

@ -70,7 +70,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
nettyTransport.start();
TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses);
TransportAddress transportAddress = randomFrom(boundAddresses);
port = transportAddress.address().getPort();
host = transportAddress.address().getAddress();
}

View File

@ -24,12 +24,10 @@ import com.microsoft.windowsazure.management.compute.models.DeploymentStatus;
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint;
import com.microsoft.windowsazure.management.compute.models.RoleInstance;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.azure.classic.AzureServiceDisableException;
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.network.InetAddresses;
@ -47,9 +45,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
public class AzureUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
public enum HostType {
@ -104,7 +99,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
private final TimeValue refreshInterval;
private long lastRefresh;
private List<DiscoveryNode> cachedDiscoNodes;
private List<TransportAddress> dynamicHosts;
private final HostType hostType;
private final String publicEndpointName;
private final String deploymentName;
@ -137,30 +132,30 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
*/
@Override
public List<DiscoveryNode> buildDynamicNodes() {
public List<TransportAddress> buildDynamicHosts() {
if (refreshInterval.millis() != 0) {
if (cachedDiscoNodes != null &&
if (dynamicHosts != null &&
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
logger.trace("using cache to retrieve node list");
return cachedDiscoNodes;
return dynamicHosts;
}
lastRefresh = System.currentTimeMillis();
}
logger.debug("start building nodes list using Azure API");
cachedDiscoNodes = new ArrayList<>();
dynamicHosts = new ArrayList<>();
HostedServiceGetDetailedResponse detailed;
try {
detailed = azureComputeService.getServiceDetails();
} catch (AzureServiceDisableException e) {
logger.debug("Azure discovery service has been disabled. Returning empty list of nodes.");
return cachedDiscoNodes;
return dynamicHosts;
} catch (AzureServiceRemoteException e) {
// We got a remote exception
logger.warn("can not get list of azure nodes: [{}]. Returning empty list of nodes.", e.getMessage());
logger.trace("AzureServiceRemoteException caught", e);
return cachedDiscoNodes;
return dynamicHosts;
}
InetAddress ipAddress = null;
@ -212,8 +207,7 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
for (TransportAddress address : addresses) {
logger.trace("adding {}, transport_address {}", networkAddress, address);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, emptyMap(),
emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
dynamicHosts.add(address);
}
} catch (Exception e) {
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
@ -221,9 +215,9 @@ public class AzureUnicastHostsProvider extends AbstractComponent implements Unic
}
}
logger.debug("{} node(s) added", cachedDiscoNodes.size());
logger.debug("{} addresses added", dynamicHosts.size());
return cachedDiscoNodes;
return dynamicHosts;
}
protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) {
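
The caching policy at the top of buildDynamicHosts is unchanged by this commit; only the cached element type moves from DiscoveryNode to TransportAddress. A refresh interval of 0 disables caching, a negative interval caches forever, and a positive interval reuses the previous result for that long. A plain-Java sketch of that policy with a stubbed fetch in place of the Azure API call:

import java.util.Arrays;
import java.util.List;

public class RefreshIntervalCacheSketch {

    private final long refreshIntervalMillis;   // 0 = no caching, negative = cache forever
    private List<String> cached;
    private long lastRefresh;

    RefreshIntervalCacheSketch(long refreshIntervalMillis) {
        this.refreshIntervalMillis = refreshIntervalMillis;
    }

    List<String> buildDynamicHosts() {
        if (refreshIntervalMillis != 0) {
            if (cached != null
                    && (refreshIntervalMillis < 0 || (System.currentTimeMillis() - lastRefresh) < refreshIntervalMillis)) {
                return cached;                  // still fresh: reuse the previous result
            }
            lastRefresh = System.currentTimeMillis();
        }
        cached = fetchFromProvider();           // stand-in for the cloud API call
        return cached;
    }

    private List<String> fetchFromProvider() {
        return Arrays.asList("10.0.0.1:9300", "10.0.0.2:9300");
    }

    public static void main(String[] args) {
        RefreshIntervalCacheSketch cache = new RefreshIntervalCacheSketch(60_000);
        System.out.println(cache.buildDynamicHosts());   // fetches
        System.out.println(cache.buildDynamicHosts());   // served from cache for the next minute
    }
}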

View File

@ -29,8 +29,6 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.Tag;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
@ -46,8 +44,6 @@ import java.util.Map;
import java.util.Set;
import static java.util.Collections.disjoint;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.TAG_PREFIX;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_DNS;
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_IP;
@ -70,7 +66,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
private final String hostType;
private final DiscoNodesCache discoNodes;
private final TransportAddressesCache dynamicHosts;
AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) {
super(settings);
@ -78,7 +74,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
this.awsEc2Service = awsEc2Service;
this.hostType = AwsEc2Service.HOST_TYPE_SETTING.get(settings);
this.discoNodes = new DiscoNodesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
this.dynamicHosts = new TransportAddressesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
this.bindAnyGroup = AwsEc2Service.ANY_GROUP_SETTING.get(settings);
this.groups = new HashSet<>();
@ -96,13 +92,13 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}
@Override
public List<DiscoveryNode> buildDynamicNodes() {
return discoNodes.getOrRefresh();
public List<TransportAddress> buildDynamicHosts() {
return dynamicHosts.getOrRefresh();
}
protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {
final List<DiscoveryNode> discoNodes = new ArrayList<>();
final List<TransportAddress> dynamicHosts = new ArrayList<>();
final DescribeInstancesResult descInstances;
try (AmazonEc2Reference clientReference = awsEc2Service.client()) {
@ -115,7 +111,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
} catch (final AmazonClientException e) {
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
logger.debug("Full exception:", e);
return discoNodes;
return dynamicHosts;
}
logger.trace("building dynamic unicast discovery nodes...");
@ -179,8 +175,7 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
final TransportAddress[] addresses = transportService.addressesFromString(address, 1);
for (int i = 0; i < addresses.length; i++) {
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
discoNodes.add(new DiscoveryNode(instance.getInstanceId(), "#cloud-" + instance.getInstanceId() + "-" + i,
addresses[i], emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
dynamicHosts.add(addresses[i]);
}
} catch (final Exception e) {
final String finalAddress = address;
@ -194,9 +189,9 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}
}
logger.debug("using dynamic discovery nodes {}", discoNodes);
logger.debug("using dynamic transport addresses {}", dynamicHosts);
return discoNodes;
return dynamicHosts;
}
private DescribeInstancesRequest buildDescribeInstancesRequest() {
@ -222,11 +217,11 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
return describeInstancesRequest;
}
private final class DiscoNodesCache extends SingleObjectCache<List<DiscoveryNode>> {
private final class TransportAddressesCache extends SingleObjectCache<List<TransportAddress>> {
private boolean empty = true;
protected DiscoNodesCache(TimeValue refreshInterval) {
protected TransportAddressesCache(TimeValue refreshInterval) {
super(refreshInterval, new ArrayList<>());
}
@ -236,8 +231,8 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
}
@Override
protected List<DiscoveryNode> refresh() {
final List<DiscoveryNode> nodes = fetchDynamicNodes();
protected List<TransportAddress> refresh() {
final List<TransportAddress> nodes = fetchDynamicNodes();
empty = nodes.isEmpty();
return nodes;
}
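
The DiscoNodesCache rename to TransportAddressesCache keeps the same SingleObjectCache machinery: the constructor takes a refresh interval and an initial value, refresh() performs the expensive fetch, and getOrRefresh() returns the cached list until a refresh is due. A short sketch of a subclass wired the same way, assuming the SingleObjectCache and TimeValue APIs exactly as they are used above; the fetch body is a stub, not the EC2 call:

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;

import java.util.ArrayList;
import java.util.List;

// Sketch only: mirrors how TransportAddressesCache uses SingleObjectCache above.
class CachedHostsSketch extends SingleObjectCache<List<String>> {

    CachedHostsSketch(TimeValue refreshInterval) {
        super(refreshInterval, new ArrayList<>());   // interval plus the value held before any refresh
    }

    @Override
    protected List<String> refresh() {
        // Stand-in for fetchDynamicNodes(): the real provider queries the EC2 API here.
        List<String> hosts = new ArrayList<>();
        hosts.add("10.0.0.1:9300");
        return hosts;
    }

    public static void main(String[] args) {
        CachedHostsSketch cache = new CachedHostsSketch(TimeValue.timeValueMinutes(1));
        System.out.println(cache.getOrRefresh());   // refreshed or cached, depending on the interval
    }
}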

View File

@ -21,7 +21,6 @@ package org.elasticsearch.discovery.ec2;
import com.amazonaws.services.ec2.model.Tag;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
@ -87,16 +86,16 @@ public class Ec2DiscoveryTests extends ESTestCase {
null);
}
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {
return buildDynamicNodes(nodeSettings, nodes, null);
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
return buildDynamicHosts(nodeSettings, nodes, null);
}
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
logger.debug("--> nodes found: {}", discoveryNodes);
return discoveryNodes;
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
logger.debug("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
} catch (IOException e) {
fail("Unexpected IOException");
return null;
@ -107,7 +106,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
int nodes = randomInt(10);
Settings nodeSettings = Settings.builder()
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
List<TransportAddress> discoveryNodes = buildDynamicHosts(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
}
@ -119,12 +118,11 @@ public class Ec2DiscoveryTests extends ESTestCase {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> transportAddresses = buildDynamicHosts(nodeSettings, nodes);
assertThat(transportAddresses, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
for (TransportAddress address : transportAddresses) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++);
assertEquals(address, expected);
}
@ -138,12 +136,11 @@ public class Ec2DiscoveryTests extends ESTestCase {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_ip")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
for (TransportAddress address : dynamicHosts) {
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++);
assertEquals(address, expected);
}
@ -159,13 +156,12 @@ public class Ec2DiscoveryTests extends ESTestCase {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS);
assertEquals(address, expected);
@ -182,13 +178,12 @@ public class Ec2DiscoveryTests extends ESTestCase {
Settings nodeSettings = Settings.builder()
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_dns")
.build();
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
assertThat(discoveryNodes, hasSize(nodes));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
assertThat(dynamicHosts, hasSize(nodes));
// We check that we are using here expected address
int node = 1;
for (DiscoveryNode discoveryNode : discoveryNodes) {
for (TransportAddress address : dynamicHosts) {
String instanceId = "node" + node++;
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(
AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS);
assertEquals(address, expected);
@ -201,7 +196,7 @@ public class Ec2DiscoveryTests extends ESTestCase {
.build();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
buildDynamicNodes(nodeSettings, 1);
buildDynamicHosts(nodeSettings, 1);
});
assertThat(exception.getMessage(), containsString("does_not_exist is unknown for discovery.ec2.host_type"));
}
@ -227,8 +222,8 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(prodInstances));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(prodInstances));
}
public void testFilterByMultipleTags() throws InterruptedException {
@ -258,8 +253,8 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(prodInstances));
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(prodInstances));
}
public void testReadHostFromTag() throws InterruptedException, UnknownHostException {
@ -285,11 +280,11 @@ public class Ec2DiscoveryTests extends ESTestCase {
}
logger.info("started [{}] instances", nodes);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
assertThat(discoveryNodes, hasSize(nodes));
for (DiscoveryNode discoveryNode : discoveryNodes) {
TransportAddress address = discoveryNode.getAddress();
TransportAddress expected = poorMansDNS.get(discoveryNode.getName());
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
assertThat(dynamicHosts, hasSize(nodes));
int node = 1;
for (TransportAddress address : dynamicHosts) {
TransportAddress expected = poorMansDNS.get("node" + node++);
assertEquals(address, expected);
}
}
@ -306,13 +301,13 @@ public class Ec2DiscoveryTests extends ESTestCase {
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {
fetchCount++;
return new ArrayList<>();
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(3));
}
@ -323,18 +318,18 @@ public class Ec2DiscoveryTests extends ESTestCase {
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) {
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, plugin.ec2Service) {
@Override
protected List<DiscoveryNode> fetchDynamicNodes() {
protected List<TransportAddress> fetchDynamicNodes() {
fetchCount++;
return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1);
return Ec2DiscoveryTests.this.buildDynamicHosts(Settings.EMPTY, 1);
}
};
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(1));
Thread.sleep(1_000L); // wait for cache to expire
for (int i=0; i<3; i++) {
provider.buildDynamicNodes();
provider.buildDynamicHosts();
}
assertThat(provider.fetchCount, is(2));
}

View File

@ -21,8 +21,8 @@ package org.elasticsearch.discovery.file;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.env.Environment;
@ -58,7 +58,6 @@ import static org.elasticsearch.discovery.zen.UnicastZenPing.resolveHostsLists;
class FileBasedUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt";
static final String UNICAST_HOST_PREFIX = "#zen_file_unicast_host_";
private final TransportService transportService;
private final ExecutorService executorService;
@ -76,7 +75,7 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
}
@Override
public List<DiscoveryNode> buildDynamicNodes() {
public List<TransportAddress> buildDynamicHosts() {
List<String> hostsList;
try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
hostsList = lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments
@ -91,23 +90,22 @@ class FileBasedUnicastHostsProvider extends AbstractComponent implements Unicast
hostsList = Collections.emptyList();
}
final List<DiscoveryNode> discoNodes = new ArrayList<>();
final List<TransportAddress> dynamicHosts = new ArrayList<>();
try {
discoNodes.addAll(resolveHostsLists(
dynamicHosts.addAll(resolveHostsLists(
executorService,
logger,
hostsList,
1,
transportService,
UNICAST_HOST_PREFIX,
resolveTimeout));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.debug("[discovery-file] Using dynamic discovery nodes {}", discoNodes);
logger.debug("[discovery-file] Using dynamic discovery nodes {}", dynamicHosts);
return discoNodes;
return dynamicHosts;
}
}
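
The provider above reads unicast_hosts.txt, drops lines starting with '#', and hands the remaining entries to the resolver; this commit only changes what the resolved entries are turned into. A stand-alone sketch of the file-reading half, without the Elasticsearch resolution step; the path in main is only an example location:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnicastHostsFileSketch {

    static List<String> readHostsFile(Path unicastHostsFilePath) {
        try (Stream<String> lines = Files.lines(unicastHostsFilePath)) {
            return lines.filter(line -> line.startsWith("#") == false)   // lines starting with '#' are comments
                        .collect(Collectors.toList());
        } catch (IOException e) {
            // the real provider logs the problem and falls back to an empty host list
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        List<String> hosts = readHostsFile(Paths.get("unicast_hosts.txt"));
        System.out.println(hosts);   // e.g. [192.168.0.1, 192.168.0.2:9305, 255.255.23.15]
    }
}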

View File

@ -19,7 +19,6 @@
package org.elasticsearch.discovery.file;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
@ -50,7 +49,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOSTS_FILE;
import static org.elasticsearch.discovery.file.FileBasedUnicastHostsProvider.UNICAST_HOST_PREFIX;
/**
* Tests for {@link FileBasedUnicastHostsProvider}.
@ -104,23 +102,20 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
public void testBuildDynamicNodes() throws Exception {
final List<String> hostEntries = Arrays.asList("#comment, should be ignored", "192.168.0.1", "192.168.0.2:9305", "255.255.23.15");
final List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
final List<TransportAddress> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(hostEntries.size() - 1, nodes.size()); // minus 1 because we are ignoring the first line that's a comment
assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress());
assertEquals(9300, nodes.get(0).getAddress().getPort());
assertEquals(UNICAST_HOST_PREFIX + "192.168.0.1_0#", nodes.get(0).getId());
assertEquals("192.168.0.2", nodes.get(1).getAddress().getAddress());
assertEquals(9305, nodes.get(1).getAddress().getPort());
assertEquals(UNICAST_HOST_PREFIX + "192.168.0.2:9305_0#", nodes.get(1).getId());
assertEquals("255.255.23.15", nodes.get(2).getAddress().getAddress());
assertEquals(9300, nodes.get(2).getAddress().getPort());
assertEquals(UNICAST_HOST_PREFIX + "255.255.23.15_0#", nodes.get(2).getId());
assertEquals("192.168.0.1", nodes.get(0).getAddress());
assertEquals(9300, nodes.get(0).getPort());
assertEquals("192.168.0.2", nodes.get(1).getAddress());
assertEquals(9305, nodes.get(1).getPort());
assertEquals("255.255.23.15", nodes.get(2).getAddress());
assertEquals(9300, nodes.get(2).getPort());
}
public void testEmptyUnicastHostsFile() throws Exception {
final List<String> hostEntries = Collections.emptyList();
final List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(0, nodes.size());
final List<TransportAddress> addresses = setupAndRunHostProvider(hostEntries);
assertEquals(0, addresses.size());
}
public void testUnicastHostsDoesNotExist() throws Exception {
@ -129,27 +124,27 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
.build();
final Environment environment = TestEnvironment.newEnvironment(settings);
final FileBasedUnicastHostsProvider provider = new FileBasedUnicastHostsProvider(environment, transportService, executorService);
final List<DiscoveryNode> nodes = provider.buildDynamicNodes();
assertEquals(0, nodes.size());
final List<TransportAddress> addresses = provider.buildDynamicHosts();
assertEquals(0, addresses.size());
}
public void testInvalidHostEntries() throws Exception {
List<String> hostEntries = Arrays.asList("192.168.0.1:9300:9300");
List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(0, nodes.size());
List<TransportAddress> addresses = setupAndRunHostProvider(hostEntries);
assertEquals(0, addresses.size());
}
public void testSomeInvalidHostEntries() throws Exception {
List<String> hostEntries = Arrays.asList("192.168.0.1:9300:9300", "192.168.0.1:9301");
List<DiscoveryNode> nodes = setupAndRunHostProvider(hostEntries);
assertEquals(1, nodes.size()); // only one of the two is valid and will be used
assertEquals("192.168.0.1", nodes.get(0).getAddress().getAddress());
assertEquals(9301, nodes.get(0).getAddress().getPort());
List<TransportAddress> addresses = setupAndRunHostProvider(hostEntries);
assertEquals(1, addresses.size()); // only one of the two is valid and will be used
assertEquals("192.168.0.1", addresses.get(0).getAddress());
assertEquals(9301, addresses.get(0).getPort());
}
// sets up the config dir, writes to the unicast hosts file in the config dir,
// and then runs the file-based unicast host provider to get the list of transport addresses
private List<DiscoveryNode> setupAndRunHostProvider(final List<String> hostEntries) throws IOException {
private List<TransportAddress> setupAndRunHostProvider(final List<String> hostEntries) throws IOException {
final Path homeDir = createTempDir();
final Settings settings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), homeDir)
@ -168,6 +163,6 @@ public class FileBasedUnicastHostsProviderTests extends ESTestCase {
}
return new FileBasedUnicastHostsProvider(
new Environment(settings, configPath), transportService, executorService).buildDynamicNodes();
new Environment(settings, configPath), transportService, executorService).buildDynamicHosts();
}
}


@ -31,9 +31,7 @@ import com.google.api.services.compute.model.Instance;
import com.google.api.services.compute.model.NetworkInterface;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.gce.GceInstancesService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.network.NetworkAddress;
@ -47,8 +45,6 @@ import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.transport.TransportService;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
public class GceUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
@ -72,7 +68,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
private final TimeValue refreshInterval;
private long lastRefresh;
private List<DiscoveryNode> cachedDiscoNodes;
private List<TransportAddress> cachedDynamicHosts;
public GceUnicastHostsProvider(Settings settings, GceInstancesService gceInstancesService,
TransportService transportService,
@ -97,7 +93,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
* Information can be cached using the `cloud.gce.refresh_interval` property if needed.
*/
@Override
public List<DiscoveryNode> buildDynamicNodes() {
public List<TransportAddress> buildDynamicHosts() {
// We check that needed properties have been set
if (this.project == null || this.project.isEmpty() || this.zones == null || this.zones.isEmpty()) {
throw new IllegalArgumentException("one or more gce discovery settings are missing. " +
@ -106,16 +102,16 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
}
if (refreshInterval.millis() != 0) {
if (cachedDiscoNodes != null &&
if (cachedDynamicHosts != null &&
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
if (logger.isTraceEnabled()) logger.trace("using cache to retrieve node list");
return cachedDiscoNodes;
return cachedDynamicHosts;
}
lastRefresh = System.currentTimeMillis();
}
logger.debug("start building nodes list using GCE API");
cachedDiscoNodes = new ArrayList<>();
cachedDynamicHosts = new ArrayList<>();
String ipAddress = null;
try {
InetAddress inetAddress = networkService.resolvePublishHostAddresses(
@ -133,7 +129,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
if (instances == null) {
logger.trace("no instance found for project [{}], zones [{}].", this.project, this.zones);
return cachedDiscoNodes;
return cachedDynamicHosts;
}
for (Instance instance : instances) {
@ -238,8 +234,7 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
for (TransportAddress transportAddress : addresses) {
logger.trace("adding {}, type {}, address {}, transport_address {}, status {}", name, type,
ip_private, transportAddress, status);
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + name + "-" + 0, transportAddress,
emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
cachedDynamicHosts.add(transportAddress);
}
}
} catch (Exception e) {
@ -252,9 +247,9 @@ public class GceUnicastHostsProvider extends AbstractComponent implements Unicas
logger.warn("exception caught during discovery", e);
}
logger.debug("{} node(s) added", cachedDiscoNodes.size());
logger.debug("using dynamic discovery nodes {}", cachedDiscoNodes);
logger.debug("{} addresses added", cachedDynamicHosts.size());
logger.debug("using transport addresses {}", cachedDynamicHosts);
return cachedDiscoNodes;
return cachedDynamicHosts;
}
}
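The refresh logic above implies a small contract for `cloud.gce.refresh_interval`: `0` disables caching, a negative value keeps the first result forever, and a positive value acts as a time-to-live. A standalone sketch of that contract follows; `fetchFromGce()` is a hypothetical stand-in for the instance lookup the real provider performs.

```java
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Collections;
import java.util.List;

// Sketch of the caching contract used by the provider above (not code from this commit).
final class CachedHostsLookup {

    private final TimeValue refreshInterval;
    private List<TransportAddress> cached;
    private long lastRefresh;

    CachedHostsLookup(TimeValue refreshInterval) {
        this.refreshInterval = refreshInterval;
    }

    synchronized List<TransportAddress> get() {
        if (refreshInterval.millis() != 0) {
            if (cached != null && (refreshInterval.millis() < 0
                    || System.currentTimeMillis() - lastRefresh < refreshInterval.millis())) {
                return cached; // still fresh, or cached forever when the interval is negative
            }
            lastRefresh = System.currentTimeMillis();
        }
        cached = fetchFromGce();
        return cached;
    }

    private List<TransportAddress> fetchFromGce() {
        return Collections.emptyList(); // stand-in: the real provider queries the GCE API here
    }
}
```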


@ -21,9 +21,9 @@ package org.elasticsearch.discovery.gce;
import org.elasticsearch.Version;
import org.elasticsearch.cloud.gce.GceInstancesServiceImpl;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -40,7 +40,6 @@ import java.util.Locale;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
/**
* This test class uses a GCE HTTP mock system that allows JSON responses to be simulated.
@ -105,13 +104,13 @@ public class GceDiscoveryTests extends ESTestCase {
}
}
protected List<DiscoveryNode> buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) {
protected List<TransportAddress> buildDynamicNodes(GceInstancesServiceImpl gceInstancesService, Settings nodeSettings) {
GceUnicastHostsProvider provider = new GceUnicastHostsProvider(nodeSettings, gceInstancesService,
transportService, new NetworkService(Collections.emptyList()));
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
logger.info("--> nodes found: {}", discoveryNodes);
return discoveryNodes;
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
logger.info("--> addresses found: {}", dynamicHosts);
return dynamicHosts;
}
public void testNodesWithDifferentTagsAndNoTagSet() {
@ -120,8 +119,8 @@ public class GceDiscoveryTests extends ESTestCase {
.put(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(2));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(2));
}
public void testNodesWithDifferentTagsAndOneTagSet() {
@ -131,9 +130,8 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(1));
assertThat(discoveryNodes.get(0).getId(), is("#cloud-test2-0"));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(1));
}
public void testNodesWithDifferentTagsAndTwoTagSet() {
@ -143,9 +141,8 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch", "dev")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(1));
assertThat(discoveryNodes.get(0).getId(), is("#cloud-test2-0"));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(1));
}
public void testNodesWithSameTagsAndNoTagSet() {
@ -154,8 +151,8 @@ public class GceDiscoveryTests extends ESTestCase {
.put(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(2));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(2));
}
public void testNodesWithSameTagsAndOneTagSet() {
@ -165,8 +162,8 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(2));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(2));
}
public void testNodesWithSameTagsAndTwoTagsSet() {
@ -176,8 +173,8 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceUnicastHostsProvider.TAGS_SETTING.getKey(), "elasticsearch", "dev")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(2));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(2));
}
public void testMultipleZonesAndTwoNodesInSameZone() {
@ -186,8 +183,8 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "europe-west1-b")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(2));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(2));
}
public void testMultipleZonesAndTwoNodesInDifferentZones() {
@ -196,8 +193,8 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "europe-west1-b")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(2));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(2));
}
/**
@ -209,8 +206,8 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "us-central1-a", "us-central1-b")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(0));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(0));
}
public void testIllegalSettingsMissingAllRequired() {
@ -261,7 +258,7 @@ public class GceDiscoveryTests extends ESTestCase {
.putList(GceInstancesServiceImpl.ZONE_SETTING.getKey(), "europe-west1-b", "us-central1-a")
.build();
mock = new GceInstancesServiceMock(nodeSettings);
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(mock, nodeSettings);
assertThat(discoveryNodes, hasSize(1));
List<TransportAddress> dynamicHosts = buildDynamicNodes(mock, nodeSettings);
assertThat(dynamicHosts, hasSize(1));
}
}


@ -24,12 +24,11 @@ import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.nio.NioSocketChannel;
import java.io.IOException;
import java.nio.channels.SocketChannel;
public class NioHttpChannel extends NioSocketChannel implements HttpChannel {
NioHttpChannel(SocketChannel socketChannel) throws IOException {
NioHttpChannel(SocketChannel socketChannel) {
super(socketChannel);
}


@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.nio.NioServerSocketChannel;
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
public class NioHttpServerChannel extends NioServerSocketChannel implements HttpServerChannel {
NioHttpServerChannel(ServerSocketChannel serverSocketChannel) throws IOException {
super(serverSocketChannel);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}
@Override
public String toString() {
return "NioHttpServerChannel{localAddress=" + getLocalAddress() + "}";
}
}
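Because the new channel bridges `ActionListener`-based close listeners onto the nio layer, callers can observe server-channel shutdown without dealing with BiConsumers directly. A minimal usage sketch (illustrative only; the println bodies stand in for real cleanup):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.http.HttpServerChannel;

final class ServerChannelWatcher {

    // Illustrative only: react to the server channel closing, e.g. to release bookkeeping.
    static void watch(HttpServerChannel serverChannel) {
        serverChannel.addCloseListener(ActionListener.wrap(
            ignored -> System.out.println("closed: " + serverChannel),
            e -> System.out.println("closed exceptionally: " + e)));
    }
}
```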


@ -21,40 +21,29 @@ package org.elasticsearch.http.nio;
import io.netty.handler.codec.http.HttpMethod;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioChannel;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
@ -62,18 +51,11 @@ import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Pattern;
@ -113,7 +95,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private final int tcpSendBufferSize;
private final int tcpReceiveBufferSize;
private final Set<NioServerSocketChannel> serverChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private NioGroup nioGroup;
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;
@ -156,12 +137,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount,
(s) -> new EventHandler(this::onNonChannelException, s));
channelFactory = new HttpChannelFactory();
this.boundAddress = createBoundHttpAddress();
if (logger.isInfoEnabled()) {
logger.info("{}", boundAddress);
}
bindServer();
success = true;
} catch (IOException e) {
throw new ElasticsearchException(e);
@ -173,26 +149,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected void doStop() {
synchronized (serverChannels) {
if (serverChannels.isEmpty() == false) {
try {
closeChannels(new ArrayList<>(serverChannels));
} catch (Exception e) {
logger.error("unexpected exception while closing http server channels", e);
}
serverChannels.clear();
}
}
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
protected void stopInternal() {
try {
nioGroup.close();
} catch (Exception e) {
@ -201,40 +158,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected void doClose() throws IOException {
}
@Override
protected TransportAddress bindAddress(InetAddress hostAddress) {
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = port.iterate(portNumber -> {
try {
synchronized (serverChannels) {
InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber);
NioServerSocketChannel channel = nioGroup.bindServerChannel(address, channelFactory);
serverChannels.add(channel);
boundSocket.set(channel.getLocalAddress());
}
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
});
if (success == false) {
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
}
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new TransportAddress(boundSocket.get());
}
@Override
public HttpStats stats() {
return new HttpStats(serverChannels.size(), totalChannelsAccepted.get());
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOException {
return nioGroup.bindServerChannel(socketAddress, channelFactory);
}
static NioCorsConfig buildCorsConfig(Settings settings) {
@ -269,33 +194,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
.build();
}
private void closeChannels(List<NioChannel> channels) {
List<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
for (NioChannel channel : channels) {
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
channel.addCloseListener(ActionListener.toBiConsumer(future));
futures.add(future);
channel.close();
}
List<RuntimeException> closeExceptions = new ArrayList<>();
for (ActionFuture<Void> f : futures) {
try {
f.actionGet();
} catch (RuntimeException e) {
closeExceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(closeExceptions);
}
private void acceptChannel(NioSocketChannel socketChannel) {
super.serverAcceptedChannel((HttpChannel) socketChannel);
}
private class HttpChannelFactory extends ChannelFactory<NioServerSocketChannel, NioHttpChannel> {
private class HttpChannelFactory extends ChannelFactory<NioHttpServerChannel, NioHttpChannel> {
private HttpChannelFactory() {
super(new RawChannelFactory(tcpNoDelay, tcpKeepAlive, reuseAddress, tcpSendBufferSize, tcpReceiveBufferSize));
@ -303,29 +206,28 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
@Override
public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioHttpChannel nioChannel = new NioHttpChannel(channel);
NioHttpChannel httpChannel = new NioHttpChannel(channel);
java.util.function.Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel, NioHttpServerTransport.this,
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel, NioHttpServerTransport.this,
handlingSettings, corsConfig);
Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline,
Consumer<Exception> exceptionHandler = (e) -> onException(httpChannel, e);
SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline,
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
return nioChannel;
httpChannel.setContext(context);
return httpChannel;
}
@Override
public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
public NioHttpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioHttpServerChannel httpServerChannel = new NioHttpServerChannel(channel);
Consumer<Exception> exceptionHandler = (e) -> onServerException(httpServerChannel, e);
Consumer<NioSocketChannel> acceptor = NioHttpServerTransport.this::acceptChannel;
ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler);
nioChannel.setContext(context);
return nioChannel;
ServerChannelContext context = new ServerChannelContext(httpServerChannel, this, selector, acceptor, exceptionHandler);
httpServerChannel.setContext(context);
return httpServerChannel;
}
}


@ -32,7 +32,7 @@ public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
private final String profile;
public NioTcpChannel(String profile, SocketChannel socketChannel) throws IOException {
public NioTcpChannel(String profile, SocketChannel socketChannel) {
super(socketChannel);
this.profile = profile;
}


@ -20,43 +20,24 @@
package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpServerChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
/**
* This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpChannel}
* This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpServerChannel}
* interface. As it is a server socket, setting SO_LINGER and sending messages is not supported.
*/
public class NioTcpServerChannel extends NioServerSocketChannel implements TcpChannel {
public class NioTcpServerChannel extends NioServerSocketChannel implements TcpServerChannel {
private final String profile;
public NioTcpServerChannel(String profile, ServerSocketChannel socketChannel) throws IOException {
public NioTcpServerChannel(String profile, ServerSocketChannel socketChannel) {
super(socketChannel);
this.profile = profile;
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
throw new UnsupportedOperationException("Cannot send a message to a server channel.");
}
@Override
public void setSoLinger(int value) throws IOException {
throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel.");
}
@Override
public InetSocketAddress getRemoteAddress() {
return null;
}
@Override
public void close() {
getContext().closeChannel();
}


@ -19,7 +19,6 @@
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -176,8 +175,7 @@ public class NioTransport extends TcpTransport {
@Override
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
Consumer<Exception> exceptionHandler = (e) -> onServerException(nioChannel, e);
Consumer<NioSocketChannel> acceptor = NioTransport.this::acceptChannel;
ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler);
nioChannel.setContext(context);


@ -55,8 +55,7 @@ public class CreateIndexResponse extends ShardsAcknowledgedResponse {
private String index;
protected CreateIndexResponse() {
}
public CreateIndexResponse() {}
protected CreateIndexResponse(boolean acknowledged, boolean shardsAcknowledged, String index) {
super(acknowledged, shardsAcknowledged);


@ -25,6 +25,7 @@ import org.elasticsearch.action.PrimaryMissingActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -58,16 +59,15 @@ import java.util.Set;
public class TransportUpgradeAction extends TransportBroadcastByNodeAction<UpgradeRequest, UpgradeResponse, ShardUpgradeResult> {
private final IndicesService indicesService;
private final TransportUpgradeSettingsAction upgradeSettingsAction;
private final NodeClient client;
@Inject
public TransportUpgradeAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, TransportUpgradeSettingsAction upgradeSettingsAction) {
IndexNameExpressionResolver indexNameExpressionResolver, NodeClient client) {
super(settings, UpgradeAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpgradeRequest::new, ThreadPool.Names.FORCE_MERGE);
this.indicesService = indicesService;
this.upgradeSettingsAction = upgradeSettingsAction;
this.client = client;
}
@Override
@ -205,7 +205,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
upgradeSettingsAction.execute(upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<UpgradeSettingsResponse>() {
@Override
public void onResponse(UpgradeSettingsResponse updateSettingsResponse) {
listener.onResponse(upgradeResponse);


@ -30,7 +30,6 @@ import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
@ -38,6 +37,7 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -88,27 +88,24 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final ClusterService clusterService;
private final IngestService ingestService;
private final TransportShardBulkAction shardBulkAction;
private final TransportCreateIndexAction createIndexAction;
private final LongSupplier relativeTimeProvider;
private final IngestActionForwarder ingestForwarder;
private final NodeClient client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex) {
this(settings, threadPool, transportService, clusterService, ingestService,
shardBulkAction, createIndexAction,
actionFilters, indexNameExpressionResolver,
autoCreateIndex,
System::nanoTime);
this(settings, threadPool, transportService, clusterService, ingestService, shardBulkAction, client, actionFilters,
indexNameExpressionResolver, autoCreateIndex, System::nanoTime);
}
public TransportBulkAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, IngestService ingestService,
TransportShardBulkAction shardBulkAction, TransportCreateIndexAction createIndexAction,
TransportShardBulkAction shardBulkAction, NodeClient client,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex, LongSupplier relativeTimeProvider) {
super(settings, BulkAction.NAME, threadPool, transportService, actionFilters, BulkRequest::new);
@ -116,10 +113,10 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
this.clusterService = clusterService;
this.ingestService = ingestService;
this.shardBulkAction = shardBulkAction;
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
this.relativeTimeProvider = relativeTimeProvider;
this.ingestForwarder = new IngestActionForwarder(transportService);
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
clusterService.addStateApplier(this.ingestForwarder);
}
@ -224,7 +221,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
createIndexRequest.index(index);
createIndexRequest.cause("auto(bulk api)");
createIndexRequest.masterNodeTimeout(timeout);
createIndexAction.execute(createIndexRequest, listener);
client.admin().indices().create(createIndexRequest, listener);
}
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest request, String index, Exception e) {
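The hunk above swaps the injected `TransportCreateIndexAction` for the node-local client. An illustrative sketch of the resulting call pattern, with assumed surrounding names (`IndexAutoCreator` is not part of this commit):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.TimeValue;

// Sketch only: auto-create an index through the node-local client, mirroring the call above.
final class IndexAutoCreator {

    private final NodeClient client;

    IndexAutoCreator(NodeClient client) {
        this.client = client;
    }

    void createIndex(String index, TimeValue masterTimeout, ActionListener<CreateIndexResponse> listener) {
        CreateIndexRequest request = new CreateIndexRequest()
            .index(index)
            .cause("auto(bulk api)")
            .masterNodeTimeout(masterTimeout);
        client.admin().indices().create(request, listener);
    }
}
```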


@ -23,9 +23,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -47,16 +47,16 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
private final PipelineStore pipelineStore;
private final ClusterService clusterService;
private final TransportNodesInfoAction nodesInfoAction;
private final NodeClient client;
@Inject
public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService,
TransportNodesInfoAction nodesInfoAction) {
NodeClient client) {
super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new);
this.clusterService = clusterService;
this.nodesInfoAction = nodesInfoAction;
this.client = client;
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
}
@ -75,7 +75,7 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction<PutPip
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear();
nodesInfoRequest.ingest(true);
nodesInfoAction.execute(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
client.admin().cluster().nodesInfo(nodesInfoRequest, new ActionListener<NodesInfoResponse>() {
@Override
public void onResponse(NodesInfoResponse nodeInfos) {
try {


@ -22,7 +22,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
@ -43,27 +43,27 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
private final int availableProcessors;
private final ClusterService clusterService;
private final TransportAction<SearchRequest, SearchResponse> searchAction;
private final LongSupplier relativeTimeProvider;
private final NodeClient client;
@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSearchAction searchAction, ActionFilters actionFilters) {
ClusterService clusterService, ActionFilters actionFilters, NodeClient client) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.numberOfProcessors(settings);
this.relativeTimeProvider = System::nanoTime;
this.client = client;
}
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
int availableProcessors, LongSupplier relativeTimeProvider) {
ClusterService clusterService, int availableProcessors,
LongSupplier relativeTimeProvider, NodeClient client) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
this.relativeTimeProvider = relativeTimeProvider;
this.client = client;
}
@Override
@ -141,7 +141,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
* when we handle the response rather than going recursive, we fork to another thread, otherwise we recurse.
*/
final Thread thread = Thread.currentThread();
searchAction.execute(request.request, new ActionListener<SearchResponse>() {
client.search(request.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(final SearchResponse searchResponse) {
handleResponse(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null));
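The guard described in the comment above matters because `client.search` may complete its listener on the calling thread when the response is already available; recursing in that case could grow the stack without bound, so the continuation is forked instead. A standalone sketch of that pattern (names such as `handleResponse` are assumptions, not code from this commit):

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.threadpool.ThreadPool;

// Sketch of the fork-or-recurse guard described in the comment above.
final class ForkOrRecurseSketch {

    private final NodeClient client;
    private final ThreadPool threadPool;

    ForkOrRecurseSketch(NodeClient client, ThreadPool threadPool) {
        this.client = client;
        this.threadPool = threadPool;
    }

    void execute(SearchRequest request) {
        final Thread requestThread = Thread.currentThread();
        client.search(request, ActionListener.wrap(response -> {
            if (Thread.currentThread() == requestThread) {
                // completed synchronously on our own thread: fork before continuing
                threadPool.generic().execute(() -> handleResponse(response));
            } else {
                // completed on a response thread: safe to continue inline
                handleResponse(response);
            }
        }, e -> handleResponse(null))); // simplified failure path for the sketch
    }

    private void handleResponse(SearchResponse response) {
        // assumed continuation: record the slot result and dispatch the next request
    }
}
```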


@ -24,8 +24,6 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -34,6 +32,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
@ -66,22 +65,21 @@ import static org.elasticsearch.action.bulk.TransportSingleItemBulkWriteAction.w
public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
private final TransportBulkAction bulkAction;
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
private final UpdateHelper updateHelper;
private final IndicesService indicesService;
private final NodeClient client;
@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
TransportBulkAction bulkAction, TransportCreateIndexAction createIndexAction, UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService, AutoCreateIndex autoCreateIndex) {
UpdateHelper updateHelper, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndicesService indicesService,
AutoCreateIndex autoCreateIndex, NodeClient client) {
super(settings, UpdateAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, UpdateRequest::new);
this.bulkAction = bulkAction;
this.createIndexAction = createIndexAction;
this.updateHelper = updateHelper;
this.indicesService = indicesService;
this.autoCreateIndex = autoCreateIndex;
this.client = client;
}
@Override
@ -116,7 +114,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
client.admin().indices().create(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);
@ -177,7 +175,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
IndexRequest upsertRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, it's already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
bulkAction.execute(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(
client.bulk(toSingleItemBulkRequest(upsertRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
if (request.fetchSource() != null && request.fetchSource().fetchSource()) {
@ -197,7 +195,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
IndexRequest indexRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, it's already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
bulkAction.execute(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
client.bulk(toSingleItemBulkRequest(indexRequest), wrapBulkResponse(
ActionListener.<IndexResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
@ -208,7 +206,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
break;
case DELETED:
DeleteRequest deleteRequest = result.action();
bulkAction.execute(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
client.bulk(toSingleItemBulkRequest(deleteRequest), wrapBulkResponse(
ActionListener.<DeleteResponse>wrap(response -> {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getSeqNo(), response.getPrimaryTerm(), response.getVersion(), response.getResult());
update.setGetResult(UpdateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));


@ -21,13 +21,11 @@ package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
/**
* A {@link Client} that contains another {@link Client} which it
* uses as its basic source, possibly transforming the requests / responses along the
@ -62,8 +60,8 @@ public abstract class FilterClient extends AbstractClient {
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response>> void doExecute(
Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
in().execute(action, request, listener);
}


@ -22,7 +22,6 @@ package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.tasks.Task;
@ -58,10 +57,8 @@ public class ParentTaskAssigningClient extends FilterClient {
}
@Override
protected < Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>
> void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
request.setParentTask(parentTask);
super.doExecute(action, request, listener);
}


@ -22,7 +22,6 @@ package org.elasticsearch.client.node;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
@ -67,10 +66,8 @@ public class NodeClient extends AbstractClient {
}
@Override
public < Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>
> void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
// Discard the task because the Client interface doesn't use it.
executeLocally(action, request, listener);
}


@ -23,7 +23,6 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainAction;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
@ -401,7 +400,8 @@ public abstract class AbstractClient extends AbstractComponent implements Client
doExecute(action, request, listener);
}
protected abstract <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response>> void doExecute(Action<Response> action, Request request, ActionListener<Response> listener);
protected abstract <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener);
@Override
public ActionFuture<IndexResponse> index(final IndexRequest request) {
@ -1764,7 +1764,8 @@ public abstract class AbstractClient extends AbstractComponent implements Client
public Client filterWithHeader(Map<String, String> headers) {
return new FilterClient(this) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response>> void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
ThreadContext threadContext = threadPool().getThreadContext();
try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(headers)) {
super.doExecute(action, request, listener);
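Every `doExecute` override in this commit drops the unused `RequestBuilder` type parameter, as in the hunks above. A sketch of a custom decorator written against the narrowed signature (the tracing behaviour is illustrative, not part of this commit):

```java
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;

public class TracingFilterClient extends FilterClient {

    public TracingFilterClient(Client in) {
        super(in);
    }

    @Override
    protected <Request extends ActionRequest, Response extends ActionResponse>
    void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
        // logger is inherited from AbstractComponent via AbstractClient
        logger.trace("forwarding [{}]", request.getClass().getSimpleName());
        super.doExecute(action, request, listener);
    }
}
```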


@ -24,7 +24,6 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.ClusterModule;
@ -377,7 +376,8 @@ public abstract class TransportClient extends AbstractClient {
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response>> void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
proxy.execute(action, request, listener);
}


@ -19,7 +19,7 @@
package org.elasticsearch.discovery.zen;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import java.util.List;
@ -31,5 +31,5 @@ public interface UnicastHostsProvider {
/**
* Builds the dynamic list of unicast hosts to be used for unicast discovery.
*/
List<DiscoveryNode> buildDynamicNodes();
List<TransportAddress> buildDynamicHosts();
}
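A minimal implementation of the revised interface could look like the following; the fixed addresses are hypothetical, and real providers (file, GCE, EC2) resolve their sources on every call:

```java
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical provider returning a fixed pair of transport addresses.
public class StaticUnicastHostsProvider implements UnicastHostsProvider {

    private final List<TransportAddress> addresses;

    public StaticUnicastHostsProvider() throws UnknownHostException {
        this.addresses = Collections.unmodifiableList(Arrays.asList(
            new TransportAddress(InetAddress.getByName("10.0.0.1"), 9300),
            new TransportAddress(InetAddress.getByName("10.0.0.2"), 9300)));
    }

    @Override
    public List<TransportAddress> buildDynamicHosts() {
        return addresses;
    }
}
```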


@ -118,9 +118,6 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
private final AtomicInteger pingingRoundIdGenerator = new AtomicInteger();
// used as a node id prefix for configured unicast host nodes/address
private static final String UNICAST_NODE_PREFIX = "#zen_unicast_";
private final Map<Integer, PingingRound> activePingingRounds = newConcurrentMap();
// a list of temporal responses a node will return for a request (holds responses from other nodes)
@ -184,23 +181,20 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
* @param hosts the hosts to resolve
* @param limitPortCounts the number of ports to resolve (should be 1 for non-local transport)
* @param transportService the transport service
* @param nodeId_prefix a prefix to use for node ids
* @param resolveTimeout the timeout before returning from hostname lookups
* @return a list of discovery nodes with resolved transport addresses
* @return a list of resolved transport addresses
*/
public static List<DiscoveryNode> resolveHostsLists(
public static List<TransportAddress> resolveHostsLists(
final ExecutorService executorService,
final Logger logger,
final List<String> hosts,
final int limitPortCounts,
final TransportService transportService,
final String nodeId_prefix,
final TimeValue resolveTimeout) throws InterruptedException {
Objects.requireNonNull(executorService);
Objects.requireNonNull(logger);
Objects.requireNonNull(hosts);
Objects.requireNonNull(transportService);
Objects.requireNonNull(nodeId_prefix);
Objects.requireNonNull(resolveTimeout);
if (resolveTimeout.nanos() < 0) {
throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]");
@ -213,7 +207,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
.collect(Collectors.toList());
final List<Future<TransportAddress[]>> futures =
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
final List<DiscoveryNode> discoveryNodes = new ArrayList<>();
final List<TransportAddress> transportAddresses = new ArrayList<>();
final Set<TransportAddress> localAddresses = new HashSet<>();
localAddresses.add(transportService.boundAddress().publishAddress());
localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
@ -231,13 +225,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final TransportAddress address = addresses[addressId];
// no point in pinging ourselves
if (localAddresses.contains(address) == false) {
discoveryNodes.add(
new DiscoveryNode(
nodeId_prefix + hostname + "_" + addressId + "#",
address,
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
transportAddresses.add(address);
}
}
} catch (final ExecutionException e) {
@ -249,7 +237,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
}
}
return discoveryNodes;
return Collections.unmodifiableList(transportAddresses);
}
@Override
@ -292,29 +280,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
protected void ping(final Consumer<PingCollection> resultsConsumer,
final TimeValue scheduleDuration,
final TimeValue requestDuration) {
final List<DiscoveryNode> seedNodes;
final List<TransportAddress> seedAddresses = new ArrayList<>();
try {
seedNodes = resolveHostsLists(
seedAddresses.addAll(resolveHostsLists(
unicastZenPingExecutorService,
logger,
configuredHosts,
limitPortCounts,
transportService,
UNICAST_NODE_PREFIX,
resolveTimeout);
resolveTimeout));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
seedNodes.addAll(hostsProvider.buildDynamicNodes());
seedAddresses.addAll(hostsProvider.buildDynamicHosts());
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : nodes.getMasterNodes().values()) {
seedNodes.add(masterNode.value);
seedAddresses.add(masterNode.value.getAddress());
}
final ConnectionProfile connectionProfile =
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer,
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedAddresses, resultsConsumer,
nodes.getLocalNode(), connectionProfile);
activePingingRounds.put(pingingRound.id(), pingingRound);
final AbstractRunnable pingSender = new AbstractRunnable() {
@ -356,17 +343,17 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
private final Map<TransportAddress, Connection> tempConnections = new HashMap<>();
private final KeyedLock<TransportAddress> connectionLock = new KeyedLock<>(true);
private final PingCollection pingCollection;
private final List<DiscoveryNode> seedNodes;
private final List<TransportAddress> seedAddresses;
private final Consumer<PingCollection> pingListener;
private final DiscoveryNode localNode;
private final ConnectionProfile connectionProfile;
private AtomicBoolean closed = new AtomicBoolean(false);
PingingRound(int id, List<DiscoveryNode> seedNodes, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
PingingRound(int id, List<TransportAddress> seedAddresses, Consumer<PingCollection> resultsConsumer, DiscoveryNode localNode,
ConnectionProfile connectionProfile) {
this.id = id;
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
this.seedAddresses = Collections.unmodifiableList(seedAddresses.stream().distinct().collect(Collectors.toList()));
this.pingListener = resultsConsumer;
this.localNode = localNode;
this.connectionProfile = connectionProfile;
@ -381,9 +368,9 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
return this.closed.get();
}
public List<DiscoveryNode> getSeedNodes() {
public List<TransportAddress> getSeedAddresses() {
ensureOpen();
return seedNodes;
return seedAddresses;
}
public Connection getOrConnect(DiscoveryNode node) throws IOException {
@ -457,26 +444,28 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
final ClusterState lastState = contextProvider.clusterState();
final UnicastPingRequest pingRequest = new UnicastPingRequest(pingingRound.id(), timeout, createPingResponse(lastState));
Set<DiscoveryNode> nodesFromResponses = temporalResponses.stream().map(pingResponse -> {
List<TransportAddress> temporalAddresses = temporalResponses.stream().map(pingResponse -> {
assert clusterName.equals(pingResponse.clusterName()) :
"got a ping request from a different cluster. expected " + clusterName + " got " + pingResponse.clusterName();
return pingResponse.node();
}).collect(Collectors.toSet());
// dedup by address
final Map<TransportAddress, DiscoveryNode> uniqueNodesByAddress =
Stream.concat(pingingRound.getSeedNodes().stream(), nodesFromResponses.stream())
.collect(Collectors.toMap(DiscoveryNode::getAddress, Function.identity(), (n1, n2) -> n1));
return pingResponse.node().getAddress();
}).collect(Collectors.toList());
final Stream<TransportAddress> uniqueAddresses = Stream.concat(pingingRound.getSeedAddresses().stream(),
temporalAddresses.stream()).distinct();
// resolve what we can via the latest cluster state
final Set<DiscoveryNode> nodesToPing = uniqueNodesByAddress.values().stream()
.map(node -> {
DiscoveryNode foundNode = lastState.nodes().findByAddress(node.getAddress());
if (foundNode == null) {
return node;
} else {
final Set<DiscoveryNode> nodesToPing = uniqueAddresses
.map(address -> {
DiscoveryNode foundNode = lastState.nodes().findByAddress(address);
if (foundNode != null && transportService.nodeConnected(foundNode)) {
return foundNode;
} else {
return new DiscoveryNode(
address.toString(),
address,
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion());
}
}).collect(Collectors.toSet());
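The ping target computation above now works on raw transport addresses: configured seed addresses and the addresses carried by temporal ping responses are concatenated, de-duplicated, and only then resolved against the last known cluster state, falling back to a throwaway placeholder node when no connected node is known for an address. A minimal sketch of that resolution step, using only the calls visible in this hunk (the helper name and its parameters are illustrative, not part of the change):

// Sketch of the dedup-and-resolve step; imports match those already used by UnicastZenPing.
static Set<DiscoveryNode> resolveNodesToPing(Stream<TransportAddress> seedAddresses,
                                             Stream<TransportAddress> responseAddresses,
                                             DiscoveryNodes lastKnownNodes,
                                             TransportService transportService) {
    return Stream.concat(seedAddresses, responseAddresses)
        .distinct() // dedup purely by address before doing any lookups
        .map(address -> {
            DiscoveryNode found = lastKnownNodes.findByAddress(address);
            if (found != null && transportService.nodeConnected(found)) {
                return found; // reuse the known, already-connected node
            }
            // placeholder node: the real identity is learned from the ping response
            return new DiscoveryNode(address.toString(), address, emptyMap(), emptySet(),
                Version.CURRENT.minimumCompatibilityVersion());
        })
        .collect(Collectors.toSet());
}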
View File
@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -53,6 +54,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
@ -74,9 +76,10 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private final String[] bindHosts;
private final String[] publishHosts;
protected final AtomicLong totalChannelsAccepted = new AtomicLong();
protected final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected volatile BoundTransportAddress boundAddress;
private volatile BoundTransportAddress boundAddress;
private final AtomicLong totalChannelsAccepted = new AtomicLong();
private final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
@ -116,7 +119,12 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
return new HttpInfo(boundTransportAddress, maxContentLength.getBytes());
}
protected BoundTransportAddress createBoundHttpAddress() {
@Override
public HttpStats stats() {
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}
protected void bindServer() {
// Bind and start to accept incoming connections.
InetAddress hostAddresses[];
try {
@ -138,11 +146,71 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
}
final int publishPort = resolvePublishPort(settings, boundAddresses, publishInetAddress);
final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort);
return new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), new TransportAddress(publishAddress));
TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), publishAddress);
logger.info("{}", boundAddress);
}
protected abstract TransportAddress bindAddress(InetAddress hostAddress);
private TransportAddress bindAddress(final InetAddress hostAddress) {
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = port.iterate(portNumber -> {
try {
synchronized (httpServerChannels) {
HttpServerChannel httpServerChannel = bind(new InetSocketAddress(hostAddress, portNumber));
httpServerChannels.add(httpServerChannel);
boundSocket.set(httpServerChannel.getLocalAddress());
}
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
});
if (!success) {
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
}
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new TransportAddress(boundSocket.get());
}
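bindAddress() walks the configured port range, keeps the most recent bind failure, and only gives up with a BindHttpException after every candidate port has been tried. The same retry idea, stripped of the Elasticsearch helpers (PortsRange, HttpServerChannel) and expressed with plain JDK classes as a hedged illustration:

// Plain-JDK sketch of the bind-over-a-port-range pattern used above.
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class PortRangeBinder {
    static ServerSocketChannel bindFirstAvailable(InetAddress host, int fromPort, int toPort) throws IOException {
        IOException lastException = null;
        for (int port = fromPort; port <= toPort; port++) {
            ServerSocketChannel channel = ServerSocketChannel.open();
            try {
                channel.bind(new InetSocketAddress(host, port)); // first free port wins
                return channel;
            } catch (IOException e) {
                try { channel.close(); } catch (IOException ignored) { /* best effort */ }
                lastException = e; // remember the most recent failure and try the next port
            }
        }
        throw new IOException("failed to bind to any port in [" + fromPort + "-" + toPort + "]", lastException);
    }
}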
protected abstract HttpServerChannel bind(InetSocketAddress hostAddress) throws Exception;
@Override
protected void doStop() {
synchronized (httpServerChannels) {
if (httpServerChannels.isEmpty() == false) {
try {
CloseableChannel.closeChannels(new ArrayList<>(httpServerChannels), true);
} catch (Exception e) {
logger.warn("exception while closing channels", e);
} finally {
httpServerChannels.clear();
}
}
}
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
stopInternal();
}
@Override
protected void doClose() {
}
/**
* Called to tear down internal resources
*/
protected abstract void stopInternal();
// package private for tests
static int resolvePublishPort(Settings settings, List<TransportAddress> boundAddresses, InetAddress publishInetAddress) {
@ -197,19 +265,23 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
CloseableChannel.closeChannel(channel);
} else {
logger.warn(() -> new ParameterizedMessage(
"caught exception while handling client http traffic, closing connection {}", channel), e);
"caught exception while handling client http traffic, closing connection {}", channel), e);
CloseableChannel.closeChannel(channel);
}
}
protected void onServerException(HttpServerChannel channel, Exception e) {
logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e);
}
/**
* Exception handler for exceptions that are not associated with a specific channel.
*
* @param exception the exception
*/
protected void onNonChannelException(Exception exception) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()),
exception);
String threadName = Thread.currentThread().getName();
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception);
}
protected void serverAcceptedChannel(HttpChannel httpChannel) {
View File
@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.elasticsearch.common.network.CloseableChannel;
import java.net.InetSocketAddress;
public interface HttpServerChannel extends CloseableChannel {
/**
* Returns the local address for this channel.
*
* @return the local address of this channel.
*/
InetSocketAddress getLocalAddress();
}
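Concrete HTTP transports return their own implementation of this interface from bind(). Below is a hedged sketch of such an implementation over a java.nio ServerSocketChannel; the class name is hypothetical, and the close/addCloseListener/isOpen methods are assumed to come from CloseableChannel based on how channels are used elsewhere in this change.

// Hypothetical HttpServerChannel implementation; the methods beyond getLocalAddress()
// are an assumption about the CloseableChannel contract.
// imports: java.io.IOException, java.net.InetSocketAddress, java.nio.channels.ServerSocketChannel,
//          java.util.List, java.util.concurrent.CopyOnWriteArrayList,
//          java.util.concurrent.atomic.AtomicBoolean, org.elasticsearch.action.ActionListener
class PlainHttpServerChannel implements HttpServerChannel {

    private final ServerSocketChannel rawChannel;
    private final InetSocketAddress localAddress;
    private final List<ActionListener<Void>> closeListeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    PlainHttpServerChannel(ServerSocketChannel rawChannel) throws IOException {
        this.rawChannel = rawChannel;
        this.localAddress = (InetSocketAddress) rawChannel.getLocalAddress();
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        return localAddress;
    }

    @Override
    public void addCloseListener(ActionListener<Void> listener) {
        closeListeners.add(listener); // simplified: assumes listeners are registered before close()
    }

    @Override
    public boolean isOpen() {
        return closed.get() == false;
    }

    @Override
    public void close() {
        if (closed.compareAndSet(false, true)) {
            try {
                rawChannel.close();
                closeListeners.forEach(l -> l.onResponse(null));
            } catch (IOException e) {
                closeListeners.forEach(l -> l.onFailure(e));
            }
        }
    }
}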
View File
@ -25,7 +25,6 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
@ -69,15 +68,11 @@ public class TaskResultsService extends AbstractComponent {
private final ClusterService clusterService;
private final TransportCreateIndexAction createIndexAction;
@Inject
public TaskResultsService(Settings settings, Client client, ClusterService clusterService,
TransportCreateIndexAction createIndexAction) {
public TaskResultsService(Settings settings, Client client, ClusterService clusterService) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.createIndexAction = createIndexAction;
}
public void storeResult(TaskResult taskResult, ActionListener<Void> listener) {
@ -91,7 +86,7 @@ public class TaskResultsService extends AbstractComponent {
createIndexRequest.mapping(TASK_TYPE, taskResultIndexMapping(), XContentType.JSON);
createIndexRequest.cause("auto(task api)");
createIndexAction.execute(null, createIndexRequest, new ActionListener<CreateIndexResponse>() {
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
doStoreResult(taskResult, listener);
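The results index is now created through the regular client facade rather than a directly injected TransportCreateIndexAction. A hedged sketch of that pattern, including the concurrent-creation case the surrounding class has to tolerate; the index name is illustrative, and doStoreResult stands in for whatever should run once the index exists:

// Sketch: create the index via the client and treat "already exists" as success.
CreateIndexRequest createIndexRequest = new CreateIndexRequest("task-results-example"); // illustrative name
client.admin().indices().create(createIndexRequest, new ActionListener<CreateIndexResponse>() {
    @Override
    public void onResponse(CreateIndexResponse response) {
        doStoreResult(taskResult, listener);
    }

    @Override
    public void onFailure(Exception e) {
        if (ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException) {
            doStoreResult(taskResult, listener); // another node created the index concurrently
        } else {
            listener.onFailure(e);
        }
    }
});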
View File
@ -22,7 +22,6 @@ import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
@ -43,8 +42,7 @@ final class RemoteClusterAwareClient extends AbstractClient {
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response>>
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
View File
@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.network.CloseableChannel;
import java.net.InetSocketAddress;
/**
* This is a tcp channel representing a server channel listening for new connections. It is the server
* channel abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport
* implementations must return server channels that adhere to the required method contracts.
*/
public interface TcpServerChannel extends CloseableChannel {
/**
* This returns the profile for this channel.
*/
String getProfile();
/**
* Returns the local address for this channel.
*
* @return the local address of this channel.
*/
InetSocketAddress getLocalAddress();
}
View File
@ -21,9 +21,6 @@ package org.elasticsearch.transport;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
@ -31,6 +28,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -52,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
@ -68,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.rest.RestStatus;
@ -210,7 +210,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
// node id to actual channel
private final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private final Map<String, List<TcpChannel>> serverChannels = newConcurrentMap();
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
private final Set<TcpChannel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final KeyedLock<String> connectionLock = new KeyedLock<>();
@ -792,9 +792,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = portsRange.iterate(portNumber -> {
try {
TcpChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
synchronized (serverChannels) {
List<TcpChannel> list = serverChannels.get(name);
List<TcpServerChannel> list = serverChannels.get(name);
if (list == null) {
list = new ArrayList<>();
serverChannels.put(name, list);
@ -957,9 +957,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
closeLock.writeLock().lock();
try {
// first stop to accept any incoming connections so nobody can connect to this transport
for (Map.Entry<String, List<TcpChannel>> entry : serverChannels.entrySet()) {
for (Map.Entry<String, List<TcpServerChannel>> entry : serverChannels.entrySet()) {
String profile = entry.getKey();
List<TcpChannel> channels = entry.getValue();
List<TcpServerChannel> channels = entry.getValue();
ActionListener<Void> closeFailLogger = ActionListener.wrap(c -> {},
e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e));
channels.forEach(c -> c.addCloseListener(closeFailLogger));
@ -999,7 +999,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
protected void onException(TcpChannel channel, Exception e) {
public void onException(TcpChannel channel, Exception e) {
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
@ -1049,6 +1049,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
protected void onServerException(TcpServerChannel channel, Exception e) {
logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e);
}
/**
* Exception handler for exceptions that are not associated with a specific channel.
*
@ -1072,7 +1076,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
* @param name the profile name
* @param address the address to bind to
*/
protected abstract TcpChannel bind(String name, InetSocketAddress address) throws IOException;
protected abstract TcpServerChannel bind(String name, InetSocketAddress address) throws IOException;
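A concrete transport's bind(...) is expected to open the raw server socket and hand back its own TcpServerChannel wrapper, keeping the profile name so getProfile() can report it; accept handling is wired up separately. A hedged sketch follows, where PlainTcpServerChannel is a hypothetical wrapper and not a class in this change:

// Hypothetical bind() implementation for a simple transport; accept wiring is elided.
@Override
protected TcpServerChannel bind(String name, InetSocketAddress address) throws IOException {
    ServerSocketChannel rawChannel = ServerSocketChannel.open();
    rawChannel.bind(address);
    return new PlainTcpServerChannel(name, rawChannel); // wrapper keeps the profile name for getProfile()
}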
/**
* Initiate a single tcp socket channel.
@ -1087,8 +1091,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
/**
* Called to tear down internal resources
*/
protected void stopInternal() {
}
protected abstract void stopInternal();
public boolean canCompress(TransportRequest request) {
return compress && (!(request instanceof BytesTransportRequest));
View File
@ -21,16 +21,17 @@
package org.elasticsearch.action.bulk;
import org.apache.lucene.util.Constants;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
@ -99,14 +100,13 @@ public class TransportBulkActionTookTests extends ESTestCase {
IndexNameExpressionResolver resolver = new Resolver(Settings.EMPTY);
ActionFilters actionFilters = new ActionFilters(new HashSet<>());
TransportCreateIndexAction createIndexAction = new TransportCreateIndexAction(
Settings.EMPTY,
transportService,
clusterService,
threadPool,
null,
actionFilters,
resolver);
NodeClient client = new NodeClient(Settings.EMPTY, threadPool) {
@Override
public <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
listener.onResponse((Response)new CreateIndexResponse());
}
};
if (controlled) {
@ -116,7 +116,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
transportService,
clusterService,
null,
createIndexAction,
client,
actionFilters,
resolver,
null,
@ -141,7 +141,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
transportService,
clusterService,
null,
createIndexAction,
client,
actionFilters,
resolver,
null,
@ -223,7 +223,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
TransportService transportService,
ClusterService clusterService,
TransportShardBulkAction shardBulkAction,
TransportCreateIndexAction createIndexAction,
NodeClient client,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
@ -235,7 +235,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
clusterService,
null,
shardBulkAction,
createIndexAction,
client,
actionFilters,
indexNameExpressionResolver,
autoCreateIndex,
@ -253,24 +253,4 @@ public class TransportBulkActionTookTests extends ESTestCase {
}
}
static class TestTransportCreateIndexAction extends TransportCreateIndexAction {
TestTransportCreateIndexAction(
Settings settings,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
MetaDataCreateIndexService createIndexService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doExecute(Task task, CreateIndexRequest request, ActionListener<CreateIndexResponse> listener) {
listener.onResponse(newResponse());
}
}
}
View File
@ -22,7 +22,7 @@ package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -148,10 +148,9 @@ public class MultiSearchActionTookTests extends ESTestCase {
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>(Settings.EMPTY,
"action", threadPool, actionFilters, taskManager) {
NodeClient client = new NodeClient(settings, threadPool) {
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
requests.add(request);
commonExecutor.execute(() -> {
counter.decrementAndGet();
@ -161,8 +160,8 @@ public class MultiSearchActionTookTests extends ESTestCase {
};
if (controlledClock) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction,
availableProcessors, expected::get) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, availableProcessors,
expected::get, client) {
@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener, long startTimeInNanos) {
@ -171,9 +170,8 @@ public class MultiSearchActionTookTests extends ESTestCase {
}
};
} else {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction,
availableProcessors, System::nanoTime) {
return new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService,
availableProcessors, System::nanoTime, client) {
@Override
void executeSearch(final Queue<SearchRequestSlot> requests, final AtomicArray<MultiSearchResponse.Item> responses,
final AtomicInteger responseCounter, final ActionListener<MultiSearchResponse> listener, long startTimeInNanos) {
View File
@ -24,7 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -107,15 +107,14 @@ public class TransportMultiSearchActionTests extends ESTestCase {
final ExecutorService commonExecutor = threadPool.executor(threadPoolNames.get(0));
final ExecutorService rarelyExecutor = threadPool.executor(threadPoolNames.get(1));
final Set<SearchRequest> requests = Collections.newSetFromMap(Collections.synchronizedMap(new IdentityHashMap<>()));
TransportAction<SearchRequest, SearchResponse> searchAction = new TransportAction<SearchRequest, SearchResponse>
(Settings.EMPTY, "action", threadPool, actionFilters, taskManager) {
NodeClient client = new NodeClient(settings, threadPool) {
@Override
protected void doExecute(SearchRequest request, ActionListener<SearchResponse> listener) {
public void search(final SearchRequest request, final ActionListener<SearchResponse> listener) {
requests.add(request);
int currentConcurrentSearches = counter.incrementAndGet();
if (currentConcurrentSearches > maxAllowedConcurrentSearches) {
errorHolder.set(new AssertionError("Current concurrent search [" + currentConcurrentSearches +
"] is higher than is allowed [" + maxAllowedConcurrentSearches + "]"));
"] is higher than is allowed [" + maxAllowedConcurrentSearches + "]"));
}
final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor;
executorService.execute(() -> {
@ -126,8 +125,7 @@ public class TransportMultiSearchActionTests extends ESTestCase {
};
TransportMultiSearchAction action =
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, searchAction, 10,
System::nanoTime);
new TransportMultiSearchAction(threadPool, actionFilters, transportService, clusterService, 10, System::nanoTime, client);
// Execute the multi search api and fail if we find an error after executing:
try {
View File
@ -22,7 +22,6 @@ package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
@ -38,11 +37,8 @@ public class ParentTaskAssigningClientTests extends ESTestCase {
// This mock will do nothing but verify that parentTaskId is set on all requests sent to it.
NoOpClient mock = new NoOpClient(getTestName()) {
@Override
protected < Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>
> void doExecute(Action<Response> action, Request request,
ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
assertEquals(parentTaskId[0], request.getParentTask());
super.doExecute(action, request, listener);
}
View File
@ -84,7 +84,7 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase {
internalCluster().getInstance(TransportService.class);
// try to ping the single node directly
final UnicastHostsProvider provider =
() -> Collections.singletonList(nodeTransport.getLocalNode());
() -> Collections.singletonList(nodeTransport.getLocalNode().getAddress());
final CountDownLatch latch = new CountDownLatch(1);
final DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(nodeTransport.getLocalNode())
View File
@ -408,19 +408,18 @@ public class UnicastZenPingTests extends ESTestCase {
Collections.emptySet());
closeables.push(transportService);
final int limitPortCounts = randomIntBetween(1, 10);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
executorService,
logger,
Collections.singletonList("127.0.0.1"),
limitPortCounts,
transportService,
"test_",
TimeValue.timeValueSeconds(1));
assertThat(discoveryNodes, hasSize(limitPortCounts));
assertThat(transportAddresses, hasSize(limitPortCounts));
final Set<Integer> ports = new HashSet<>();
for (final DiscoveryNode discoveryNode : discoveryNodes) {
assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress());
ports.add(discoveryNode.getAddress().getPort());
for (final TransportAddress address : transportAddresses) {
assertTrue(address.address().getAddress().isLoopbackAddress());
ports.add(address.getPort());
}
assertThat(ports, equalTo(IntStream.range(9300, 9300 + limitPortCounts).mapToObj(m -> m).collect(Collectors.toSet())));
}
@ -453,19 +452,18 @@ public class UnicastZenPingTests extends ESTestCase {
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
Collections.emptySet());
closeables.push(transportService);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
executorService,
logger,
Collections.singletonList(NetworkAddress.format(loopbackAddress)),
10,
transportService,
"test_",
TimeValue.timeValueSeconds(1));
assertThat(discoveryNodes, hasSize(7));
assertThat(transportAddresses, hasSize(7));
final Set<Integer> ports = new HashSet<>();
for (final DiscoveryNode discoveryNode : discoveryNodes) {
assertTrue(discoveryNode.getAddress().address().getAddress().isLoopbackAddress());
ports.add(discoveryNode.getAddress().getPort());
for (final TransportAddress address : transportAddresses) {
assertTrue(address.address().getAddress().isLoopbackAddress());
ports.add(address.getPort());
}
assertThat(ports, equalTo(IntStream.range(9303, 9310).mapToObj(m -> m).collect(Collectors.toSet())));
}
@ -505,17 +503,16 @@ public class UnicastZenPingTests extends ESTestCase {
Collections.emptySet());
closeables.push(transportService);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
executorService,
logger,
Arrays.asList(hostname),
1,
transportService,
"test_",
TimeValue.timeValueSeconds(1)
);
assertThat(discoveryNodes, empty());
assertThat(transportAddresses, empty());
verify(logger).warn("failed to resolve host [" + hostname + "]", unknownHostException);
}
@ -565,16 +562,15 @@ public class UnicastZenPingTests extends ESTestCase {
closeables.push(transportService);
final TimeValue resolveTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 3));
try {
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
executorService,
logger,
Arrays.asList("hostname1", "hostname2"),
1,
transportService,
"test+",
resolveTimeout);
assertThat(discoveryNodes, hasSize(1));
assertThat(transportAddresses, hasSize(1));
verify(logger).trace(
"resolved host [{}] to {}", "hostname1",
new TransportAddress[]{new TransportAddress(TransportAddress.META_ADDRESS, 9300)});
@ -732,17 +728,16 @@ public class UnicastZenPingTests extends ESTestCase {
new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> null, null,
Collections.emptySet());
closeables.push(transportService);
final List<DiscoveryNode> discoveryNodes = TestUnicastZenPing.resolveHostsLists(
final List<TransportAddress> transportAddresses = TestUnicastZenPing.resolveHostsLists(
executorService,
logger,
Arrays.asList("127.0.0.1:9300:9300", "127.0.0.1:9301"),
1,
transportService,
"test_",
TimeValue.timeValueSeconds(1));
assertThat(discoveryNodes, hasSize(1)); // only one of the two is valid and will be used
assertThat(discoveryNodes.get(0).getAddress().getAddress(), equalTo("127.0.0.1"));
assertThat(discoveryNodes.get(0).getAddress().getPort(), equalTo(9301));
assertThat(transportAddresses, hasSize(1)); // only one of the two is valid and will be used
assertThat(transportAddresses.get(0).getAddress(), equalTo("127.0.0.1"));
assertThat(transportAddresses.get(0).getPort(), equalTo(9301));
verify(logger).warn(eq("failed to resolve host [127.0.0.1:9300:9300]"), Matchers.any(ExecutionException.class));
}
View File
@ -35,8 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@ -128,8 +127,9 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
try (AbstractHttpServerTransport transport =
new AbstractHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher) {
@Override
protected TransportAddress bindAddress(InetAddress hostAddress) {
protected HttpServerChannel bind(InetSocketAddress hostAddress) {
return null;
}
@ -139,12 +139,7 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() throws IOException {
protected void stopInternal() {
}
View File
@ -193,6 +193,10 @@ public class TcpTransportTests extends ESTestCase {
return new FakeChannel(messageCaptor);
}
@Override
protected void stopInternal() {
}
@Override
public NodeChannels getConnection(DiscoveryNode node) {
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
@ -237,7 +241,7 @@ public class TcpTransportTests extends ESTestCase {
}
}
private static final class FakeChannel implements TcpChannel {
private static final class FakeChannel implements TcpChannel, TcpServerChannel {
private final AtomicReference<BytesReference> messageCaptor;
View File
@ -23,7 +23,6 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.common.settings.Settings;
@ -51,10 +50,8 @@ public class NoOpClient extends AbstractClient {
}
@Override
protected <Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
listener.onResponse(null);
}
View File
@ -21,6 +21,7 @@ package org.elasticsearch.test.discovery;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
@ -55,7 +56,7 @@ public final class MockUncasedHostProvider implements UnicastHostsProvider, Clos
}
@Override
public List<DiscoveryNode> buildDynamicNodes() {
public List<TransportAddress> buildDynamicHosts() {
final DiscoveryNode localNode = getNode();
assert localNode != null;
synchronized (activeNodesPerCluster) {
@ -64,6 +65,7 @@ public final class MockUncasedHostProvider implements UnicastHostsProvider, Clos
.map(MockUncasedHostProvider::getNode)
.filter(Objects::nonNull)
.filter(n -> localNode.equals(n) == false)
.map(DiscoveryNode::getAddress)
.collect(Collectors.toList());
}
}
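Under the renamed contract a hosts provider only has to hand back transport addresses; it no longer builds DiscoveryNode instances itself, since pinging resolves addresses against the cluster state as shown earlier in this change. A minimal custom provider might look like the following sketch, in which the class name and the fixed address list are illustrative:

// Sketch of a UnicastHostsProvider under the address-based contract.
public class FixedHostsProvider implements UnicastHostsProvider {

    private final List<TransportAddress> addresses;

    public FixedHostsProvider(List<TransportAddress> addresses) {
        this.addresses = Collections.unmodifiableList(new ArrayList<>(addresses));
    }

    @Override
    public List<TransportAddress> buildDynamicHosts() {
        return addresses; // the pinging round dedups and resolves these against the cluster state
    }
}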
View File
@ -225,7 +225,7 @@ public class MockTcpTransport extends TcpTransport {
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
}
public final class MockChannel implements Closeable, TcpChannel {
public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel {
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final InetSocketAddress localAddress;
private final ServerSocket serverSocket;
View File
@ -41,6 +41,7 @@ import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpServerChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports;
@ -164,7 +165,7 @@ public class MockNioTransport extends TcpTransport {
@Override
public MockServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
MockServerChannel nioServerChannel = new MockServerChannel(profileName, channel, this, selector);
MockServerChannel nioServerChannel = new MockServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
ServerChannelContext context = new ServerChannelContext(nioServerChannel, this, selector, MockNioTransport.this::acceptChannel,
@ -191,12 +192,11 @@ public class MockNioTransport extends TcpTransport {
}
}
private static class MockServerChannel extends NioServerSocketChannel implements TcpChannel {
private static class MockServerChannel extends NioServerSocketChannel implements TcpServerChannel {
private final String profile;
MockServerChannel(String profile, ServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, NioSelector selector)
throws IOException {
MockServerChannel(String profile, ServerSocketChannel channel) {
super(channel);
this.profile = profile;
}
@ -215,29 +215,13 @@ public class MockNioTransport extends TcpTransport {
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}
@Override
public void setSoLinger(int value) throws IOException {
throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel.");
}
@Override
public InetSocketAddress getRemoteAddress() {
return null;
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
throw new UnsupportedOperationException("Cannot send a message to a server channel.");
}
}
private static class MockSocketChannel extends NioSocketChannel implements TcpChannel {
private final String profile;
private MockSocketChannel(String profile, java.nio.channels.SocketChannel socketChannel, NioSelector selector)
throws IOException {
private MockSocketChannel(String profile, java.nio.channels.SocketChannel socketChannel, NioSelector selector) {
super(socketChannel);
this.profile = profile;
}
View File
@ -139,10 +139,9 @@ public final class ClientHelper {
* @param listener
* The listener to call when the action is complete
*/
public static <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>> void executeWithHeadersAsync(
Map<String, String> headers, String origin, Client client, Action<Response> action, Request request,
ActionListener<Response> listener) {
public static <Request extends ActionRequest, Response extends ActionResponse>
void executeWithHeadersAsync(Map<String, String> headers, String origin, Client client, Action<Response> action, Request request,
ActionListener<Response> listener) {
Map<String, String> filteredHeaders = headers.entrySet().stream().filter(e -> SECURITY_HEADER_FILTERS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@ -177,9 +176,8 @@ public final class ClientHelper {
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>> void doExecute(
Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Supplier<ThreadContext.StoredContext> supplier = in().threadPool().getThreadContext().newRestorableContext(false);
try (ThreadContext.StoredContext ignore = in().threadPool().getThreadContext().stashContext()) {
in().threadPool().getThreadContext().putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin);
View File
@ -109,7 +109,7 @@ public class SecurityNetty4Transport extends Netty4Transport {
}
@Override
protected void onException(TcpChannel channel, Exception e) {
public void onException(TcpChannel channel, Exception e) {
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
View File
@ -13,9 +13,9 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -65,7 +65,7 @@ import java.util.function.Supplier;
*/
public class TransportGraphExploreAction extends HandledTransportAction<GraphExploreRequest, GraphExploreResponse> {
private final TransportSearchAction searchAction;
private final NodeClient client;
protected final XPackLicenseState licenseState;
static class VertexPriorityQueue extends PriorityQueue<Vertex> {
@ -82,12 +82,12 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
}
@Inject
public TransportGraphExploreAction(Settings settings, ThreadPool threadPool, TransportSearchAction transportSearchAction,
public TransportGraphExploreAction(Settings settings, ThreadPool threadPool, NodeClient client,
TransportService transportService, ActionFilters actionFilters,
XPackLicenseState licenseState) {
super(settings, GraphExploreAction.NAME, threadPool, transportService, actionFilters,
(Supplier<GraphExploreRequest>)GraphExploreRequest::new);
this.searchAction = transportSearchAction;
this.client = client;
this.licenseState = licenseState;
}
@ -313,7 +313,7 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
// System.out.println(source);
logger.trace("executing expansion graph search request");
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
// System.out.println(searchResponse);
@ -660,7 +660,7 @@ public class TransportGraphExploreAction extends HandledTransportAction<GraphExp
searchRequest.source(source);
// System.out.println(source);
logger.trace("executing initial graph search request");
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
addShardFailures(searchResponse.getShardFailures());
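The graph action now drives its searches through the injected NodeClient instead of a TransportSearchAction reference. The same listener-based call works from any component that has a client; a hedged sketch, where the index name and follow-up logic are illustrative:

// Sketch: run a search through the node client and react asynchronously.
SearchRequest searchRequest = new SearchRequest("graph-example-index"); // illustrative index
client.search(searchRequest, new ActionListener<SearchResponse>() {
    @Override
    public void onResponse(SearchResponse searchResponse) {
        // inspect hits and aggregations, then drive the next expansion step
    }

    @Override
    public void onFailure(Exception e) {
        listener.onFailure(e); // surface the failure to the caller of the transport action
    }
});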
View File
@ -24,7 +24,7 @@ import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.NettyTcpChannel;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.user.KibanaUser;
@ -116,8 +116,8 @@ public interface ServerTransportFilter {
}
if (extractClientCert && (unwrappedChannel instanceof TcpTransportChannel) &&
((TcpTransportChannel) unwrappedChannel).getChannel() instanceof NettyTcpChannel) {
Channel channel = ((NettyTcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel();
((TcpTransportChannel) unwrappedChannel).getChannel() instanceof Netty4TcpChannel) {
Channel channel = ((Netty4TcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel();
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
if (channel.isOpen()) {
assert sslHandler != null : "channel [" + channel + "] did not have a ssl handler. pipeline " + channel.pipeline();
View File
@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.security.transport.nio;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
@ -131,9 +130,8 @@ public class SecurityNioTransport extends NioTransport {
@Override
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> onServerException(nioChannel, e);
Consumer<NioSocketChannel> acceptor = SecurityNioTransport.this::acceptChannel;
ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler);
nioChannel.setContext(context);
View File
@ -9,7 +9,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
@ -123,9 +122,7 @@ public class TransportSamlInvalidateSessionActionTests extends SamlTestCase {
searchRequests = new ArrayList<>();
final Client client = new NoOpClient(threadPool) {
@Override
protected <Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>>
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
if (IndexAction.NAME.equals(action.name())) {
assertThat(request, instanceOf(IndexRequest.class));
View File
@ -8,7 +8,6 @@ package org.elasticsearch.xpack.security.audit.index;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
@ -70,9 +69,8 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
}
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response>> void doExecute(
Action<Response> action, Request request, ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
clientCalled.set(true);
}
}
View File
@ -8,7 +8,6 @@ package org.elasticsearch.xpack.security.authc.esnative;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
@ -72,14 +71,8 @@ public class NativeUsersStoreTests extends ESTestCase {
client = new FilterClient(mockClient) {
@Override
protected <
Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>
> void doExecute(
Action<Response> action,
Request request,
ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
requests.add(new Tuple<>(request, listener));
}
};
View File
@ -19,7 +19,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.client.Client;
@ -80,11 +79,8 @@ public class SecurityIndexManagerTests extends ESTestCase {
actions = new LinkedHashMap<>();
final Client client = new FilterClient(mockClient) {
@Override
protected <Request extends ActionRequest,
Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response>>
void doExecute(Action<Response> action, Request request,
ActionListener<Response> listener) {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
final Map<ActionRequest, ActionListener<?>> map = actions.getOrDefault(action, new HashMap<>());
map.put(request, listener);
actions.put(action, map);