Merge branch 'master' into index-lifecycle

This commit is contained in:
Tal Levy 2018-08-13 16:31:40 -07:00
commit e78f537e58
55 changed files with 1692 additions and 1234 deletions

View File

@ -147,7 +147,7 @@ public class ExpressionScriptEngine extends AbstractComponent implements ScriptE
}
return new BucketAggregationScript(parameters) {
@Override
public double execute() {
public Double execute() {
getParams().forEach((name, value) -> {
ReplaceableConstDoubleValues placeholder = functionValuesMap.get(name);
if (placeholder == null) {

View File

@ -267,8 +267,12 @@ public class Netty4Transport extends TcpTransport {
return esChannel;
}
ScheduledPing getPing() {
return scheduledPing;
long successfulPingCount() {
return successfulPings.count();
}
long failedPingCount() {
return failedPings.count();
}
@Override

View File

@ -26,16 +26,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
@ -83,23 +80,20 @@ public class Netty4ScheduledPingTests extends ESTestCase {
serviceB.connectToNode(nodeA);
assertBusy(() -> {
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(100L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(100L));
assertThat(nettyA.successfulPingCount(), greaterThan(100L));
assertThat(nettyB.successfulPingCount(), greaterThan(100L));
});
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyA.failedPingCount(), equalTo(0L));
assertThat(nettyB.failedPingCount(), equalTo(0L));
serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
(request, channel, task) -> {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {
logger.error("Unexpected failure", e);
fail(e.getMessage());
}
}
});
int rounds = scaledRandomIntBetween(100, 5000);
@ -130,11 +124,11 @@ public class Netty4ScheduledPingTests extends ESTestCase {
}
assertBusy(() -> {
assertThat(nettyA.getPing().getSuccessfulPings(), greaterThan(200L));
assertThat(nettyB.getPing().getSuccessfulPings(), greaterThan(200L));
assertThat(nettyA.successfulPingCount(), greaterThan(200L));
assertThat(nettyB.successfulPingCount(), greaterThan(200L));
});
assertThat(nettyA.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyB.getPing().getFailedPings(), equalTo(0L));
assertThat(nettyA.failedPingCount(), equalTo(0L));
assertThat(nettyB.failedPingCount(), equalTo(0L));
Releasables.close(serviceA, serviceB);
terminate(threadPool);

View File

@ -84,7 +84,7 @@ task deploy(type: Copy) {
}
task writeElasticsearchProperties {
onlyIf { !Os.isFamily(Os.FAMILY_WINDOWS) && !inFipsJvm }
onlyIf { !Os.isFamily(Os.FAMILY_WINDOWS) }
dependsOn 'integTestCluster#wait', deploy
doLast {
final File elasticsearchProperties = file("${wildflyInstall}/standalone/configuration/elasticsearch.properties")
@ -178,7 +178,7 @@ task stopWildfly(type: LoggedExec) {
commandLine "${wildflyInstall}/bin/jboss-cli.sh", "--controller=localhost:${-> managementPort}", "--connect", "command=shutdown"
}
if (!Os.isFamily(Os.FAMILY_WINDOWS) && !inFipsJvm) {
if (!Os.isFamily(Os.FAMILY_WINDOWS)) {
integTestRunner.dependsOn(configureTransportClient)
final TaskExecutionAdapter logDumpListener = new TaskExecutionAdapter() {
@Override

View File

@ -28,6 +28,7 @@ import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestRuleLimitSysouts;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
@ -50,6 +51,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@TestRuleLimitSysouts.Limit(bytes = 14000)
public class WildflyIT extends LuceneTestCase {
public void testTransportClient() throws URISyntaxException, IOException {

View File

@ -101,7 +101,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
}
@Override
protected void doRun() throws Exception {
protected void doRun() {
try (Releasable ignored = nodeLocks.acquire(node)) {
validateAndConnectIfNeeded(node);
}

View File

@ -46,7 +46,7 @@ public abstract class BucketAggregationScript {
return params;
}
public abstract double execute();
public abstract Double execute();
public interface Factory {
BucketAggregationScript newInstance(Map<String, Object> params);

View File

@ -110,7 +110,10 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
if (skipBucket) {
newBuckets.add(bucket);
} else {
double returned = factory.newInstance(vars).execute();
Double returned = factory.newInstance(vars).execute();
if (returned == null) {
newBuckets.add(bucket);
} else {
final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map(
(p) -> (InternalAggregation) p).collect(Collectors.toList());
aggs.add(new InternalSimpleValue(name(), returned, formatter, new ArrayList<>(), metaData()));
@ -119,6 +122,7 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
newBuckets.add(newBucket);
}
}
}
return originalAgg.create(newBuckets);
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
/**
* Abstract Transport.Connection that provides common close logic.
*/
public abstract class CloseableConnection implements Transport.Connection {
private final CompletableContext<Void> closeContext = new CompletableContext<>();
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isClosed() {
return closeContext.isDone();
}
@Override
public void close() {
// This method is safe to call multiple times as the close context will provide concurrency
// protection and only be completed once. The attached listeners will only be notified once.
closeContext.complete(null);
}
}

View File

@ -0,0 +1,282 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This class manages node connections. The connection is opened by the underlying transport. Once the
* connection is opened, this class manages the connection. This includes keep-alive pings and closing
* the connection when the connection manager is closed.
*/
public class ConnectionManager implements Closeable {
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final Logger logger;
private final Transport transport;
private final ThreadPool threadPool;
private final TimeValue pingSchedule;
private final Lifecycle lifecycle = new Lifecycle();
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();
public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this.logger = Loggers.getLogger(getClass(), settings);
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
this.lifecycle.moveToStarted();
if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
}
}
public void addListener(TransportConnectionListener listener) {
this.connectionListener.listeners.add(listener);
}
public void removeListener(TransportConnectionListener listener) {
this.connectionListener.listeners.remove(listener);
}
/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a successful is established, it can be validated before being exposed.
*/
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
try (Releasable ignored = connectionLock.acquire(node.getId())) {
Transport.Connection connection = connectedNodes.get(node);
if (connection != null) {
return;
}
boolean success = false;
try {
connection = transport.openConnection(node, connectionProfile);
connectionValidator.accept(connection, connectionProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, connection);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
try {
connectionListener.onNodeConnected(node);
} finally {
final Transport.Connection finalConnection = connection;
connection.addCloseListener(ActionListener.wrap(() -> {
connectedNodes.remove(node, finalConnection);
connectionListener.onNodeDisconnected(node);
}));
}
if (connection.isClosed()) {
throw new NodeNotConnectedException(node, "connection concurrently closed");
}
success = true;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) { // close the connection if there is a failure
logger.trace(() -> new ParameterizedMessage("failed to connect to [{}], cleaning dangling connections", node));
IOUtils.closeWhileHandlingException(connection);
}
}
}
} finally {
closeLock.readLock().unlock();
}
}
/**
* Returns a connection for the given node if the node is connected.
* Connections returned from this method must not be closed. The lifecycle of this connection is
* maintained by this connection manager
*
* @throws NodeNotConnectedException if the node is not connected
* @see #connectToNode(DiscoveryNode, ConnectionProfile, CheckedBiConsumer)
*/
public Transport.Connection getConnection(DiscoveryNode node) {
Transport.Connection connection = connectedNodes.get(node);
if (connection == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
return connection;
}
/**
* Returns {@code true} if the node is connected.
*/
public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.containsKey(node);
}
/**
* Disconnected from the given node, if not connected, will do nothing.
*/
public void disconnectFromNode(DiscoveryNode node) {
Transport.Connection nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
// if we found it and removed it we close
nodeChannels.close();
}
}
public int connectedNodeCount() {
return connectedNodes.size();
}
@Override
public void close() {
lifecycle.moveToStopped();
CountDownLatch latch = new CountDownLatch(1);
// TODO: Consider moving all read/write lock (in Transport and this class) to the TransportService
threadPool.generic().execute(() -> {
closeLock.writeLock().lock();
try {
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
} finally {
closeLock.writeLock().unlock();
latch.countDown();
}
});
try {
try {
latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
} finally {
lifecycle.moveToClosed();
}
}
private void ensureOpen() {
if (lifecycle.started() == false) {
throw new IllegalStateException("connection manager is closed");
}
}
private class ScheduledPing extends AbstractLifecycleRunnable {
private ScheduledPing() {
super(lifecycle, logger);
}
@Override
protected void doRunInLifecycle() {
for (Map.Entry<DiscoveryNode, Transport.Connection> entry : connectedNodes.entrySet()) {
Transport.Connection connection = entry.getValue();
if (connection.sendPing() == false) {
logger.warn("attempted to send ping to connection without support for pings [{}]", connection);
}
}
}
@Override
protected void onAfterInLifecycle() {
try {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this);
} catch (EsRejectedExecutionException ex) {
if (ex.isExecutorShutdown()) {
logger.debug("couldn't schedule new ping execution, executor is shutting down", ex);
} else {
throw ex;
}
}
}
@Override
public void onFailure(Exception e) {
if (lifecycle.stoppedOrClosed()) {
logger.trace("failed to send ping transport message", e);
} else {
logger.warn("failed to send ping transport message", e);
}
}
}
private static final class DelegatingNodeConnectionListener implements TransportConnectionListener {
private final List<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
@Override
public void onNodeDisconnected(DiscoveryNode key) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeDisconnected(key);
}
}
@Override
public void onNodeConnected(DiscoveryNode node) {
for (TransportConnectionListener listener : listeners) {
listener.onNodeConnected(node);
}
}
}
}

View File

@ -289,11 +289,26 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
TransportActionProxy.wrapRequest(targetNode, request), options);
}
@Override
public boolean sendPing() {
return proxyConnection.sendPing();
}
@Override
public void close() {
assert false: "proxy connections must not be closed";
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
proxyConnection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return proxyConnection.isClosed();
}
@Override
public Version getVersion() {
return proxyConnection.getVersion();

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
@ -48,7 +47,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
@ -63,10 +61,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -90,7 +85,6 @@ import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -190,6 +184,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// This is the number of bytes necessary to read the message size
public static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE;
public static final int PING_DATA_SIZE = -1;
protected final CounterMetric successfulPings = new CounterMetric();
protected final CounterMetric failedPings = new CounterMetric();
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final BytesReference EMPTY_BYTES_REFERENCE = new BytesArray(new byte[0]);
@ -198,9 +194,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final String[] features;
private final CircuitBreakerService circuitBreakerService;
// package visibility for tests
protected final ScheduledPing scheduledPing;
private final TimeValue pingSchedule;
protected final ThreadPool threadPool;
private final BigArrays bigArrays;
protected final NetworkService networkService;
@ -209,16 +202,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final DelegatingTransportConnectionListener transportListener = new DelegatingTransportConnectionListener();
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
// node id to actual channel
private final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
private final Set<TcpChannel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final KeyedLock<String> connectionLock = new KeyedLock<>();
private final NamedWriteableRegistry namedWriteableRegistry;
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
// connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
protected final boolean compress;
private volatile BoundTransportAddress boundAddress;
@ -233,6 +223,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final MeanMetric transmittedBytesMetric = new MeanMetric();
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private final BytesReference pingMessage;
public TcpTransport(String transportName, Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
@ -242,8 +233,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.threadPool = threadPool;
this.bigArrays = bigArrays;
this.circuitBreakerService = circuitBreakerService;
this.scheduledPing = new ScheduledPing();
this.pingSchedule = PING_SCHEDULE.get(settings);
this.namedWriteableRegistry = namedWriteableRegistry;
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
@ -261,6 +250,15 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// use a sorted set to present the features in a consistent order
this.features = new TreeSet<>(defaultFeatures.names()).toArray(new String[defaultFeatures.names().size()]);
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeByte((byte) 'E');
out.writeByte((byte) 'S');
out.writeInt(TcpTransport.PING_DATA_SIZE);
pingMessage = out.bytes();
} catch (IOException e) {
throw new AssertionError(e.getMessage(), e); // won't happen
}
}
static ConnectionProfile buildDefaultConnectionProfile(Settings settings) {
@ -284,9 +282,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
@Override
protected void doStart() {
if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing);
}
}
@Override
@ -348,92 +343,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
public class ScheduledPing extends AbstractLifecycleRunnable {
/**
* The magic number (must be lower than 0) for a ping message.
*/
private final BytesReference pingHeader;
final CounterMetric successfulPings = new CounterMetric();
final CounterMetric failedPings = new CounterMetric();
public ScheduledPing() {
super(lifecycle, logger);
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeByte((byte) 'E');
out.writeByte((byte) 'S');
out.writeInt(PING_DATA_SIZE);
pingHeader = out.bytes();
} catch (IOException e) {
throw new IllegalStateException(e.getMessage(), e); // won't happen
}
}
@Override
protected void doRunInLifecycle() throws Exception {
for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
DiscoveryNode node = entry.getKey();
NodeChannels channels = entry.getValue();
for (TcpChannel channel : channels.getChannels()) {
internalSendMessage(channel, pingHeader, new SendMetricListener(pingHeader.length()) {
@Override
protected void innerInnerOnResponse(Void v) {
successfulPings.inc();
}
@Override
protected void innerOnFailure(Exception e) {
if (channel.isOpen()) {
logger.debug(() -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e);
failedPings.inc();
} else {
logger.trace(() ->
new ParameterizedMessage("[{}] failed to send ping transport message (channel closed)", node), e);
}
}
});
}
}
}
public long getSuccessfulPings() {
return successfulPings.count();
}
public long getFailedPings() {
return failedPings.count();
}
@Override
protected void onAfterInLifecycle() {
try {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, this);
} catch (EsRejectedExecutionException ex) {
if (ex.isExecutorShutdown()) {
logger.debug("couldn't schedule new ping execution, executor is shutting down", ex);
} else {
throw ex;
}
}
}
@Override
public void onFailure(Exception e) {
if (lifecycle.stoppedOrClosed()) {
logger.trace("failed to send ping transport message", e);
} else {
logger.warn("failed to send ping transport message", e);
}
}
}
public final class NodeChannels implements Connection {
public final class NodeChannels extends CloseableConnection {
private final Map<TransportRequestOptions.Type, ConnectionProfile.ConnectionTypeHandle> typeMapping;
private final List<TcpChannel> channels;
private final DiscoveryNode node;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Version version;
private final AtomicBoolean isClosing = new AtomicBoolean(false);
NodeChannels(DiscoveryNode node, List<TcpChannel> channels, ConnectionProfile connectionProfile, Version handshakeVersion) {
this.node = node;
@ -465,13 +380,38 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
return connectionTypeHandle.getChannel(channels);
}
public boolean allChannelsOpen() {
boolean allChannelsOpen() {
return channels.stream().allMatch(TcpChannel::isOpen);
}
@Override
public boolean sendPing() {
for (TcpChannel channel : channels) {
internalSendMessage(channel, pingMessage, new SendMetricListener(pingMessage.length()) {
@Override
protected void innerInnerOnResponse(Void v) {
successfulPings.inc();
}
@Override
protected void innerOnFailure(Exception e) {
if (channel.isOpen()) {
logger.debug(() -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e);
failedPings.inc();
} else {
logger.trace(() ->
new ParameterizedMessage("[{}] failed to send ping transport message (channel closed)", node), e);
}
}
});
}
return true;
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
if (isClosing.compareAndSet(false, true)) {
try {
if (lifecycle.stopped()) {
/* We set SO_LINGER timeout to 0 to ensure that when we shutdown the node we don't
@ -494,7 +434,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
CloseableChannel.closeChannels(channels, block);
} finally {
transportListener.onConnectionClosed(this);
// Call the super method to trigger listeners
super.close();
}
}
}
@ -507,81 +448,12 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
if (closed.get()) {
if (isClosing.get()) {
throw new NodeNotConnectedException(node, "connection already closed");
}
TcpChannel channel = channel(options.type());
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
}
boolean isClosed() {
return closed.get();
}
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.containsKey(node);
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
connectionProfile = resolveConnectionProfile(connectionProfile);
if (node == null) {
throw new ConnectTransportException(null, "can't connect to a null node");
}
closeLock.readLock().lock(); // ensure we don't open connections while we are closing
try {
ensureOpen();
try (Releasable ignored = connectionLock.acquire(node.getId())) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
boolean success = false;
try {
nodeChannels = openConnection(node, connectionProfile);
connectionValidator.accept(nodeChannels, connectionProfile);
// we acquire a connection lock, so no way there is an existing connection
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("connected to node [{}]", node);
}
try {
transportListener.onNodeConnected(node);
} finally {
if (nodeChannels.isClosed()) {
// we got closed concurrently due to a disconnect or some other event on the channel.
// the close callback will close the NodeChannel instance first and then try to remove
// the connection from the connected nodes. It will NOT acquire the connectionLock for
// the node to prevent any blocking calls on network threads. Yet, we still establish a happens
// before relationship to the connectedNodes.put since we check if we can remove the
// (DiscoveryNode, NodeChannels) tuple from the map after we closed. Here we check if it's closed an if so we
// try to remove it first either way one of the two wins even if the callback has run before we even added the
// tuple to the map since in that case we remove it here again
if (connectedNodes.remove(node, nodeChannels)) {
transportListener.onNodeDisconnected(node);
}
throw new NodeNotConnectedException(node, "connection concurrently closed");
}
}
success = true;
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "general node connection failure", e);
} finally {
if (success == false) { // close the connection if there is a failure
logger.trace(() -> new ParameterizedMessage("failed to connect to [{}], cleaning dangling connections", node));
IOUtils.closeWhileHandlingException(nodeChannels);
}
}
}
} finally {
closeLock.readLock().unlock();
}
}
/**
@ -612,7 +484,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
@Override
public final NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
if (node == null) {
throw new ConnectTransportException(null, "can't open connection to a null node");
}
@ -664,16 +536,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// At this point we should construct the connection, notify the transport service, and attach close listeners to the
// underlying channels.
nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
transportListener.onConnectionOpened(nodeChannels);
final NodeChannels finalNodeChannels = nodeChannels;
final AtomicBoolean runOnce = new AtomicBoolean(false);
try {
transportListener.onConnectionOpened(nodeChannels);
} finally {
nodeChannels.addCloseListener(ActionListener.wrap(() -> transportListener.onConnectionClosed(finalNodeChannels)));
}
Consumer<TcpChannel> onClose = c -> {
assert c.isOpen() == false : "channel is still open when onClose is called";
// we only need to disconnect from the nodes once since all other channels
// will also try to run this we protect it from running multiple times.
if (runOnce.compareAndSet(false, true)) {
disconnectFromNodeCloseAndNotify(node, finalNodeChannels);
}
finalNodeChannels.close();
};
nodeChannels.channels.forEach(ch -> ch.addCloseListener(ActionListener.wrap(() -> onClose.accept(ch))));
@ -699,46 +571,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
private void disconnectFromNodeCloseAndNotify(DiscoveryNode node, NodeChannels nodeChannels) {
assert nodeChannels != null : "nodeChannels must not be null";
try {
IOUtils.closeWhileHandlingException(nodeChannels);
} finally {
if (closeLock.readLock().tryLock()) {
try {
if (connectedNodes.remove(node, nodeChannels)) {
transportListener.onNodeDisconnected(node);
}
} finally {
closeLock.readLock().unlock();
}
}
}
}
@Override
public NodeChannels getConnection(DiscoveryNode node) {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
return nodeChannels;
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
closeLock.readLock().lock();
NodeChannels nodeChannels = null;
try (Releasable ignored = connectionLock.acquire(node.getId())) {
nodeChannels = connectedNodes.remove(node);
} finally {
closeLock.readLock().unlock();
if (nodeChannels != null) { // if we found it and removed it we close and notify
IOUtils.closeWhileHandlingException(nodeChannels, () -> transportListener.onNodeDisconnected(node));
}
}
}
protected Version getCurrentVersion() {
// this is just for tests to mock stuff like the nodes version - tests can override this internally
return Version.CURRENT;
@ -983,19 +815,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
CloseableChannel.closeChannels(new ArrayList<>(acceptedChannels), true);
acceptedChannels.clear();
// we are holding a write lock so nobody modifies the connectedNodes / openConnections map - it's safe to first close
// all instances and then clear them maps
Iterator<Map.Entry<DiscoveryNode, NodeChannels>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, NodeChannels> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
transportListener.onNodeDisconnected(next.getKey());
} finally {
iterator.remove();
}
}
stopInternal();
} finally {
closeLock.writeLock().unlock();
@ -1111,8 +930,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
final TransportRequest request, TransportRequestOptions options, Version channelVersion,
byte status) throws IOException,
TransportException {
byte status) throws IOException, TransportException {
if (compress) {
options = TransportRequestOptions.builder(options).withCompress(true).build();
}

View File

@ -20,8 +20,8 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.component.LifecycleComponent;
@ -85,23 +85,6 @@ public interface Transport extends LifecycleComponent {
*/
TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException;
/**
* Returns {@code true} if the node is connected.
*/
boolean nodeConnected(DiscoveryNode node);
/**
* Connects to a node with the given connection profile. If the node is already connected this method has no effect.
* Once a successful is established, it can be validated before being exposed.
*/
void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator) throws ConnectTransportException;
/**
* Disconnected from the given node, if not connected, will do nothing.
*/
void disconnectFromNode(DiscoveryNode node);
/**
* Returns a list of all local adresses for this transport
*/
@ -112,23 +95,10 @@ public interface Transport extends LifecycleComponent {
}
/**
* Returns a connection for the given node if the node is connected.
* Connections returned from this method must not be closed. The lifecycle of this connection is maintained by the Transport
* implementation.
*
* @throws NodeNotConnectedException if the node is not connected
* @see #connectToNode(DiscoveryNode, ConnectionProfile, CheckedBiConsumer)
*/
Connection getConnection(DiscoveryNode node);
/**
* Opens a new connection to the given node and returns it. In contrast to
* {@link #connectToNode(DiscoveryNode, ConnectionProfile, CheckedBiConsumer)} the returned connection is not managed by
* Opens a new connection to the given node and returns it. The returned connection is not managed by
* the transport implementation. This connection must be closed once it's not needed anymore.
* This connection type can be used to execute a handshake between two nodes before the node will be published via
* {@link #connectToNode(DiscoveryNode, ConnectionProfile, CheckedBiConsumer)}.
*/
Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException;
Connection openConnection(DiscoveryNode node, ConnectionProfile profile);
TransportStats getStats();
@ -154,6 +124,21 @@ public interface Transport extends LifecycleComponent {
void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) throws
IOException, TransportException;
default boolean sendPing() {
return false;
}
/**
* The listener's {@link ActionListener#onResponse(Object)} method will be called when this
* connection is closed. No implementations currently throw an exception during close, so
* {@link ActionListener#onFailure(Exception)} will not be called.
*
* @param listener to be called
*/
void addCloseListener(ActionListener<Void> listener);
boolean isClosed();
/**
* Returns the version of the node this connection was established with.
*/
@ -168,6 +153,9 @@ public interface Transport extends LifecycleComponent {
default Object getCacheKey() {
return this;
}
@Override
void close();
}
/**

View File

@ -21,11 +21,11 @@ package org.elasticsearch.transport;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
@ -47,6 +47,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskManager;
@ -79,6 +80,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
private final CountDownLatch blockIncomingRequestsLatch = new CountDownLatch(1);
protected final Transport transport;
protected final ConnectionManager connectionManager;
protected final ThreadPool threadPool;
protected final ClusterName clusterName;
protected final TaskManager taskManager;
@ -109,6 +111,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
Function.identity(), Property.Dynamic, Property.NodeScope);
private final Logger tracerLog;
private final ConnectionProfile defaultConnectionProfile;
volatile String[] tracerLogInclude;
volatile String[] tracerLogExclude;
@ -131,6 +134,15 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
sendLocalRequest(requestId, action, request, options);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
@ -145,6 +157,13 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders) {
this(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders,
new ConnectionManager(settings, transport, threadPool));
}
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders, ConnectionManager connectionManager) {
super(settings);
// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
@ -152,6 +171,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
this.transport = transport;
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
this.connectionManager = connectionManager;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
setTracerLogInclude(TRACE_LOG_INCLUDE_SETTING.get(settings));
setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings));
@ -162,6 +182,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
this.connectToRemoteCluster = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
remoteClusterService = new RemoteClusterService(settings, this);
responseHandlers = transport.getResponseHandlers();
defaultConnectionProfile = TcpTransport.buildDefaultConnectionProfile(settings);
if (clusterSettings != null) {
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude);
clusterSettings.addSettingsUpdateConsumer(TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude);
@ -232,6 +253,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
@Override
protected void doStop() {
try {
connectionManager.close();
transport.stop();
} finally {
// in case the transport is not connected to our local node (thus cleaned on node disconnect)
@ -305,7 +327,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
* Returns <code>true</code> iff the given node is already connected.
*/
public boolean nodeConnected(DiscoveryNode node) {
return isLocalNode(node) || transport.nodeConnected(node);
return isLocalNode(node) || connectionManager.nodeConnected(node);
}
/**
@ -327,7 +349,9 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
if (isLocalNode(node)) {
return;
}
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
ConnectionProfile resolvedProfile = TcpTransport.resolveConnectionProfile(connectionProfile, defaultConnectionProfile);
connectionManager.connectToNode(node, resolvedProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
if (validateConnections && node.equals(remote) == false) {
@ -378,12 +402,11 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
* @param handshakeTimeout handshake timeout
* @param clusterNamePredicate cluster name validation predicate
* @return the handshake response
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public HandshakeResponse handshake(
final Transport.Connection connection,
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) {
final HandshakeResponse response;
final DiscoveryNode node = connection.getNode();
try {
@ -410,6 +433,10 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
return response;
}
public ConnectionManager getConnectionManager() {
return connectionManager;
}
static class HandshakeRequest extends TransportRequest {
public static final HandshakeRequest INSTANCE = new HandshakeRequest();
@ -462,15 +489,17 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
if (isLocalNode(node)) {
return;
}
transport.disconnectFromNode(node);
connectionManager.disconnectFromNode(node);
}
public void addConnectionListener(TransportConnectionListener listener) {
transport.addConnectionListener(listener);
connectionManager.addListener(listener);
}
public void removeConnectionListener(TransportConnectionListener listener) {
transport.removeConnectionListener(listener);
connectionManager.removeListener(listener);
}
public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
@ -533,7 +562,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
if (isLocalNode(node)) {
return localNodeConnection;
} else {
return transport.getConnection(node);
return connectionManager.getConnection(node);
}
}

View File

@ -63,7 +63,7 @@ public class GetIndexActionTests extends ESSingleNodeTestCase {
clusterService = getInstanceFromNode(ClusterService.class);
indicesService = getInstanceFromNode(IndicesService.class);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
@ -81,12 +81,10 @@ public class GetIndexActionTests extends ESSingleNodeTestCase {
public void testIncludeDefaults() {
GetIndexRequest defaultsRequest = new GetIndexRequest().indices(indexName).includeDefaults(true);
getIndexAction.execute(null, defaultsRequest, ActionListener.wrap(
defaultsResponse -> {
assertNotNull(
defaultsResponse -> assertNotNull(
"index.refresh_interval should be set as we are including defaults",
defaultsResponse.getSetting(indexName, "index.refresh_interval")
);
}, exception -> {
), exception -> {
throw new AssertionError(exception);
})
);
@ -95,12 +93,10 @@ public class GetIndexActionTests extends ESSingleNodeTestCase {
public void testDoNotIncludeDefaults() {
GetIndexRequest noDefaultsRequest = new GetIndexRequest().indices(indexName);
getIndexAction.execute(null, noDefaultsRequest, ActionListener.wrap(
noDefaultsResponse -> {
assertNull(
noDefaultsResponse -> assertNull(
"index.refresh_interval should be null as it was never set",
noDefaultsResponse.getSetting(indexName, "index.refresh_interval")
);
}, exception -> {
), exception -> {
throw new AssertionError(exception);
})
);

View File

@ -75,7 +75,7 @@ public class GetSettingsActionTests extends ESTestCase {
threadPool = new TestThreadPool("GetSettingsActionTests");
clusterService = createClusterService(threadPool);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();

View File

@ -80,7 +80,7 @@ public class TransportBulkActionTests extends ESTestCase {
threadPool = new TestThreadPool("TransportBulkActionTookTests");
clusterService = createClusterService(threadPool);
CapturingTransport capturingTransport = new CapturingTransport();
transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();

View File

@ -92,7 +92,7 @@ public class TransportBulkActionTookTests extends ESTestCase {
private TransportBulkAction createAction(boolean controlled, AtomicLong expected) {
CapturingTransport capturingTransport = new CapturingTransport();
TransportService transportService = new TransportService(clusterService.getSettings(), capturingTransport, threadPool,
TransportService transportService = capturingTransport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();

View File

@ -446,7 +446,17 @@ public class SearchAsyncActionTests extends ESTestCase {
}
@Override
public void close() throws IOException {
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
throw new UnsupportedOperationException();
}
}

View File

@ -191,7 +191,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
final TransportService transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -87,7 +87,7 @@ public class TransportMasterNodeActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -180,7 +180,7 @@ public class TransportNodesActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -163,7 +163,7 @@ public class TransportReplicationActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -255,7 +255,7 @@ public class TransportWriteActionTests extends ESTestCase {
public void testReplicaProxy() throws InterruptedException, ExecutionException {
CapturingTransport transport = new CapturingTransport();
TransportService transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -23,7 +23,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
@ -144,16 +143,15 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
super.setUp();
transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet()
);
transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();
action = new TestTransportInstanceSingleOperationAction(
Settings.EMPTY,
"indices:admin/test",
transportService,
new ActionFilters(new HashSet<ActionFilter>()),
new ActionFilters(new HashSet<>()),
new MyResolver(),
Request::new
);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.client.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
@ -26,13 +27,13 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.CloseableConnection;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RequestHandlerRegistry;
@ -43,9 +44,9 @@ import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;
@ -59,7 +60,7 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
private final Random random;
private final ClusterName clusterName;
private volatile Map<String, RequestHandlerRegistry> requestHandlers = Collections.emptyMap();
final Object requestHandlerMutex = new Object();
private final Object requestHandlerMutex = new Object();
private final ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportConnectionListener listener;
@ -78,8 +79,9 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
protected abstract ClusterState getMockClusterState(DiscoveryNode node);
@Override
public Connection getConnection(DiscoveryNode node) {
return new Connection() {
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new CloseableConnection() {
@Override
public DiscoveryNode getNode() {
return node;
@ -87,19 +89,22 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
throws TransportException {
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
if (connectMode) {
if (TransportLivenessAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
transportResponseHandler.handleResponse(new LivenessResponse(ClusterName.CLUSTER_NAME_SETTING.
getDefault(Settings.EMPTY),
node));
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
transportResponseHandler.handleResponse(new LivenessResponse(clusterName, node));
} else if (ClusterStateAction.NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
ClusterState clusterState = getMockClusterState(node);
transportResponseHandler.handleResponse(new ClusterStateResponse(clusterName, clusterState, 0L));
} else if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
Version version = node.getVersion();
transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(node, clusterName, version));
} else {
throw new UnsupportedOperationException("Mock transport does not understand action " + action);
}
@ -129,19 +134,9 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
}
}
}
@Override
public void close() throws IOException {
}
};
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return getConnection(node);
}
protected abstract Response newResponse();
public void endConnectMode() {
@ -175,23 +170,6 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
throw new UnsupportedOperationException();
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return false;
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -41,7 +41,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
@ -51,7 +50,6 @@ import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.CustomMatcher;
import java.io.Closeable;
@ -83,7 +81,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
private static class TestIteration implements Closeable {
private final ThreadPool threadPool;
private final FailAndRetryMockTransport<TestResponse> transport;
private final TransportService transportService;
private final MockTransportService transportService;
private final TransportClientNodesService transportClientNodesService;
private final int listNodesCount;
private final int sniffNodesCount;
@ -143,7 +141,8 @@ public class TransportClientNodesServiceTests extends ESTestCase {
return ClusterState.builder(clusterName).nodes(TestIteration.this.nodeMap.get(node.getAddress())).build();
}
};
transportService = new TransportService(settings, transport, threadPool, new TransportInterceptor() {
transportService = new MockTransportService(settings, transport, threadPool, new TransportInterceptor() {
@Override
public AsyncSender interceptSender(AsyncSender sender) {
return new AsyncSender() {
@ -165,6 +164,11 @@ public class TransportClientNodesServiceTests extends ESTestCase {
assert addr == null : "boundAddress: " + addr;
return DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID());
}, null, Collections.emptySet());
transportService.addNodeConnectedBehavior((connectionManager, discoveryNode) -> false);
transportService.addGetConnectionBehavior((connectionManager, discoveryNode) -> {
// The FailAndRetryTransport does not use the connection profile
return transport.openConnection(discoveryNode, null);
});
transportService.start();
transportService.acceptIncomingRequests();
transportClientNodesService =
@ -356,25 +360,15 @@ public class TransportClientNodesServiceTests extends ESTestCase {
.build();
try (MockTransportService clientService = createNewService(clientSettings, Version.CURRENT, threadPool, null)) {
final List<MockConnection> establishedConnections = new CopyOnWriteArrayList<>();
final List<MockConnection> reusedConnections = new CopyOnWriteArrayList<>();
final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();
clientService.addDelegate(remoteService, new MockTransportService.DelegateTransport(clientService.original()) {
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
MockConnection connection = new MockConnection(super.openConnection(node, profile));
clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile) -> {
Transport.Connection connection = transport.openConnection(discoveryNode, profile);
establishedConnections.add(connection);
return connection;
}
@Override
public Connection getConnection(DiscoveryNode node) {
MockConnection connection = new MockConnection(super.getConnection(node));
reusedConnections.add(connection);
return connection;
}
});
clientService.start();
clientService.acceptIncomingRequests();
@ -382,27 +376,26 @@ public class TransportClientNodesServiceTests extends ESTestCase {
new TransportClientNodesService(clientSettings, clientService, threadPool, (a, b) -> {})) {
assertEquals(0, transportClientNodesService.connectedNodes().size());
assertEquals(0, establishedConnections.size());
assertEquals(0, reusedConnections.size());
transportClientNodesService.addTransportAddresses(remoteService.getLocalDiscoNode().getAddress());
assertEquals(1, transportClientNodesService.connectedNodes().size());
assertClosedConnections(establishedConnections, 1);
assertEquals(1, clientService.connectionManager().connectedNodeCount());
transportClientNodesService.doSample();
assertClosedConnections(establishedConnections, 2);
assertOpenConnections(reusedConnections, 1);
assertEquals(1, clientService.connectionManager().connectedNodeCount());
establishedConnections.clear();
handler.blockRequest();
Thread thread = new Thread(transportClientNodesService::doSample);
thread.start();
assertBusy(() -> assertEquals(3, establishedConnections.size()));
assertFalse("Temporary ping connection must be opened", establishedConnections.get(2).isClosed());
assertBusy(() -> assertTrue(establishedConnections.size() >= 1));
assertFalse("Temporary ping connection must be opened", establishedConnections.get(0).isClosed());
handler.releaseRequest();
thread.join();
assertClosedConnections(establishedConnections, 3);
assertTrue(establishedConnections.get(0).isClosed());
}
}
} finally {
@ -410,56 +403,6 @@ public class TransportClientNodesServiceTests extends ESTestCase {
}
}
private void assertClosedConnections(final List<MockConnection> connections, final int size) {
assertEquals("Expecting " + size + " closed connections but got " + connections.size(), size, connections.size());
connections.forEach(c -> assertConnection(c, true));
}
private void assertOpenConnections(final List<MockConnection> connections, final int size) {
assertEquals("Expecting " + size + " open connections but got " + connections.size(), size, connections.size());
connections.forEach(c -> assertConnection(c, false));
}
private static void assertConnection(final MockConnection connection, final boolean closed) {
assertEquals("Connection [" + connection + "] must be " + (closed ? "closed" : "open"), closed, connection.isClosed());
}
class MockConnection implements Transport.Connection {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Transport.Connection connection;
private MockConnection(Transport.Connection connection) {
this.connection = connection;
}
@Override
public DiscoveryNode getNode() {
return connection.getNode();
}
@Override
public Version getVersion() {
return connection.getVersion();
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
connection.close();
}
}
boolean isClosed() {
return closed.get();
}
}
class MockHandler implements TransportRequestHandler<ClusterStateRequest> {
private final AtomicBoolean block = new AtomicBoolean(false);
private final CountDownLatch release = new CountDownLatch(1);

View File

@ -46,12 +46,9 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@ -196,18 +193,15 @@ public class ClusterInfoServiceIT extends ESIntegTestCase {
final Set<String> blockedActions = newHashSet(NodesStatsAction.NAME, NodesStatsAction.NAME + "[n]", IndicesStatsAction.NAME, IndicesStatsAction.NAME + "[n]");
// drop all outgoing stats requests to force a timeout.
for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) {
mockTransportService.addDelegate(internalTestCluster.getInstance(TransportService.class, node.getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
mockTransportService.addSendBehavior(internalTestCluster.getInstance(TransportService.class, node.getName()),
(connection, requestId, action, request, options) -> {
if (blockedActions.contains(action)) {
if (timeout.get()) {
logger.info("dropping [{}] to [{}]", action, node);
return;
}
}
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
});
}

View File

@ -20,16 +20,16 @@
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -39,6 +39,7 @@ import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
@ -46,8 +47,6 @@ import org.elasticsearch.transport.TransportStats;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -56,6 +55,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import static org.hamcrest.Matchers.equalTo;
@ -121,7 +122,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
for (int i = 0; i < 3; i++) {
// simulate disconnects
for (DiscoveryNode node : randomSubsetOf(nodes)) {
transport.disconnectFromNode(node);
transportService.disconnectFromNode(node);
}
service.new ConnectionChecker().run();
}
@ -134,18 +135,12 @@ public class NodeConnectionsServiceTests extends ESTestCase {
private void assertConnectedExactlyToNodes(ClusterState state) {
assertConnected(state.nodes());
assertThat(transport.connectedNodes.size(), equalTo(state.nodes().getSize()));
assertThat(transportService.getConnectionManager().connectedNodeCount(), equalTo(state.nodes().getSize()));
}
private void assertConnected(Iterable<DiscoveryNode> nodes) {
for (DiscoveryNode node : nodes) {
assertTrue("not connected to " + node, transport.connectedNodes.contains(node));
}
}
private void assertNotConnected(Iterable<DiscoveryNode> nodes) {
for (DiscoveryNode node : nodes) {
assertFalse("still connected to " + node, transport.connectedNodes.contains(node));
assertTrue("not connected to " + node, transportService.nodeConnected(node));
}
}
@ -155,7 +150,8 @@ public class NodeConnectionsServiceTests extends ESTestCase {
super.setUp();
this.threadPool = new TestThreadPool(getClass().getName());
this.transport = new MockTransport();
transportService = new TransportService(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR,
transportService = new NoHandshakeTransportService(Settings.EMPTY, transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null,
Collections.emptySet());
transportService.start();
@ -171,11 +167,29 @@ public class NodeConnectionsServiceTests extends ESTestCase {
super.tearDown();
}
final class MockTransport implements Transport {
Set<DiscoveryNode> connectedNodes = ConcurrentCollections.newConcurrentSet();
volatile boolean randomConnectionExceptions = false;
private final class NoHandshakeTransportService extends TransportService {
private NoHandshakeTransportService(Settings settings,
Transport transport,
ThreadPool threadPool,
TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
ClusterSettings clusterSettings,
Set<String> taskHeaders) {
super(settings, transport, threadPool, transportInterceptor, localNodeFactory, clusterSettings, taskHeaders);
}
@Override
public HandshakeResponse handshake(Transport.Connection connection, long timeout, Predicate<ClusterName> clusterNamePredicate) {
return new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT);
}
}
private final class MockTransport implements Transport {
private ResponseHandlers responseHandlers = new ResponseHandlers();
private TransportConnectionListener listener = new TransportConnectionListener() {};
private volatile boolean randomConnectionExceptions = false;
private TransportConnectionListener listener = new TransportConnectionListener() {
};
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
@ -207,37 +221,19 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
public TransportAddress[] addressesFromString(String address, int perAddressLimit) {
return new TransportAddress[0];
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return connectedNodes.contains(node);
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
public Connection openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
if (connectionProfile == null) {
if (connectedNodes.contains(node) == false && randomConnectionExceptions && randomBoolean()) {
if (randomConnectionExceptions && randomBoolean()) {
throw new ConnectTransportException(node, "simulated");
}
connectedNodes.add(node);
listener.onNodeConnected(node);
}
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
connectedNodes.remove(node);
listener.onNodeDisconnected(node);
}
@Override
public Connection getConnection(DiscoveryNode node) {
return new Connection() {
Connection connection = new Connection() {
@Override
public DiscoveryNode getNode() {
return node;
@ -250,15 +246,20 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
@Override
public void close() {
public void addCloseListener(ActionListener<Void> listener) {
}
};
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
Connection connection = getConnection(node);
public void close() {
}
@Override
public boolean isClosed() {
return false;
}
};
listener.onConnectionOpened(connection);
return connection;
}
@ -282,13 +283,16 @@ public class NodeConnectionsServiceTests extends ESTestCase {
}
@Override
public void start() {}
public void start() {
}
@Override
public void stop() {}
public void stop() {
}
@Override
public void close() {}
public void close() {
}
@Override
public TransportStats getStats() {

View File

@ -124,7 +124,7 @@ public class ShardStateActionTests extends ESTestCase {
super.setUp();
this.transport = new CapturingTransport();
clusterService = createClusterService(THREAD_POOL);
transportService = new TransportService(clusterService.getSettings(), transport, THREAD_POOL,
transportService = transport.createCapturingTransportService(clusterService.getSettings(), THREAD_POOL,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -94,7 +94,8 @@ public class ClusterStateHealthTests extends ESTestCase {
public void setUp() throws Exception {
super.setUp();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), new CapturingTransport(), threadPool,
CapturingTransport transport = new CapturingTransport();
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -37,13 +37,9 @@ import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.disruption.SlowClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -175,24 +171,15 @@ public class DiscoveryDisruptionIT extends AbstractDisruptionTestCase {
logger.info("allowing requests from non master [{}] to master [{}], waiting for two join request", nonMasterNode, masterNode);
final CountDownLatch countDownLatch = new CountDownLatch(2);
nonMasterTransportService.addDelegate(masterTranspotService, new MockTransportService.DelegateTransport(nonMasterTransportService
.original()) {
@Override
protected void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
nonMasterTransportService.addSendBehavior(masterTransportService, (connection, requestId, action, request, options) -> {
if (action.equals(MembershipAction.DISCOVERY_JOIN_ACTION_NAME)) {
countDownLatch.countDown();
}
super.sendRequest(connection, requestId, action, request, options);
}
@Override
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return super.openConnection(node, profile);
}
connection.sendRequest(requestId, action, request, options);
});
nonMasterTransportService.addConnectBehavior(masterTransportService, Transport::openConnection);
countDownLatch.await();
logger.info("waiting for cluster to reform");

View File

@ -21,7 +21,6 @@ package org.elasticsearch.discovery.zen;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.Constants;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -29,7 +28,6 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNode.Role;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkAddress;
@ -42,14 +40,13 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
@ -152,14 +149,7 @@ public class UnicastZenPingTests extends ESTestCase {
new NoneCircuitBreakerService(),
new NamedWriteableRegistry(Collections.emptyList()),
networkService,
v) {
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
throw new AssertionError("zen pings should never connect to node (got [" + node + "])");
}
};
v);
NetworkHandle handleA = startServices(settings, threadPool, "UZP_A", Version.CURRENT, supplier);
closeables.push(handleA.transportService);

View File

@ -17,12 +17,12 @@
package org.elasticsearch.index.seqno;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
@ -33,7 +33,6 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import java.util.Collections;
@ -47,7 +46,7 @@ import static org.mockito.Mockito.verify;
public class GlobalCheckpointSyncActionTests extends ESTestCase {
private ThreadPool threadPool;
private Transport transport;
private CapturingTransport transport;
private ClusterService clusterService;
private TransportService transportService;
private ShardStateAction shardStateAction;
@ -57,7 +56,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
threadPool = new TestThreadPool(getClass().getName());
transport = new CapturingTransport();
clusterService = createClusterService(threadPool);
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
transportService = transport.createCapturingTransportService(clusterService.getSettings(), threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundAddress -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -34,11 +34,8 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -85,21 +82,12 @@ public class GlobalCheckpointSyncIT extends ESIntegTestCase {
(MockTransportService) internalCluster().getInstance(TransportService.class, node.getName());
final MockTransportService receiverTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, other.getName());
senderTransportService.addDelegate(receiverTransportService,
new MockTransportService.DelegateTransport(senderTransportService.original()) {
@Override
protected void sendRequest(
final Connection connection,
final long requestId,
final String action,
final TransportRequest request,
final TransportRequestOptions options) throws IOException {
senderTransportService.addSendBehavior(receiverTransportService,
(connection, requestId, action, request, options) -> {
if ("indices:admin/seq_no/global_checkpoint_sync[r]".equals(action)) {
throw new IllegalStateException("blocking indices:admin/seq_no/global_checkpoint_sync[r]");
} else {
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
}
});
}

View File

@ -70,8 +70,6 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -338,10 +336,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
final CountDownLatch hasCorrupted = new CountDownLatch(1);
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().getName()));
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> {
if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes;
@ -349,8 +344,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
array[i] = (byte) ~array[i]; // flip one byte in the content
hasCorrupted.countDown();
}
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
});
}
@ -410,10 +404,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
final boolean truncate = randomBoolean();
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().getName()));
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
if (truncate && req.length() > 1) {
@ -427,8 +418,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
array[i] = (byte) ~array[i]; // flip one byte in the content
}
}
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
});
}

View File

@ -38,8 +38,6 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -95,17 +93,13 @@ public class ExceptionRetryIT extends ESIntegTestCase {
for (NodeStats dataNode : nodeStats.getNodes()) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class,
dataNode.getNode().getName()));
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
super.sendRequest(connection, requestId, action, request, options);
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
connection.sendRequest(requestId, action, request, options);
if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) {
logger.debug("Throw ConnectTransportException");
throw new ConnectTransportException(connection.getNode(), action);
}
}
});
}

View File

@ -61,6 +61,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
@ -598,8 +599,8 @@ public class IndexRecoveryIT extends ESIntegTestCase {
TransportService blueTransportService = internalCluster().getInstance(TransportService.class, blueNodeName);
final CountDownLatch requestBlocked = new CountDownLatch(1);
blueMockTransportService.addDelegate(redTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, blueMockTransportService.original(), requestBlocked));
redMockTransportService.addDelegate(blueTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, redMockTransportService.original(), requestBlocked));
blueMockTransportService.addSendBehavior(redTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestBlocked));
redMockTransportService.addSendBehavior(blueTransportService, new RecoveryActionBlocker(dropRequests, recoveryActionToBlock, requestBlocked));
logger.info("--> starting recovery from blue to red");
client().admin().indices().prepareUpdateSettings(indexName).setSettings(
@ -620,20 +621,19 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
private class RecoveryActionBlocker extends MockTransportService.DelegateTransport {
private class RecoveryActionBlocker implements StubbableTransport.SendRequestBehavior {
private final boolean dropRequests;
private final String recoveryActionToBlock;
private final CountDownLatch requestBlocked;
RecoveryActionBlocker(boolean dropRequests, String recoveryActionToBlock, Transport delegate, CountDownLatch requestBlocked) {
super(delegate);
RecoveryActionBlocker(boolean dropRequests, String recoveryActionToBlock, CountDownLatch requestBlocked) {
this.dropRequests = dropRequests;
this.recoveryActionToBlock = recoveryActionToBlock;
this.requestBlocked = requestBlocked;
}
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
if (recoveryActionToBlock.equals(action) || requestBlocked.getCount() == 0) {
logger.info("--> preventing {} request", action);
@ -643,7 +643,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: prevented " + action + " request");
}
super.sendRequest(connection, requestId, action, request, options);
connection.sendRequest(requestId, action, request, options);
}
}
@ -686,11 +686,11 @@ public class IndexRecoveryIT extends ESIntegTestCase {
MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName);
MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
redMockTransportService.addDelegate(blueMockTransportService, new MockTransportService.DelegateTransport(redMockTransportService.original()) {
redMockTransportService.addSendBehavior(blueMockTransportService, new StubbableTransport.SendRequestBehavior() {
private final AtomicInteger count = new AtomicInteger();
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
logger.info("--> sending request {} on {}", action, connection.getNode());
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action) && count.incrementAndGet() == 1) {
@ -701,7 +701,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
super.sendRequest(connection, requestId, action, request, options);
connection.sendRequest(requestId, action, request, options);
try {
Thread.sleep(disconnectAfterDelay.millis());
} catch (InterruptedException e) {
@ -709,35 +709,27 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulation disconnect after successfully sending " + action + " request");
} else {
super.sendRequest(connection, requestId, action, request, options);
connection.sendRequest(requestId, action, request, options);
}
}
});
final AtomicBoolean finalized = new AtomicBoolean();
blueMockTransportService.addDelegate(redMockTransportService, new MockTransportService.DelegateTransport(blueMockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
blueMockTransportService.addSendBehavior(redMockTransportService, (connection, requestId, action, request, options) -> {
logger.info("--> sending request {} on {}", action, connection.getNode());
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE)) {
finalized.set(true);
}
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
});
for (MockTransportService mockTransportService : Arrays.asList(redMockTransportService, blueMockTransportService)) {
mockTransportService.addDelegate(masterTransportService, new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
mockTransportService.addSendBehavior(masterTransportService, (connection, requestId, action, request, options) -> {
logger.info("--> sending request {} on {}", action, connection.getNode());
if ((primaryRelocation && finalized.get()) == false) {
assertNotEquals(action, ShardStateAction.SHARD_FAILED_ACTION_NAME);
}
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
});
}

View File

@ -43,6 +43,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.util.HashMap;
import java.util.List;
@ -73,6 +74,7 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
}
}
@TestLogging("_root:TRACE")
public void testRetentionPolicyChangeDuringRecovery() throws Exception {
try (ReplicationGroup shards = createGroup(0)) {
shards.startPrimary();
@ -99,7 +101,10 @@ public class RecoveryTests extends ESIndexLevelReplicationTestCase {
releaseRecovery.countDown();
future.get();
// rolling/flushing is async
assertBusy(() -> assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(0)));
assertBusy(() -> {
assertThat(replica.getLastSyncedGlobalCheckpoint(), equalTo(19L));
assertThat(replica.estimateTranslogOperationsFromMinSeq(0), equalTo(0));
});
}
}

View File

@ -55,8 +55,6 @@ import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -234,16 +232,13 @@ public class IndicesStoreIntegrationIT extends ESIntegTestCase {
MockTransportService transportServiceNode_1 = (MockTransportService) internalCluster().getInstance(TransportService.class, node_1);
TransportService transportServiceNode_2 = internalCluster().getInstance(TransportService.class, node_2);
final CountDownLatch shardActiveRequestSent = new CountDownLatch(1);
transportServiceNode_1.addDelegate(transportServiceNode_2, new MockTransportService.DelegateTransport(transportServiceNode_1.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
transportServiceNode_1.addSendBehavior(transportServiceNode_2, (connection, requestId, action, request, options) -> {
if (action.equals("internal:index/shard/exists") && shardActiveRequestSent.getCount() > 0) {
shardActiveRequestSent.countDown();
logger.info("prevent shard active request from being sent");
throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulated");
}
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
});
logger.info("--> move shard from {} to {}, and wait for relocation to finish", node_1, node_2);

View File

@ -58,6 +58,7 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.test.transport.StubbableTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -372,7 +373,7 @@ public class RelocationIT extends ESIntegTestCase {
MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, p_node);
for (DiscoveryNode node : clusterService.state().nodes()) {
if (!node.equals(clusterService.localNode())) {
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, node.getName()), new RecoveryCorruption(mockTransportService.original(), corruptionCount));
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()), new RecoveryCorruption(corruptionCount));
}
}
@ -485,17 +486,16 @@ public class RelocationIT extends ESIntegTestCase {
}
class RecoveryCorruption extends MockTransportService.DelegateTransport {
class RecoveryCorruption implements StubbableTransport.SendRequestBehavior {
private final CountDownLatch corruptionCount;
RecoveryCorruption(Transport transport, CountDownLatch corruptionCount) {
super(transport);
RecoveryCorruption(CountDownLatch corruptionCount) {
this.corruptionCount = corruptionCount;
}
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request;
if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) {
@ -506,9 +506,9 @@ public class RelocationIT extends ESIntegTestCase {
array[0] = (byte) ~array[0]; // flip one byte in the content
corruptionCount.countDown();
}
super.sendRequest(connection, requestId, action, request, options);
connection.sendRequest(requestId, action, request, options);
} else {
super.sendRequest(connection, requestId, action, request, options);
connection.sendRequest(requestId, action, request, options);
}
}
}

View File

@ -35,11 +35,8 @@ import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -116,11 +113,8 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
final AtomicBoolean truncate = new AtomicBoolean(true);
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().getName()));
mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
logger.debug("file chunk [{}] lastChunk: {}", req, req.lastChunk());
@ -129,8 +123,7 @@ public class TruncatedRecoveryIT extends ESIntegTestCase {
throw new RuntimeException("Caused some truncated files for fun and profit");
}
}
super.sendRequest(connection, requestId, action, request, options);
}
connection.sendRequest(requestId, action, request, options);
});
}

View File

@ -117,6 +117,8 @@ public class BucketScriptIT extends ESIntegTestCase {
return value0 + value1 + value2;
});
scripts.put("return null", vars -> null);
return scripts;
}
}
@ -478,6 +480,33 @@ public class BucketScriptIT extends ESIntegTestCase {
}
}
public void testInlineScriptReturnNull() {
SearchResponse response = client()
.prepareSearch("idx")
.addAggregation(
histogram("histo")
.field(FIELD_1_NAME).interval(interval)
.subAggregation(
bucketScript(
"nullField",
new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, "return null", Collections.emptyMap())
)
)
).execute().actionGet();
assertSearchResponse(response);
Histogram histo = response.getAggregations().get("histo");
assertThat(histo, notNullValue());
assertThat(histo.getName(), equalTo("histo"));
List<? extends Histogram.Bucket> buckets = histo.getBuckets();
for (int i = 0; i < buckets.size(); ++i) {
Histogram.Bucket bucket = buckets.get(i);
assertNull(bucket.getAggregations().get("nullField"));
}
}
public void testStoredScript() {
assertAcked(client().admin().cluster().preparePutStoredScript()
.setId("my_script")

View File

@ -0,0 +1,164 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ConnectionManagerTests extends ESTestCase {
private ConnectionManager connectionManager;
private ThreadPool threadPool;
private Transport transport;
private ConnectionProfile connectionProfile;
@Before
public void createConnectionManager() {
Settings settings = Settings.builder()
.put("node.name", ConnectionManagerTests.class.getSimpleName())
.build();
threadPool = new ThreadPool(settings);
transport = mock(Transport.class);
connectionManager = new ConnectionManager(settings, transport, threadPool);
TimeValue oneSecond = new TimeValue(1000);
connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond);
}
@After
public void stopThreadPool() {
threadPool.shutdown();
}
public void testConnectAndDisconnect() {
AtomicInteger nodeConnectedCount = new AtomicInteger();
AtomicInteger nodeDisconnectedCount = new AtomicInteger();
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
nodeConnectedCount.incrementAndGet();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
nodeDisconnectedCount.incrementAndGet();
}
});
DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
assertFalse(connectionManager.nodeConnected(node));
AtomicReference<Transport.Connection> connectionRef = new AtomicReference<>();
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> validator = (c, p) -> connectionRef.set(c);
connectionManager.connectToNode(node, connectionProfile, validator);
assertFalse(connection.isClosed());
assertTrue(connectionManager.nodeConnected(node));
assertSame(connection, connectionManager.getConnection(node));
assertEquals(1, connectionManager.connectedNodeCount());
assertEquals(1, nodeConnectedCount.get());
assertEquals(0, nodeDisconnectedCount.get());
if (randomBoolean()) {
connectionManager.disconnectFromNode(node);
} else {
connection.close();
}
assertTrue(connection.isClosed());
assertEquals(0, connectionManager.connectedNodeCount());
assertEquals(1, nodeConnectedCount.get());
assertEquals(1, nodeDisconnectedCount.get());
}
public void testConnectFails() {
AtomicInteger nodeConnectedCount = new AtomicInteger();
AtomicInteger nodeDisconnectedCount = new AtomicInteger();
connectionManager.addListener(new TransportConnectionListener() {
@Override
public void onNodeConnected(DiscoveryNode node) {
nodeConnectedCount.incrementAndGet();
}
@Override
public void onNodeDisconnected(DiscoveryNode node) {
nodeDisconnectedCount.incrementAndGet();
}
});
DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT);
Transport.Connection connection = new TestConnect(node);
when(transport.openConnection(node, connectionProfile)).thenReturn(connection);
assertFalse(connectionManager.nodeConnected(node));
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> validator = (c, p) -> {
throw new ConnectTransportException(node, "");
};
expectThrows(ConnectTransportException.class, () -> connectionManager.connectToNode(node, connectionProfile, validator));
assertTrue(connection.isClosed());
assertFalse(connectionManager.nodeConnected(node));
expectThrows(NodeNotConnectedException.class, () -> connectionManager.getConnection(node));
assertEquals(0, connectionManager.connectedNodeCount());
assertEquals(0, nodeConnectedCount.get());
assertEquals(0, nodeDisconnectedCount.get());
}
private static class TestConnect extends CloseableConnection {
private final DiscoveryNode node;
private TestConnect(DiscoveryNode node) {
this.node = node;
}
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
}
}
}

View File

@ -115,7 +115,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.get(s);
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
(request, channel, task) -> {
if ("index_not_found".equals(request.preference())) {
channel.sendResponse(new IndexNotFoundException("index"));
@ -436,19 +436,28 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
@Override
public void close() throws IOException {
public void addCloseListener(ActionListener<Void> listener) {
// no-op
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
// no-op
}
};
service.addDelegate(seedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
@Override
public Connection getConnection(DiscoveryNode node) {
if (node == seedNode) {
service.addGetConnectionBehavior(seedNode.getAddress(), (connectionManager, discoveryNode) -> {
if (discoveryNode == seedNode) {
return seedConnection;
}
return super.getConnection(node);
}
return connectionManager.getConnection(discoveryNode);
});
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
@ -511,7 +520,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
closeRemote.countDown();
listenerCalled.await();
assertNotNull(exceptionReference.get());
expectThrows(CancellableThreads.ExecutionCancelledException.class, () -> {throw exceptionReference.get();});
expectThrows(CancellableThreads.ExecutionCancelledException.class, () -> {
throw exceptionReference.get();
});
}
}
@ -664,7 +675,9 @@ public class RemoteClusterConnectionTests extends ESTestCase {
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
AtomicReference<Exception> failReference = new AtomicReference<>();
connection.fetchSearchShards(searchShardsRequest,
new LatchedActionListener<>(ActionListener.wrap(reference::set, failReference::set), responseLatch));
new LatchedActionListener<>(ActionListener.wrap((s) -> {
reference.set(s);
}, failReference::set), responseLatch));
assertTrue(responseLatch.await(1, TimeUnit.SECONDS));
assertNotNull(failReference.get());
assertNull(reference.get());
@ -749,7 +762,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
ActionListener<Void> listener = ActionListener.wrap(
x -> {
assertTrue(executed.compareAndSet(false, true));
latch.countDown();},
latch.countDown();
},
x -> {
/*
* This can occur on a thread submitted to the thread pool while we are closing the
@ -1007,7 +1021,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
public void testRenderConnectionInfoXContent() throws IOException {
RemoteConnectionInfo stats = new RemoteConnectionInfo("test_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1)),
4, 3, TimeValue.timeValueMinutes(30), true);
stats = assertSerialization(stats);
XContentBuilder builder = XContentFactory.jsonBuilder();
@ -1019,7 +1033,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
"\"skip_unavailable\":true}}", Strings.toString(builder));
stats = new RemoteConnectionInfo("some_other_cluster",
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)),
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS, 1), new TransportAddress(TransportAddress.META_ADDRESS, 2)),
2, 0, TimeValue.timeValueSeconds(30), false);
stats = assertSerialization(stats);
builder = XContentFactory.jsonBuilder();
@ -1129,7 +1143,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
try {
final int numDiscoverableNodes = randomIntBetween(5, 20);
List<DiscoveryNode> discoverableNodes = new ArrayList<>(numDiscoverableNodes);
for (int i = 0; i < numDiscoverableNodes; i++ ) {
for (int i = 0; i < numDiscoverableNodes; i++) {
MockTransportService transportService = startTransport("discoverable_node" + i, knownNodes, Version.CURRENT);
discoverableNodes.add(transportService.getLocalDiscoNode());
discoverableTransports.add(transportService);
@ -1218,7 +1232,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
settings);
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
MockTransportService otherClusterDiscoverable = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
@ -1283,24 +1297,30 @@ public class RemoteClusterConnectionTests extends ESTestCase {
// no-op
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
// no-op
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
// no-op
}
};
service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
@Override
public Connection getConnection(DiscoveryNode node) {
if (node == connectedNode) {
service.addNodeConnectedBehavior(connectedNode.getAddress(), (connectionManager, discoveryNode)
-> discoveryNode.equals(connectedNode));
service.addGetConnectionBehavior(connectedNode.getAddress(), (connectionManager, discoveryNode) -> {
if (discoveryNode == connectedNode) {
return seedConnection;
}
return super.getConnection(node);
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(connectedNode);
}
return connectionManager.getConnection(discoveryNode);
});
service.start();
service.acceptIncomingRequests();

View File

@ -198,7 +198,7 @@ public class TcpTransportTests extends ESTestCase {
}
@Override
public NodeChannels getConnection(DiscoveryNode node) {
public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) {
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) {
@ -209,7 +209,7 @@ public class TcpTransportTests extends ESTestCase {
};
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT);
Transport.Connection connection = transport.getConnection(node);
Transport.Connection connection = transport.openConnection(node, null);
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
BytesReference reference = messageCaptor.get();

View File

@ -111,8 +111,13 @@ public class MockScriptEngine implements ScriptEngine {
} else if (context.instanceClazz.equals(BucketAggregationScript.class)) {
BucketAggregationScript.Factory factory = parameters -> new BucketAggregationScript(parameters) {
@Override
public double execute() {
return ((Number) script.apply(getParams())).doubleValue();
public Double execute() {
Object ret = script.apply(getParams());
if (ret == null) {
return null;
} else {
return ((Number) ret).doubleValue();
}
}
};
return context.factoryClazz.cast(factory);

View File

@ -20,18 +20,22 @@
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
@ -39,13 +43,14 @@ import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
@ -53,9 +58,11 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import static org.apache.lucene.util.LuceneTestCase.rarely;
@ -84,6 +91,45 @@ public class CapturingTransport implements Transport {
private ConcurrentMap<Long, Tuple<DiscoveryNode, String>> requests = new ConcurrentHashMap<>();
private BlockingQueue<CapturedRequest> capturedRequests = ConcurrentCollections.newBlockingQueue();
public TransportService createCapturingTransportService(Settings settings, ThreadPool threadPool, TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
StubbableConnectionManager connectionManager = new StubbableConnectionManager(new ConnectionManager(settings, this, threadPool),
settings, this, threadPool);
connectionManager.setDefaultNodeConnectedBehavior((cm, discoveryNode) -> true);
connectionManager.setDefaultConnectBehavior((cm, discoveryNode) -> new Connection() {
@Override
public DiscoveryNode getNode() {
return discoveryNode;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
requests.put(requestId, Tuple.tuple(discoveryNode, action));
capturedRequests.add(new CapturedRequest(discoveryNode, requestId, action, request));
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
});
return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
connectionManager);
}
/** returns all requests captured so far. Doesn't clear the captured request list. See {@link #clear()} */
public CapturedRequest[] capturedRequests() {
return capturedRequests.toArray(new CapturedRequest[0]);
@ -195,7 +241,7 @@ public class CapturingTransport implements Transport {
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
return new Connection() {
@Override
public DiscoveryNode getNode() {
@ -204,13 +250,23 @@ public class CapturingTransport implements Transport {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
throws TransportException {
requests.put(requestId, Tuple.tuple(node, action));
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
}
@Override
public void close() throws IOException {
public void addCloseListener(ActionListener<Void> listener) {
}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {
}
};
@ -236,23 +292,6 @@ public class CapturingTransport implements Transport {
return new TransportAddress[0];
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return true;
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
}
@Override
public Lifecycle.State lifecycleState() {
return null;
@ -282,14 +321,6 @@ public class CapturingTransport implements Transport {
return Collections.emptyList();
}
public Connection getConnection(DiscoveryNode node) {
try {
return openConnection(node, null);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
synchronized (requestHandlerMutex) {

View File

@ -20,15 +20,12 @@
package org.elasticsearch.test.transport;
import com.carrotsearch.randomizedtesting.SysGlobals;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
@ -40,7 +37,6 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
@ -48,21 +44,18 @@ import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.tasks.MockTaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.MockTcpTransport;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -71,18 +64,18 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A mock transport service that allows to simulate different network topology failures.
* A mock delegate service that allows to simulate different network topology failures.
* Internally it maps TransportAddress objects to rules that inject failures.
* Adding rules for a node is done by adding rules for all bound addresses of a node
* (and the publish address, if different).
* Matching requests to rules is based on the transport address associated with the
* Matching requests to rules is based on the delegate address associated with the
* discovery node of the request, namely by DiscoveryNode.getAddress().
* This address is usually the publish address of the node but can also be a different one
* (for example, @see org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing, which constructs
@ -147,9 +140,15 @@ public final class MockTransportService extends TransportService {
public MockTransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
super(settings, new LookupTestTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings,
taskHeaders);
this.original = transport;
this(settings, new StubbableTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders);
}
private MockTransportService(Settings settings, StubbableTransport transport, ThreadPool threadPool, TransportInterceptor interceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory,
@Nullable ClusterSettings clusterSettings, Set<String> taskHeaders) {
super(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders,
new StubbableConnectionManager(new ConnectionManager(settings, transport, threadPool), settings, transport, threadPool));
this.original = transport.getDelegate();
}
public static TransportAddress[] extractTransportAddresses(TransportService transportService) {
@ -173,11 +172,12 @@ public final class MockTransportService extends TransportService {
* Clears all the registered rules.
*/
public void clearAllRules() {
transport().transports.clear();
transport().clearBehaviors();
connectionManager().clearBehaviors();
}
/**
* Clears the rule associated with the provided transport service.
* Clears the rule associated with the provided delegate service.
*/
public void clearRule(TransportService transportService) {
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
@ -186,20 +186,11 @@ public final class MockTransportService extends TransportService {
}
/**
* Clears the rule associated with the provided transport address.
* Clears the rule associated with the provided delegate address.
*/
public void clearRule(TransportAddress transportAddress) {
Transport transport = transport().transports.remove(transportAddress);
if (transport instanceof ClearableTransport) {
((ClearableTransport) transport).clearRule();
}
}
/**
* Returns the original Transport service wrapped by this mock transport service.
*/
public Transport original() {
return original;
transport().clearBehavior(transportAddress);
connectionManager().clearBehavior(transportAddress);
}
/**
@ -217,30 +208,14 @@ public final class MockTransportService extends TransportService {
* is added to fail as well.
*/
public void addFailToSendNoConnectRule(TransportAddress transportAddress) {
addDelegate(transportAddress, new DelegateTransport(original) {
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile) -> {
throw new ConnectTransportException(discoveryNode, "DISCONNECT: simulated");
});
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
if (original.nodeConnected(node) == false) {
// connecting to an already connected node is a no-op
throw new ConnectTransportException(node, "DISCONNECT: simulated");
}
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
throw new ConnectTransportException(node, "DISCONNECT: simulated");
}
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
connection.close();
// send the request, which will blow up
connection.sendRequest(requestId, action, request, options);
}
});
}
@ -271,18 +246,12 @@ public final class MockTransportService extends TransportService {
* Adds a rule that will cause matching operations to throw ConnectTransportExceptions
*/
public void addFailToSendNoConnectRule(TransportAddress transportAddress, final Set<String> blockedActions) {
addDelegate(transportAddress, new DelegateTransport(original) {
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
if (blockedActions.contains(action)) {
logger.info("--> preventing {} request", action);
connection.close();
}
connection.sendRequest(requestId, action, request, options);
}
});
}
@ -301,28 +270,12 @@ public final class MockTransportService extends TransportService {
* and failing to connect once the rule was added.
*/
public void addUnresponsiveRule(TransportAddress transportAddress) {
addDelegate(transportAddress, new DelegateTransport(original) {
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile) -> {
throw new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated");
});
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
if (original.nodeConnected(node) == false) {
// connecting to an already connected node is a no-op
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
// don't send anything, the receiving node is unresponsive
}
});
}
@ -347,70 +300,38 @@ public final class MockTransportService extends TransportService {
public void addUnresponsiveRule(TransportAddress transportAddress, final TimeValue duration) {
final long startTime = System.currentTimeMillis();
addDelegate(transportAddress, new ClearableTransport(original) {
Supplier<TimeValue> delaySupplier = () -> new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
transport().addConnectBehavior(transportAddress, (transport, discoveryNode, profile) -> {
TimeValue delay = delaySupplier.get();
if (delay.millis() <= 0) {
return original.openConnection(discoveryNode, profile);
}
// TODO: Replace with proper setting
TimeValue connectingTimeout = TcpTransport.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());
return original.openConnection(discoveryNode, profile);
} else {
Thread.sleep(connectingTimeout.millis());
throw new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated");
}
} catch (InterruptedException e) {
throw new ConnectTransportException(discoveryNode, "UNRESPONSIVE: simulated");
}
});
transport().addSendBehavior(transportAddress, new StubbableTransport.SendRequestBehavior() {
private final Queue<Runnable> requestsToSendWhenCleared = new LinkedBlockingDeque<>();
private boolean cleared = false;
TimeValue getDelay() {
return new TimeValue(duration.millis() - (System.currentTimeMillis() - startTime));
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
if (original.nodeConnected(node)) {
// connecting to an already connected node is a no-op
return;
}
TimeValue delay = getDelay();
if (delay.millis() <= 0) {
original.connectToNode(node, connectionProfile, connectionValidator);
return;
}
// TODO: Replace with proper setting
TimeValue connectingTimeout = TcpTransport.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());
original.connectToNode(node, connectionProfile, connectionValidator);
} else {
Thread.sleep(connectingTimeout.millis());
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
} catch (InterruptedException e) {
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
TimeValue delay = getDelay();
if (delay.millis() <= 0) {
return original.openConnection(node, profile);
}
// TODO: Replace with proper setting
TimeValue connectingTimeout = TcpTransport.TCP_CONNECT_TIMEOUT.getDefault(Settings.EMPTY);
try {
if (delay.millis() < connectingTimeout.millis()) {
Thread.sleep(delay.millis());
return original.openConnection(node, profile);
} else {
Thread.sleep(connectingTimeout.millis());
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
} catch (InterruptedException e) {
throw new ConnectTransportException(node, "UNRESPONSIVE: simulated");
}
}
@Override
protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
public void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
// delayed sending - even if larger then the request timeout to simulated a potential late response from target node
TimeValue delay = getDelay();
TimeValue delay = delaySupplier.get();
if (delay.millis() <= 0) {
connection.sendRequest(requestId, action, request, options);
return;
@ -450,7 +371,7 @@ public final class MockTransportService extends TransportService {
}
@Override
public void clearRule() {
public void clearCallback() {
synchronized (this) {
assert cleared == false;
cleared = true;
@ -461,234 +382,128 @@ public final class MockTransportService extends TransportService {
}
/**
* Adds a new delegate transport that is used for communication with the given transport service.
* Adds a new send behavior that is used for communication with the given delegate service.
*
* @return {@code true} iff no other delegate was registered for any of the addresses bound by transport service.
* @return {@code true} if no other send behavior was registered for any of the addresses bound by delegate service.
*/
public boolean addDelegate(TransportService transportService, DelegateTransport transport) {
public boolean addSendBehavior(TransportService transportService, StubbableTransport.SendRequestBehavior sendBehavior) {
boolean noRegistered = true;
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
noRegistered &= addDelegate(transportAddress, transport);
noRegistered &= addSendBehavior(transportAddress, sendBehavior);
}
return noRegistered;
}
/**
* Adds a new delegate transport that is used for communication with the given transport address.
* Adds a new send behavior that is used for communication with the given delegate address.
*
* @return {@code true} iff no other delegate was registered for this address before.
* @return {@code true} if no other send behavior was registered for this address before.
*/
public boolean addDelegate(TransportAddress transportAddress, DelegateTransport transport) {
return transport().transports.put(transportAddress, transport) == null;
}
private LookupTestTransport transport() {
return (LookupTestTransport) transport;
public boolean addSendBehavior(TransportAddress transportAddress, StubbableTransport.SendRequestBehavior sendBehavior) {
return transport().addSendBehavior(transportAddress, sendBehavior);
}
/**
* A lookup transport that has a list of potential Transport implementations to delegate to for node operations,
* if none is registered, then the default one is used.
* Adds a send behavior that is the default send behavior.
*
* @return {@code true} if no default send behavior was registered
*/
private static class LookupTestTransport extends DelegateTransport {
final ConcurrentMap<TransportAddress, Transport> transports = ConcurrentCollections.newConcurrentMap();
LookupTestTransport(Transport transport) {
super(transport);
public boolean addSendBehavior(StubbableTransport.SendRequestBehavior behavior) {
return transport().setDefaultSendBehavior(behavior);
}
private Transport getTransport(DiscoveryNode node) {
Transport transport = transports.get(node.getAddress());
if (transport != null) {
return transport;
}
return this.transport;
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return getTransport(node).nodeConnected(node);
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
getTransport(node).connectToNode(node, connectionProfile, connectionValidator);
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
getTransport(node).disconnectFromNode(node);
}
@Override
public Connection getConnection(DiscoveryNode node) {
return getTransport(node).getConnection(node);
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return getTransport(node).openConnection(node, profile);
/**
* Adds a new connect behavior that is used for creating connections with the given delegate service.
*
* @return {@code true} if no other send behavior was registered for any of the addresses bound by delegate service.
*/
public boolean addConnectBehavior(TransportService transportService, StubbableTransport.OpenConnectionBehavior connectBehavior) {
boolean noRegistered = true;
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
noRegistered &= addConnectBehavior(transportAddress, connectBehavior);
}
return noRegistered;
}
/**
* A pure delegate transport.
* Can be extracted to a common class if needed in other places in the codebase.
* Adds a new connect behavior that is used for creating connections with the given delegate address.
*
* @return {@code true} if no other send behavior was registered for this address before.
*/
public static class DelegateTransport implements Transport {
protected final Transport transport;
public DelegateTransport(Transport transport) {
this.transport = transport;
}
@Override
public void addConnectionListener(TransportConnectionListener listener) {
transport.addConnectionListener(listener);
}
@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return transport.removeConnectionListener(listener);
}
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
transport.registerRequestHandler(reg);
}
@Override
public RequestHandlerRegistry getRequestHandler(String action) {
return transport.getRequestHandler(action);
}
@Override
public BoundTransportAddress boundAddress() {
return transport.boundAddress();
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return transport.addressesFromString(address, perAddressLimit);
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
return transport.nodeConnected(node);
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
transport.connectToNode(node, connectionProfile, connectionValidator);
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
transport.disconnectFromNode(node);
}
@Override
public List<String> getLocalAddresses() {
return transport.getLocalAddresses();
}
@Override
public Connection getConnection(DiscoveryNode node) {
return new FilteredConnection(transport.getConnection(node)) {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
DelegateTransport.this.sendRequest(connection, requestId, action, request, options);
}
};
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
return new FilteredConnection(transport.openConnection(node, profile)) {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
DelegateTransport.this.sendRequest(connection, requestId, action, request, options);
}
};
}
@Override
public TransportStats getStats() {
return transport.getStats();
}
@Override
public ResponseHandlers getResponseHandlers() {
return transport.getResponseHandlers();
}
@Override
public Lifecycle.State lifecycleState() {
return transport.lifecycleState();
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
transport.addLifecycleListener(listener);
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
transport.removeLifecycleListener(listener);
}
@Override
public void start() {
transport.start();
}
@Override
public void stop() {
transport.stop();
}
@Override
public void close() {
transport.close();
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return transport.profileBoundAddresses();
}
protected void sendRequest(Transport.Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
connection.sendRequest(requestId, action, request, options);
}
public boolean addConnectBehavior(TransportAddress transportAddress, StubbableTransport.OpenConnectionBehavior connectBehavior) {
return transport().addConnectBehavior(transportAddress, connectBehavior);
}
/**
* The delegate transport instances defined in this class mock various kinds of disruption types. This subclass adds a method
* {@link #clearRule()} so that when the disruptions are cleared (see {@link #clearRule(TransportService)}) this gives the
* disruption a possibility to run clean-up actions.
* Adds a new get connection behavior that is used for communication with the given delegate service.
*
* @return {@code true} if no other get connection behavior was registered for any of the addresses bound by delegate service.
*/
public abstract static class ClearableTransport extends DelegateTransport {
public ClearableTransport(Transport transport) {
super(transport);
public boolean addGetConnectionBehavior(TransportService transportService, StubbableConnectionManager.GetConnectionBehavior behavior) {
boolean noRegistered = true;
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
noRegistered &= addGetConnectionBehavior(transportAddress, behavior);
}
return noRegistered;
}
/**
* Called by {@link #clearRule(TransportService)}
* Adds a get connection behavior that is used for communication with the given delegate address.
*
* @return {@code true} if no other get connection behavior was registered for this address before.
*/
public abstract void clearRule();
public boolean addGetConnectionBehavior(TransportAddress transportAddress, StubbableConnectionManager.GetConnectionBehavior behavior) {
return connectionManager().addConnectBehavior(transportAddress, behavior);
}
/**
* Adds a get connection behavior that is the default get connection behavior.
*
* @return {@code true} if no default get connection behavior was registered.
*/
public boolean addGetConnectionBehavior(StubbableConnectionManager.GetConnectionBehavior behavior) {
return connectionManager().setDefaultConnectBehavior(behavior);
}
/**
* Adds a node connected behavior that is used for the given delegate service.
*
* @return {@code true} if no other node connected behavior was registered for any of the addresses bound by delegate service.
*/
public boolean addNodeConnectedBehavior(TransportService transportService, StubbableConnectionManager.NodeConnectedBehavior behavior) {
boolean noRegistered = true;
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
noRegistered &= addNodeConnectedBehavior(transportAddress, behavior);
}
return noRegistered;
}
/**
* Adds a node connected behavior that is used for the given delegate address.
*
* @return {@code true} if no other node connected behavior was registered for this address before.
*/
public boolean addNodeConnectedBehavior(TransportAddress transportAddress, StubbableConnectionManager.NodeConnectedBehavior behavior) {
return connectionManager().addNodeConnectedBehavior(transportAddress, behavior);
}
/**
* Adds a node connected behavior that is the default node connected behavior.
*
* @return {@code true} if no default node connected behavior was registered.
*/
public boolean addNodeConnectedBehavior(StubbableConnectionManager.NodeConnectedBehavior behavior) {
return connectionManager().setDefaultNodeConnectedBehavior(behavior);
}
public StubbableTransport transport() {
return (StubbableTransport) transport;
}
public StubbableConnectionManager connectionManager() {
return (StubbableConnectionManager) connectionManager;
}
List<Tracer> activeTracers = new CopyOnWriteArrayList<>();
@ -766,78 +581,36 @@ public final class MockTransportService extends TransportService {
}
}
private static class FilteredConnection implements Transport.Connection {
protected final Transport.Connection connection;
private FilteredConnection(Transport.Connection connection) {
this.connection = connection;
}
@Override
public DiscoveryNode getNode() {
return connection.getNode();
}
@Override
public Version getVersion() {
return connection.getVersion();
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void close() throws IOException {
connection.close();
}
@Override
public Object getCacheKey() {
return connection.getCacheKey();
}
}
public Transport getOriginalTransport() {
Transport transport = transport();
while (transport instanceof DelegateTransport) {
transport = ((DelegateTransport) transport).transport;
while (transport instanceof StubbableTransport) {
transport = ((StubbableTransport) transport).getDelegate();
}
return transport;
}
@Override
public Transport.Connection openConnection(DiscoveryNode node, ConnectionProfile profile) throws IOException {
FilteredConnection filteredConnection = new FilteredConnection(super.openConnection(node, profile)) {
final AtomicBoolean closed = new AtomicBoolean(false);
Transport.Connection connection = super.openConnection(node, profile);
@Override
public void close() throws IOException {
try {
super.close();
} finally {
if (closed.compareAndSet(false, true)) {
synchronized (openConnections) {
List<Transport.Connection> connections = openConnections.computeIfAbsent(node,
(n) -> new CopyOnWriteArrayList<>());
connections.add(connection);
}
connection.addCloseListener(ActionListener.wrap(() -> {
synchronized (openConnections) {
List<Transport.Connection> connections = openConnections.get(node);
boolean remove = connections.remove(this);
assert remove;
boolean remove = connections.remove(connection);
assert remove : "Should have removed connection";
if (connections.isEmpty()) {
openConnections.remove(node);
}
}
}
}
}));
}
};
synchronized (openConnections) {
List<Transport.Connection> connections = openConnections.computeIfAbsent(node,
(n) -> new CopyOnWriteArrayList<>());
connections.add(filteredConnection);
}
return filteredConnection;
return connection;
}
@Override
@ -851,4 +624,5 @@ public final class MockTransportService extends TransportService {
public DiscoveryNode getLocalDiscoNode() {
return this.getLocalNode();
}
}

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionManager;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class StubbableConnectionManager extends ConnectionManager {
private final ConnectionManager delegate;
private final ConcurrentMap<TransportAddress, GetConnectionBehavior> getConnectionBehaviors;
private final ConcurrentMap<TransportAddress, NodeConnectedBehavior> nodeConnectedBehaviors;
private volatile GetConnectionBehavior defaultGetConnectionBehavior = ConnectionManager::getConnection;
private volatile NodeConnectedBehavior defaultNodeConnectedBehavior = ConnectionManager::nodeConnected;
public StubbableConnectionManager(ConnectionManager delegate, Settings settings, Transport transport, ThreadPool threadPool) {
super(settings, transport, threadPool);
this.delegate = delegate;
this.getConnectionBehaviors = new ConcurrentHashMap<>();
this.nodeConnectedBehaviors = new ConcurrentHashMap<>();
}
public boolean addConnectBehavior(TransportAddress transportAddress, GetConnectionBehavior connectBehavior) {
return getConnectionBehaviors.put(transportAddress, connectBehavior) == null;
}
public boolean setDefaultConnectBehavior(GetConnectionBehavior behavior) {
GetConnectionBehavior prior = defaultGetConnectionBehavior;
defaultGetConnectionBehavior = behavior;
return prior == null;
}
public boolean addNodeConnectedBehavior(TransportAddress transportAddress, NodeConnectedBehavior behavior) {
return nodeConnectedBehaviors.put(transportAddress, behavior) == null;
}
public boolean setDefaultNodeConnectedBehavior(NodeConnectedBehavior behavior) {
NodeConnectedBehavior prior = defaultNodeConnectedBehavior;
defaultNodeConnectedBehavior = behavior;
return prior == null;
}
public void clearBehaviors() {
getConnectionBehaviors.clear();
nodeConnectedBehaviors.clear();
}
public void clearBehavior(TransportAddress transportAddress) {
getConnectionBehaviors.remove(transportAddress);
nodeConnectedBehaviors.remove(transportAddress);
}
@Override
public Transport.Connection getConnection(DiscoveryNode node) {
TransportAddress address = node.getAddress();
GetConnectionBehavior behavior = getConnectionBehaviors.getOrDefault(address, defaultGetConnectionBehavior);
return behavior.getConnection(delegate, node);
}
@Override
public boolean nodeConnected(DiscoveryNode node) {
TransportAddress address = node.getAddress();
NodeConnectedBehavior behavior = nodeConnectedBehaviors.getOrDefault(address, defaultNodeConnectedBehavior);
return behavior.nodeConnected(delegate, node);
}
@Override
public void addListener(TransportConnectionListener listener) {
delegate.addListener(listener);
}
@Override
public void removeListener(TransportConnectionListener listener) {
delegate.removeListener(listener);
}
@Override
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
CheckedBiConsumer<Transport.Connection, ConnectionProfile, IOException> connectionValidator)
throws ConnectTransportException {
delegate.connectToNode(node, connectionProfile, connectionValidator);
}
@Override
public void disconnectFromNode(DiscoveryNode node) {
delegate.disconnectFromNode(node);
}
@Override
public int connectedNodeCount() {
return delegate.connectedNodeCount();
}
@Override
public void close() {
delegate.close();
}
@FunctionalInterface
public interface GetConnectionBehavior {
Transport.Connection getConnection(ConnectionManager connectionManager, DiscoveryNode discoveryNode);
}
@FunctionalInterface
public interface NodeConnectedBehavior {
boolean nodeConnected(ConnectionManager connectionManager, DiscoveryNode discoveryNode);
}
}

View File

@ -0,0 +1,252 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportStats;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class StubbableTransport implements Transport {
private final ConcurrentHashMap<TransportAddress, SendRequestBehavior> sendBehaviors = new ConcurrentHashMap<>();
private final ConcurrentHashMap<TransportAddress, OpenConnectionBehavior> connectBehaviors = new ConcurrentHashMap<>();
private volatile SendRequestBehavior defaultSendRequest = null;
private volatile OpenConnectionBehavior defaultConnectBehavior = null;
private final Transport delegate;
public StubbableTransport(Transport transport) {
this.delegate = transport;
}
boolean setDefaultSendBehavior(SendRequestBehavior sendBehavior) {
SendRequestBehavior prior = defaultSendRequest;
defaultSendRequest = sendBehavior;
return prior == null;
}
boolean addSendBehavior(TransportAddress transportAddress, SendRequestBehavior sendBehavior) {
return sendBehaviors.put(transportAddress, sendBehavior) == null;
}
boolean addConnectBehavior(TransportAddress transportAddress, OpenConnectionBehavior connectBehavior) {
return connectBehaviors.put(transportAddress, connectBehavior) == null;
}
void clearBehaviors() {
sendBehaviors.clear();
connectBehaviors.clear();
}
void clearBehavior(TransportAddress transportAddress) {
SendRequestBehavior behavior = sendBehaviors.remove(transportAddress);
if (behavior != null) {
behavior.clearCallback();
}
connectBehaviors.remove(transportAddress);
}
Transport getDelegate() {
return delegate;
}
@Override
public void addConnectionListener(TransportConnectionListener listener) {
delegate.addConnectionListener(listener);
}
@Override
public boolean removeConnectionListener(TransportConnectionListener listener) {
return delegate.removeConnectionListener(listener);
}
@Override
public <Request extends TransportRequest> void registerRequestHandler(RequestHandlerRegistry<Request> reg) {
delegate.registerRequestHandler(reg);
}
@Override
public RequestHandlerRegistry getRequestHandler(String action) {
return delegate.getRequestHandler(action);
}
@Override
public BoundTransportAddress boundAddress() {
return delegate.boundAddress();
}
@Override
public TransportAddress[] addressesFromString(String address, int perAddressLimit) throws UnknownHostException {
return delegate.addressesFromString(address, perAddressLimit);
}
@Override
public List<String> getLocalAddresses() {
return delegate.getLocalAddresses();
}
@Override
public Connection openConnection(DiscoveryNode node, ConnectionProfile profile) {
TransportAddress address = node.getAddress();
OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior);
Connection connection;
if (behavior == null) {
connection = delegate.openConnection(node, profile);
} else {
connection = behavior.openConnection(delegate, node, profile);
}
return new WrappedConnection(connection);
}
@Override
public TransportStats getStats() {
return delegate.getStats();
}
@Override
public Transport.ResponseHandlers getResponseHandlers() {
return delegate.getResponseHandlers();
}
@Override
public Lifecycle.State lifecycleState() {
return delegate.lifecycleState();
}
@Override
public void addLifecycleListener(LifecycleListener listener) {
delegate.addLifecycleListener(listener);
}
@Override
public void removeLifecycleListener(LifecycleListener listener) {
delegate.removeLifecycleListener(listener);
}
@Override
public void start() {
delegate.start();
}
@Override
public void stop() {
delegate.stop();
}
@Override
public void close() {
delegate.close();
}
@Override
public Map<String, BoundTransportAddress> profileBoundAddresses() {
return delegate.profileBoundAddresses();
}
private class WrappedConnection implements Transport.Connection {
private final Transport.Connection connection;
private WrappedConnection(Transport.Connection connection) {
this.connection = connection;
}
@Override
public DiscoveryNode getNode() {
return connection.getNode();
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
TransportAddress address = connection.getNode().getAddress();
SendRequestBehavior behavior = sendBehaviors.getOrDefault(address, defaultSendRequest);
if (behavior == null) {
connection.sendRequest(requestId, action, request, options);
} else {
behavior.sendRequest(connection, requestId, action, request, options);
}
}
@Override
public boolean sendPing() {
return connection.sendPing();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return connection.isClosed();
}
@Override
public Version getVersion() {
return connection.getVersion();
}
@Override
public Object getCacheKey() {
return connection.getCacheKey();
}
@Override
public void close() {
connection.close();
}
}
@FunctionalInterface
public interface OpenConnectionBehavior {
Connection openConnection(Transport transport, DiscoveryNode discoveryNode, ConnectionProfile profile);
}
@FunctionalInterface
public interface SendRequestBehavior {
void sendRequest(Connection connection, long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException;
default void clearCallback() {
}
}
}

View File

@ -18,3 +18,7 @@ dependencies {
dependencyLicenses {
mapping from: /bc.*/, to: 'bouncycastle'
}
if (inFipsJvm) {
test.enabled = false
}

View File

@ -1,16 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.cli;
import org.elasticsearch.test.ESTestCase;
public class TestSuiteCannotBeEmptyTests extends ESTestCase {
public void testIsHereSoThisSuiteIsNonEmpty() {
// This is needed on a FIPS JVM as the rest of the suite is skipped
}
}

View File

@ -382,13 +382,29 @@ public class FullClusterRestartIT extends ESRestTestCase {
}
});
// After we've confirmed the doc, wait until we move back to STARTED so that we know the
// state was saved at the end
waitForRollUpJob("rollup-id-test", equalTo("started"));
} else {
final Request indexRequest = new Request("POST", "/id-test-rollup/_doc/2");
indexRequest.setJsonEntity("{\"timestamp\":\"2018-01-02T00:00:01\",\"value\":345}");
client().performRequest(indexRequest);
assertRollUpJob("rollup-id-test");
// stop the rollup job to force a state save, which will upgrade the ID
final Request stopRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-id-test/_stop");
Map<String, Object> stopRollupJobResponse = entityAsMap(client().performRequest(stopRollupJobRequest));
assertThat(stopRollupJobResponse.get("stopped"), equalTo(Boolean.TRUE));
waitForRollUpJob("rollup-id-test", equalTo("stopped"));
// start the rollup job again
final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-id-test/_start");
Map<String, Object> startRollupJobResponse = entityAsMap(client().performRequest(startRollupJobRequest));
assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE));
waitForRollUpJob("rollup-id-test", anyOf(equalTo("indexing"), equalTo("started")));
assertBusy(() -> {
client().performRequest(new Request("POST", "id-test-results-rollup/_refresh"));

View File

@ -37,7 +37,8 @@ public abstract class SqlSpecTestCase extends SpecBaseIntegrationTestCase {
tests.addAll(readScriptSpec("/agg.sql-spec", parser));
tests.addAll(readScriptSpec("/arithmetic.sql-spec", parser));
tests.addAll(readScriptSpec("/string-functions.sql-spec", parser));
tests.addAll(readScriptSpec("/case-functions.sql-spec", parser));
// AwaitsFix: https://github.com/elastic/elasticsearch/issues/32589
// tests.addAll(readScriptSpec("/case-functions.sql-spec", parser));
return tests;
}