Serializing outbound transport message on the IO loop was introduced in https://github.com/elastic/elasticsearch/pull/56961. Unfortunately it turns out that this is incompatible with assumptions made by CCR code here: f22ddf822e/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java (L60-L61)
and that are not easy to work around on short notice.
Raising reverting this move (as a temporary solution, it's still a valuable change long-term) as a blocker therefore as this seriously affects the stability of the initial phase of the CCR following by causing corrupted bytes to be send to the follower.
This commit is contained in:
parent
9e4105ec37
commit
51e9d6f227
|
@ -33,11 +33,9 @@ import org.elasticsearch.common.lease.Releasables;
|
|||
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.InboundPipeline;
|
||||
import org.elasticsearch.transport.OutboundHandler;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.elasticsearch.transport.Transports;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
|
@ -93,15 +91,15 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
|
|||
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||
assert msg instanceof OutboundHandler.SendContext;
|
||||
assert msg instanceof ByteBuf;
|
||||
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
|
||||
final boolean queued = queuedWrites.offer(new WriteOperation((OutboundHandler.SendContext) msg, promise));
|
||||
final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise));
|
||||
assert queued;
|
||||
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOException {
|
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
|
||||
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
|
||||
if (ctx.channel().isWritable()) {
|
||||
doFlush(ctx);
|
||||
|
@ -110,7 +108,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx) throws IOException {
|
||||
public void flush(ChannelHandlerContext ctx) {
|
||||
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
|
||||
Channel channel = ctx.channel();
|
||||
if (channel.isWritable() || channel.isActive() == false) {
|
||||
|
@ -126,7 +124,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
|
|||
super.channelInactive(ctx);
|
||||
}
|
||||
|
||||
private void doFlush(ChannelHandlerContext ctx) throws IOException {
|
||||
private void doFlush(ChannelHandlerContext ctx) {
|
||||
assert ctx.executor().inEventLoop();
|
||||
final Channel channel = ctx.channel();
|
||||
if (channel.isActive() == false) {
|
||||
|
@ -144,25 +142,24 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
|
|||
break;
|
||||
}
|
||||
final WriteOperation write = currentWrite;
|
||||
final ByteBuf currentBuffer = write.buffer();
|
||||
if (currentBuffer.readableBytes() == 0) {
|
||||
if (write.buf.readableBytes() == 0) {
|
||||
write.promise.trySuccess();
|
||||
currentWrite = null;
|
||||
continue;
|
||||
}
|
||||
final int readableBytes = currentBuffer.readableBytes();
|
||||
final int readableBytes = write.buf.readableBytes();
|
||||
final int bufferSize = Math.min(readableBytes, 1 << 18);
|
||||
final int readerIndex = currentBuffer.readerIndex();
|
||||
final int readerIndex = write.buf.readerIndex();
|
||||
final boolean sliced = readableBytes != bufferSize;
|
||||
final ByteBuf writeBuffer;
|
||||
if (sliced) {
|
||||
writeBuffer = currentBuffer.retainedSlice(readerIndex, bufferSize);
|
||||
currentBuffer.readerIndex(readerIndex + bufferSize);
|
||||
writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize);
|
||||
write.buf.readerIndex(readerIndex + bufferSize);
|
||||
} else {
|
||||
writeBuffer = currentBuffer;
|
||||
writeBuffer = write.buf;
|
||||
}
|
||||
final ChannelFuture writeFuture = ctx.write(writeBuffer);
|
||||
if (sliced == false || currentBuffer.readableBytes() == 0) {
|
||||
if (sliced == false || write.buf.readableBytes() == 0) {
|
||||
currentWrite = null;
|
||||
writeFuture.addListener(future -> {
|
||||
assert ctx.executor().inEventLoop();
|
||||
|
@ -197,24 +194,13 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
|
|||
|
||||
private static final class WriteOperation {
|
||||
|
||||
private ByteBuf buf;
|
||||
|
||||
private OutboundHandler.SendContext context;
|
||||
private final ByteBuf buf;
|
||||
|
||||
private final ChannelPromise promise;
|
||||
|
||||
WriteOperation(OutboundHandler.SendContext context, ChannelPromise promise) {
|
||||
this.context = context;
|
||||
WriteOperation(ByteBuf buf, ChannelPromise promise) {
|
||||
this.buf = buf;
|
||||
this.promise = promise;
|
||||
}
|
||||
|
||||
ByteBuf buffer() throws IOException {
|
||||
if (buf == null) {
|
||||
buf = Netty4Utils.toByteBuf(context.get());
|
||||
context = null;
|
||||
}
|
||||
assert context == null;
|
||||
return buf;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,8 +25,8 @@ import io.netty.channel.ChannelPromise;
|
|||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.concurrent.CompletableContext;
|
||||
import org.elasticsearch.transport.OutboundHandler;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
import org.elasticsearch.transport.TransportException;
|
||||
|
||||
|
@ -142,11 +142,11 @@ public class Netty4TcpChannel implements TcpChannel {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendMessage(OutboundHandler.SendContext sendContext) {
|
||||
channel.writeAndFlush(sendContext, addPromise(sendContext, channel));
|
||||
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
|
||||
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel));
|
||||
|
||||
if (channel.eventLoop().isShutdown()) {
|
||||
sendContext.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
|
||||
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,10 +22,8 @@ package org.elasticsearch.transport.nio;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.nio.NioSocketChannel;
|
||||
import org.elasticsearch.transport.OutboundHandler;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.SocketChannel;
|
||||
|
||||
public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
|
||||
|
@ -40,16 +38,8 @@ public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
|
|||
this.profile = profile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendMessage(OutboundHandler.SendContext sendContext) {
|
||||
final BytesReference message;
|
||||
try {
|
||||
message = sendContext.get();
|
||||
} catch (IOException e) {
|
||||
sendContext.onFailure(e);
|
||||
return;
|
||||
}
|
||||
getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext));
|
||||
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
|
||||
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
|
||||
public final class OutboundHandler {
|
||||
final class OutboundHandler {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(OutboundHandler.class);
|
||||
|
||||
|
@ -66,7 +66,12 @@ public final class OutboundHandler {
|
|||
|
||||
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
|
||||
SendContext sendContext = new SendContext(channel, () -> bytes, listener);
|
||||
try {
|
||||
internalSend(channel, sendContext);
|
||||
} catch (IOException e) {
|
||||
// This should not happen as the bytes are already serialized
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -120,17 +125,17 @@ public final class OutboundHandler {
|
|||
internalSend(channel, sendContext);
|
||||
}
|
||||
|
||||
private void internalSend(TcpChannel channel, SendContext sendContext) {
|
||||
private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
|
||||
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
|
||||
BytesReference reference = sendContext.get();
|
||||
// stash thread context so that channel event loop is not polluted by thread context
|
||||
try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) {
|
||||
channel.sendMessage(sendContext);
|
||||
channel.sendMessage(reference, sendContext);
|
||||
} catch (RuntimeException ex) {
|
||||
sendContext.onFailure(ex);
|
||||
CloseableChannel.closeChannel(channel);
|
||||
throw ex;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void setMessageListener(TransportMessageListener listener) {
|
||||
|
@ -143,7 +148,7 @@ public final class OutboundHandler {
|
|||
|
||||
private static class MessageSerializer implements CheckedSupplier<BytesReference, IOException>, Releasable {
|
||||
|
||||
private OutboundMessage message;
|
||||
private final OutboundMessage message;
|
||||
private final BigArrays bigArrays;
|
||||
private volatile ReleasableBytesStreamOutput bytesStreamOutput;
|
||||
|
||||
|
@ -154,12 +159,8 @@ public final class OutboundHandler {
|
|||
|
||||
@Override
|
||||
public BytesReference get() throws IOException {
|
||||
try {
|
||||
bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
|
||||
return message.serialize(bytesStreamOutput);
|
||||
} finally {
|
||||
message = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -168,10 +169,10 @@ public final class OutboundHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
|
||||
private class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
|
||||
|
||||
private final TcpChannel channel;
|
||||
private CheckedSupplier<BytesReference, IOException> messageSupplier;
|
||||
private final CheckedSupplier<BytesReference, IOException> messageSupplier;
|
||||
private final ActionListener<Void> listener;
|
||||
private final Releasable optionalReleasable;
|
||||
private long messageSize = -1;
|
||||
|
@ -189,13 +190,10 @@ public final class OutboundHandler {
|
|||
this.optionalReleasable = optionalReleasable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesReference get() throws IOException {
|
||||
BytesReference message;
|
||||
try {
|
||||
assert messageSupplier != null;
|
||||
message = messageSupplier.get();
|
||||
messageSupplier = null;
|
||||
messageSize = message.length();
|
||||
TransportLogger.logOutboundMessage(channel, message);
|
||||
return message;
|
||||
|
@ -214,7 +212,6 @@ public final class OutboundHandler {
|
|||
|
||||
@Override
|
||||
protected void innerOnFailure(Exception e) {
|
||||
messageSupplier = null;
|
||||
if (NetworkExceptionHelper.isCloseConnectionException(e)) {
|
||||
logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e);
|
||||
} else {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.network.CloseableChannel;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
|
@ -58,11 +59,13 @@ public interface TcpChannel extends CloseableChannel {
|
|||
InetSocketAddress getRemoteAddress();
|
||||
|
||||
/**
|
||||
* Sends a tcp message to the channel.
|
||||
* Sends a tcp message to the channel. The listener will be executed once the send process has been
|
||||
* completed.
|
||||
*
|
||||
* @param sendContext Send Context
|
||||
* @param reference to send to channel
|
||||
* @param listener to execute upon send completion
|
||||
*/
|
||||
void sendMessage(OutboundHandler.SendContext sendContext);
|
||||
void sendMessage(BytesReference reference, ActionListener<Void> listener);
|
||||
|
||||
/**
|
||||
* Adds a listener that will be executed when the channel is connected. If the channel is still
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.elasticsearch.action.ActionListener;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.concurrent.CompletableContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
|
@ -89,14 +88,9 @@ public class FakeTcpChannel implements TcpChannel {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendMessage(OutboundHandler.SendContext sendContext) {
|
||||
try {
|
||||
messageCaptor.set(sendContext.get());
|
||||
} catch (IOException e) {
|
||||
sendContext.onFailure(e);
|
||||
return;
|
||||
}
|
||||
listenerCaptor.set(sendContext);
|
||||
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
|
||||
messageCaptor.set(reference);
|
||||
listenerCaptor.set(listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -55,7 +55,6 @@ import org.elasticsearch.nio.ServerChannelContext;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectionProfile;
|
||||
import org.elasticsearch.transport.InboundPipeline;
|
||||
import org.elasticsearch.transport.OutboundHandler;
|
||||
import org.elasticsearch.transport.StatsTracker;
|
||||
import org.elasticsearch.transport.TcpChannel;
|
||||
import org.elasticsearch.transport.TcpServerChannel;
|
||||
|
@ -366,15 +365,8 @@ public class MockNioTransport extends TcpTransport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendMessage(OutboundHandler.SendContext sendContext) {
|
||||
final BytesReference message;
|
||||
try {
|
||||
message = sendContext.get();
|
||||
} catch (IOException e) {
|
||||
sendContext.onFailure(e);
|
||||
return;
|
||||
}
|
||||
getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext));
|
||||
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
|
||||
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue