Pass exception from sendMessage to listener (#23559)
This commit changes the listener passed to sendMessage from a Runnable to a ActionListener. This change also removes IOException from the sendMessage signature. That signature is misleading as it allows implementers to assume an exception will be thrown in case of failure. That does not happen due to Netty's async nature.
This commit is contained in:
parent
48357e43d3
commit
5fa80a6521
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.CheckedConsumer;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* A listener that ensures that only one of onResponse or onFailure is called. And the method
|
||||
* the is called is only called once. Subclasses should implement notification logic with
|
||||
* innerOnResponse and innerOnFailure.
|
||||
*/
|
||||
public abstract class NotifyOnceListener<Response> implements ActionListener<Response> {
|
||||
|
||||
private final AtomicBoolean hasBeenCalled = new AtomicBoolean(false);
|
||||
|
||||
protected abstract void innerOnResponse(Response response);
|
||||
|
||||
protected abstract void innerOnFailure(Exception e);
|
||||
|
||||
@Override
|
||||
public final void onResponse(Response response) {
|
||||
if (hasBeenCalled.compareAndSet(false, true)) {
|
||||
innerOnResponse(response);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void onFailure(Exception e) {
|
||||
if (hasBeenCalled.compareAndSet(false, true)) {
|
||||
innerOnFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,6 +26,8 @@ import org.apache.lucene.util.IOUtils;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.NotifyOnceListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.CheckedBiConsumer;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -295,19 +297,25 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
DiscoveryNode node = entry.getKey();
|
||||
NodeChannels channels = entry.getValue();
|
||||
for (Channel channel : channels.getChannels()) {
|
||||
try {
|
||||
sendMessage(channel, pingHeader, successfulPings::inc);
|
||||
} catch (Exception e) {
|
||||
if (isOpen(channel)) {
|
||||
logger.debug(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e);
|
||||
failedPings.inc();
|
||||
} else {
|
||||
logger.trace(
|
||||
(Supplier<?>) () -> new ParameterizedMessage(
|
||||
"[{}] failed to send ping transport message (channel closed)", node), e);
|
||||
internalSendMessage(channel, pingHeader, new NotifyOnceListener<Channel>() {
|
||||
@Override
|
||||
public void innerOnResponse(Channel channel) {
|
||||
successfulPings.inc();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void innerOnFailure(Exception e) {
|
||||
if (isOpen(channel)) {
|
||||
logger.debug(
|
||||
(Supplier<?>) () -> new ParameterizedMessage("[{}] failed to send ping transport message", node), e);
|
||||
failedPings.inc();
|
||||
} else {
|
||||
logger.trace((Supplier<?>) () ->
|
||||
new ParameterizedMessage("[{}] failed to send ping transport message (channel closed)", node), e);
|
||||
}
|
||||
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -358,7 +366,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
typeMapping = new EnumMap<>(TransportRequestOptions.Type.class);
|
||||
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile.getHandles()) {
|
||||
for (TransportRequestOptions.Type type : handle.getTypes())
|
||||
typeMapping.put(type, handle);
|
||||
typeMapping.put(type, handle);
|
||||
}
|
||||
version = node.getVersion();
|
||||
}
|
||||
|
@ -415,7 +423,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
throw new NodeNotConnectedException(node, "connection already closed");
|
||||
}
|
||||
Channel channel = channel(options.type());
|
||||
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte)0);
|
||||
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -683,7 +691,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
for (int i = 0; i < hostAddresses.length; i++) {
|
||||
addresses[i] = NetworkAddress.format(hostAddresses[i]);
|
||||
}
|
||||
logger.debug("binding server bootstrap to: {}", (Object)addresses);
|
||||
logger.debug("binding server bootstrap to: {}", (Object) addresses);
|
||||
}
|
||||
|
||||
assert hostAddresses.length > 0;
|
||||
|
@ -907,7 +915,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
}
|
||||
}
|
||||
|
||||
protected void onException(Channel channel, Exception e) throws IOException {
|
||||
protected void onException(Channel channel, Exception e) {
|
||||
if (!lifecycle.started()) {
|
||||
// just close and ignore - we are already stopped and just need to make sure we release all resources
|
||||
disconnectFromNodeChannel(channel, e);
|
||||
|
@ -940,23 +948,27 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
} else if (e instanceof TcpTransport.HttpOnTransportException) {
|
||||
// in case we are able to return data, serialize the exception content and sent it back to the client
|
||||
if (isOpen(channel)) {
|
||||
final Runnable closeChannel = () -> {
|
||||
try {
|
||||
closeChannels(Collections.singletonList(channel));
|
||||
} catch (IOException e1) {
|
||||
logger.debug("failed to close httpOnTransport channel", e1);
|
||||
final NotifyOnceListener<Channel> closeChannel = new NotifyOnceListener<Channel>() {
|
||||
@Override
|
||||
public void innerOnResponse(Channel channel) {
|
||||
try {
|
||||
closeChannels(Collections.singletonList(channel));
|
||||
} catch (IOException e1) {
|
||||
logger.debug("failed to close httpOnTransport channel", e1);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void innerOnFailure(Exception e) {
|
||||
try {
|
||||
closeChannels(Collections.singletonList(channel));
|
||||
} catch (IOException e1) {
|
||||
e.addSuppressed(e1);
|
||||
logger.debug("failed to close httpOnTransport channel", e1);
|
||||
}
|
||||
}
|
||||
};
|
||||
boolean success = false;
|
||||
try {
|
||||
sendMessage(channel, new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)), closeChannel);
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
// it's fine to call this more than once
|
||||
closeChannel.run();
|
||||
}
|
||||
}
|
||||
internalSendMessage(channel, new BytesArray(e.getMessage().getBytes(StandardCharsets.UTF_8)), closeChannel);
|
||||
}
|
||||
} else {
|
||||
logger.warn(
|
||||
|
@ -973,7 +985,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
|
||||
/**
|
||||
* Binds to the given {@link InetSocketAddress}
|
||||
* @param name the profile name
|
||||
*
|
||||
* @param name the profile name
|
||||
* @param address the address to bind to
|
||||
*/
|
||||
protected abstract Channel bind(String name, InetSocketAddress address) throws IOException;
|
||||
|
@ -983,8 +996,14 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
*/
|
||||
protected abstract void closeChannels(List<Channel> channel) throws IOException;
|
||||
|
||||
|
||||
protected abstract void sendMessage(Channel channel, BytesReference reference, Runnable sendListener) throws IOException;
|
||||
/**
|
||||
* Sends message to channel. The listener's onResponse method will be called when the send is complete unless an exception
|
||||
* is thrown during the send. If an exception is thrown, the listener's onException method will be called.
|
||||
* @param channel the destination channel
|
||||
* @param reference the byte reference for the message
|
||||
* @param listener the listener to call when the operation has completed
|
||||
*/
|
||||
protected abstract void sendMessage(Channel channel, BytesReference reference, ActionListener<Channel> listener);
|
||||
|
||||
protected abstract NodeChannels connectToChannels(DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException;
|
||||
|
||||
|
@ -997,8 +1016,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
return compress && (!(request instanceof BytesTransportRequest));
|
||||
}
|
||||
|
||||
private void sendRequestToChannel(DiscoveryNode node, final Channel targetChannel, final long requestId, final String action,
|
||||
final TransportRequest request, TransportRequestOptions options, Version channelVersion,
|
||||
private void sendRequestToChannel(final DiscoveryNode node, final Channel targetChannel, final long requestId, final String action,
|
||||
final TransportRequest request, TransportRequestOptions options, Version channelVersion,
|
||||
byte status) throws IOException,
|
||||
TransportException {
|
||||
if (compress) {
|
||||
|
@ -1009,7 +1028,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
// we wrap this in a release once since if the onRequestSent callback throws an exception
|
||||
// we might release things twice and this should be prevented
|
||||
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
|
||||
boolean addedReleaseListener = false;
|
||||
StreamOutput stream = bStream;
|
||||
try {
|
||||
// only compress if asked, and, the request is not bytes, since then only
|
||||
|
@ -1029,43 +1047,31 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
stream.writeString(action);
|
||||
BytesReference message = buildMessage(requestId, status, node.getVersion(), request, stream, bStream);
|
||||
final TransportRequestOptions finalOptions = options;
|
||||
Runnable onRequestSent = () -> { // this might be called in a different thread
|
||||
try {
|
||||
toRelease.close();
|
||||
} finally {
|
||||
transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions);
|
||||
}
|
||||
};
|
||||
addedReleaseListener = internalSendMessage(targetChannel, message, onRequestSent);
|
||||
// this might be called in a different thread
|
||||
SendListener onRequestSent = new SendListener(toRelease,
|
||||
() -> transportServiceAdapter.onRequestSent(node, requestId, action, request, finalOptions));
|
||||
internalSendMessage(targetChannel, message, onRequestSent);
|
||||
} finally {
|
||||
IOUtils.close(stream);
|
||||
if (!addedReleaseListener) {
|
||||
toRelease.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* sends a message view the given channel, using the given callbacks.
|
||||
*
|
||||
* @return true if the message was successfully sent or false when an error occurred and the error hanlding logic was activated
|
||||
*
|
||||
* sends a message to the given channel, using the given callbacks.
|
||||
*/
|
||||
private boolean internalSendMessage(Channel targetChannel, BytesReference message, Runnable onRequestSent) throws IOException {
|
||||
boolean success;
|
||||
private void internalSendMessage(Channel targetChannel, BytesReference message, NotifyOnceListener<Channel> listener) {
|
||||
try {
|
||||
sendMessage(targetChannel, message, onRequestSent);
|
||||
success = true;
|
||||
} catch (IOException ex) {
|
||||
// passing exception handling to deal with this and raise disconnect events and decide the right logging level
|
||||
sendMessage(targetChannel, message, listener);
|
||||
} catch (Exception ex) {
|
||||
// call listener to ensure that any resources are released
|
||||
listener.onFailure(ex);
|
||||
onException(targetChannel, ex);
|
||||
success = false;
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends back an error response to the caller via the given channel
|
||||
*
|
||||
* @param nodeVersion the caller node version
|
||||
* @param channel the channel to send the response to
|
||||
* @param error the error to return
|
||||
|
@ -1085,8 +1091,9 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
status = TransportStatus.setError(status);
|
||||
final BytesReference bytes = stream.bytes();
|
||||
final BytesReference header = buildHeader(requestId, status, nodeVersion, bytes.length());
|
||||
Runnable onRequestSent = () -> transportServiceAdapter.onResponseSent(requestId, action, error);
|
||||
sendMessage(channel, new CompositeBytesReference(header, bytes), onRequestSent);
|
||||
SendListener onResponseSent = new SendListener(null,
|
||||
() -> transportServiceAdapter.onResponseSent(requestId, action, error));
|
||||
internalSendMessage(channel, new CompositeBytesReference(header, bytes), onResponseSent);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1097,7 +1104,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
*/
|
||||
public void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId,
|
||||
final String action, TransportResponseOptions options) throws IOException {
|
||||
sendResponse(nodeVersion, channel, response, requestId, action, options, (byte)0);
|
||||
sendResponse(nodeVersion, channel, response, requestId, action, options, (byte) 0);
|
||||
}
|
||||
|
||||
private void sendResponse(Version nodeVersion, Channel channel, final TransportResponse response, final long requestId,
|
||||
|
@ -1110,7 +1117,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
// we wrap this in a release once since if the onRequestSent callback throws an exception
|
||||
// we might release things twice and this should be prevented
|
||||
final Releasable toRelease = Releasables.releaseOnce(() -> Releasables.close(bStream.bytes()));
|
||||
boolean addedReleaseListener = false;
|
||||
StreamOutput stream = bStream;
|
||||
try {
|
||||
if (options.compress()) {
|
||||
|
@ -1122,24 +1128,12 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
BytesReference reference = buildMessage(requestId, status, nodeVersion, response, stream, bStream);
|
||||
|
||||
final TransportResponseOptions finalOptions = options;
|
||||
Runnable onRequestSent = () -> { // this might be called in a different thread
|
||||
try {
|
||||
toRelease.close();
|
||||
} finally {
|
||||
transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions);
|
||||
}
|
||||
};
|
||||
addedReleaseListener = internalSendMessage(channel, reference, onRequestSent);
|
||||
// this might be called in a different thread
|
||||
SendListener listener = new SendListener(toRelease,
|
||||
() -> transportServiceAdapter.onResponseSent(requestId, action, response, finalOptions));
|
||||
internalSendMessage(channel, reference, listener);
|
||||
} finally {
|
||||
try {
|
||||
IOUtils.close(stream);
|
||||
} finally {
|
||||
if (!addedReleaseListener) {
|
||||
|
||||
toRelease.close();
|
||||
}
|
||||
}
|
||||
|
||||
IOUtils.close(stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1242,7 +1236,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
// safety against too large frames being sent
|
||||
if (dataLen > NINETY_PER_HEAP_SIZE) {
|
||||
throw new IllegalArgumentException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded ["
|
||||
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
}
|
||||
|
||||
if (buffer.length() < dataLen + sizeHeaderLength) {
|
||||
|
@ -1254,7 +1248,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
private static boolean bufferStartsWith(BytesReference buffer, int offset, String method) {
|
||||
char[] chars = method.toCharArray();
|
||||
for (int i = 0; i < chars.length; i++) {
|
||||
if (buffer.get(offset+ i) != chars[i]) {
|
||||
if (buffer.get(offset + i) != chars[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -1277,7 +1271,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
return RestStatus.BAD_REQUEST;
|
||||
}
|
||||
|
||||
public HttpOnTransportException(StreamInput in) throws IOException{
|
||||
public HttpOnTransportException(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
}
|
||||
|
@ -1383,7 +1377,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
handler.handleResponse(response);
|
||||
}});
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
|
@ -1423,7 +1418,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
if (TransportStatus.isHandshake(status)) {
|
||||
final VersionHandshakeResponse response = new VersionHandshakeResponse(getCurrentVersion());
|
||||
sendResponse(version, channel, response, requestId, HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY,
|
||||
TransportStatus.setHandshake((byte)0));
|
||||
TransportStatus.setHandshake((byte) 0));
|
||||
} else {
|
||||
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
|
||||
if (reg == null) {
|
||||
|
@ -1552,7 +1547,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
// to as the payload.
|
||||
final Version minCompatVersion = getCurrentVersion().minimumCompatibilityVersion();
|
||||
sendRequestToChannel(node, channel, requestId, HANDSHAKE_ACTION_NAME, TransportRequest.Empty.INSTANCE,
|
||||
TransportRequestOptions.EMPTY, minCompatVersion, TransportStatus.setHandshake((byte)0));
|
||||
TransportRequestOptions.EMPTY, minCompatVersion, TransportStatus.setHandshake((byte) 0));
|
||||
if (handler.latch.await(timeout.millis(), TimeUnit.MILLISECONDS) == false) {
|
||||
throw new ConnectTransportException(node, "handshake_timeout[" + timeout + "]");
|
||||
}
|
||||
|
@ -1594,7 +1589,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
protected final void onChannelClosed(Channel channel) {
|
||||
final Optional<Long> first = pendingHandshakes.entrySet().stream()
|
||||
.filter((entry) -> entry.getValue().channel == channel).map((e) -> e.getKey()).findFirst();
|
||||
if(first.isPresent()) {
|
||||
if (first.isPresent()) {
|
||||
final Long requestId = first.get();
|
||||
final HandshakeResponseHandler handler = pendingHandshakes.remove(requestId);
|
||||
if (handler != null) {
|
||||
|
@ -1607,6 +1602,7 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
|
||||
/**
|
||||
* Ensures this transport is still started / open
|
||||
*
|
||||
* @throws IllegalStateException if the transport is not started / open
|
||||
*/
|
||||
protected final void ensureOpen() {
|
||||
|
@ -1614,4 +1610,28 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
|
|||
throw new IllegalStateException("transport has been stopped");
|
||||
}
|
||||
}
|
||||
|
||||
private final class SendListener extends NotifyOnceListener<Channel> {
|
||||
private final Releasable optionalReleasable;
|
||||
private final Runnable transportAdaptorCallback;
|
||||
|
||||
private SendListener(Releasable optionalReleasable, Runnable transportAdaptorCallback) {
|
||||
this.optionalReleasable = optionalReleasable;
|
||||
this.transportAdaptorCallback = transportAdaptorCallback;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void innerOnResponse(Channel channel) {
|
||||
release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void innerOnFailure(Exception e) {
|
||||
release();
|
||||
}
|
||||
|
||||
private void release() {
|
||||
Releasables.close(optionalReleasable, transportAdaptorCallback::run);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action;
|
||||
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class NotifyOnceListenerTests extends ESTestCase {
|
||||
|
||||
public void testWhenSuccessCannotNotifyMultipleTimes() {
|
||||
AtomicReference<String> response = new AtomicReference<>();
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
||||
NotifyOnceListener<String> listener = new NotifyOnceListener<String>() {
|
||||
@Override
|
||||
public void innerOnResponse(String s) {
|
||||
response.set(s);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void innerOnFailure(Exception e) {
|
||||
exception.set(e);
|
||||
}
|
||||
};
|
||||
|
||||
listener.onResponse("response");
|
||||
listener.onResponse("wrong-response");
|
||||
listener.onFailure(new RuntimeException());
|
||||
|
||||
assertNull(exception.get());
|
||||
assertEquals("response", response.get());
|
||||
}
|
||||
|
||||
public void testWhenErrorCannotNotifyMultipleTimes() {
|
||||
AtomicReference<String> response = new AtomicReference<>();
|
||||
AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
|
||||
NotifyOnceListener<String> listener = new NotifyOnceListener<String>() {
|
||||
@Override
|
||||
public void innerOnResponse(String s) {
|
||||
response.set(s);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void innerOnFailure(Exception e) {
|
||||
exception.set(e);
|
||||
}
|
||||
};
|
||||
|
||||
RuntimeException expected = new RuntimeException();
|
||||
listener.onFailure(expected);
|
||||
listener.onFailure(new IllegalArgumentException());
|
||||
listener.onResponse("response");
|
||||
|
||||
assertNull(response.get());
|
||||
assertSame(expected, exception.get());
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
|
@ -38,6 +39,8 @@ import java.net.InetSocketAddress;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
|
@ -152,6 +155,7 @@ public class TCPTransportTests extends ESTestCase {
|
|||
final AtomicBoolean called = new AtomicBoolean(false);
|
||||
Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100));
|
||||
ThreadPool threadPool = new TestThreadPool(TCPTransportTests.class.getName());
|
||||
AtomicReference<IOException> exceptionReference = new AtomicReference<>();
|
||||
try {
|
||||
TcpTransport transport = new TcpTransport("test", Settings.builder().put("transport.tcp.compress", compressed).build(),
|
||||
threadPool, new BigArrays(Settings.EMPTY, null), null, null, null) {
|
||||
|
@ -171,27 +175,31 @@ public class TCPTransportTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void sendMessage(Object o, BytesReference reference, Runnable sendListener) throws IOException {
|
||||
StreamInput streamIn = reference.streamInput();
|
||||
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
|
||||
int len = streamIn.readInt();
|
||||
long requestId = streamIn.readLong();
|
||||
assertEquals(42, requestId);
|
||||
byte status = streamIn.readByte();
|
||||
Version version = Version.fromId(streamIn.readInt());
|
||||
assertEquals(Version.CURRENT, version);
|
||||
assertEquals(compressed, TransportStatus.isCompress(status));
|
||||
called.compareAndSet(false, true);
|
||||
if (compressed) {
|
||||
final int bytesConsumed = TcpHeader.HEADER_SIZE;
|
||||
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
|
||||
.streamInput(streamIn);
|
||||
protected void sendMessage(Object o, BytesReference reference, ActionListener listener) {
|
||||
try {
|
||||
StreamInput streamIn = reference.streamInput();
|
||||
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
|
||||
int len = streamIn.readInt();
|
||||
long requestId = streamIn.readLong();
|
||||
assertEquals(42, requestId);
|
||||
byte status = streamIn.readByte();
|
||||
Version version = Version.fromId(streamIn.readInt());
|
||||
assertEquals(Version.CURRENT, version);
|
||||
assertEquals(compressed, TransportStatus.isCompress(status));
|
||||
called.compareAndSet(false, true);
|
||||
if (compressed) {
|
||||
final int bytesConsumed = TcpHeader.HEADER_SIZE;
|
||||
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
|
||||
.streamInput(streamIn);
|
||||
}
|
||||
threadPool.getThreadContext().readHeaders(streamIn);
|
||||
assertEquals("foobar", streamIn.readString());
|
||||
Req readReq = new Req("");
|
||||
readReq.readFrom(streamIn);
|
||||
assertEquals(request.value, readReq.value);
|
||||
} catch (IOException e) {
|
||||
exceptionReference.set(e);
|
||||
}
|
||||
threadPool.getThreadContext().readHeaders(streamIn);
|
||||
assertEquals("foobar", streamIn.readString());
|
||||
Req readReq = new Req("");
|
||||
readReq.readFrom(streamIn);
|
||||
assertEquals(request.value, readReq.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -219,6 +227,7 @@ public class TCPTransportTests extends ESTestCase {
|
|||
Transport.Connection connection = transport.getConnection(node);
|
||||
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
|
||||
assertTrue(called.get());
|
||||
assertNull("IOException while sending message.", exceptionReference.get());
|
||||
} finally {
|
||||
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
|
|||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -73,8 +74,10 @@ import java.util.Collections;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
|
||||
import static org.elasticsearch.common.settings.Setting.intSetting;
|
||||
|
@ -393,9 +396,20 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void sendMessage(Channel channel, BytesReference reference, Runnable sendListener) {
|
||||
protected void sendMessage(Channel channel, BytesReference reference, ActionListener<Channel> listener) {
|
||||
final ChannelFuture future = channel.writeAndFlush(Netty4Utils.toByteBuf(reference));
|
||||
future.addListener(f -> sendListener.run());
|
||||
future.addListener(f -> {
|
||||
if (f.isSuccess()) {
|
||||
listener.onResponse(channel);
|
||||
} else {
|
||||
Throwable cause = f.cause();
|
||||
// If the Throwable is an Error something has gone very wrong and Netty4MessageChannelHandler is
|
||||
// going to cause that to bubble up and kill the process.
|
||||
if (cause instanceof Exception) {
|
||||
listener.onFailure((Exception) cause);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.transport;
|
|||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
|
@ -79,7 +80,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
|
|||
|
||||
private final Set<MockChannel> openChannels = new HashSet<>();
|
||||
|
||||
static {
|
||||
static {
|
||||
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
|
||||
builder.addConnections(1,
|
||||
TransportRequestOptions.Type.BULK,
|
||||
|
@ -129,11 +130,7 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
|
|||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
try {
|
||||
onException(serverMockChannel, e);
|
||||
} catch (IOException ex) {
|
||||
logger.warn("failed on handling exception", ex);
|
||||
}
|
||||
onException(serverMockChannel, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -242,15 +239,18 @@ public class MockTcpTransport extends TcpTransport<MockTcpTransport.MockChannel>
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void sendMessage(MockChannel mockChannel, BytesReference reference, Runnable sendListener) throws IOException {
|
||||
synchronized (mockChannel) {
|
||||
final Socket socket = mockChannel.activeChannel;
|
||||
OutputStream outputStream = new BufferedOutputStream(socket.getOutputStream());
|
||||
reference.writeTo(outputStream);
|
||||
outputStream.flush();
|
||||
}
|
||||
if (sendListener != null) {
|
||||
sendListener.run();
|
||||
protected void sendMessage(MockChannel mockChannel, BytesReference reference, ActionListener<MockChannel> listener) {
|
||||
try {
|
||||
synchronized (mockChannel) {
|
||||
final Socket socket = mockChannel.activeChannel;
|
||||
OutputStream outputStream = new BufferedOutputStream(socket.getOutputStream());
|
||||
reference.writeTo(outputStream);
|
||||
outputStream.flush();
|
||||
}
|
||||
listener.onResponse(mockChannel);
|
||||
} catch (IOException e) {
|
||||
listener.onFailure(e);
|
||||
onException(mockChannel, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue