Only compress responses if request was compressed (#36867)

This is a follow-up to some discussions around #36399. Currently we have
relatively confusing compression behavior where compression can be
configured for requests based on transport.compress or a specific
setting for a remote cluster. However, we can only compress responses
based on transport.compress as we do not know where a request is
coming from (currently).

This commit modifies the behavior to NEVER compress responses based on
settings. Instead, a response will only be compressed if the request was
compressed. This commit also updates the documentation to more clearly
described transport level compression.
This commit is contained in:
Tim Brooks 2018-12-21 10:14:00 -07:00 committed by GitHub
parent 3f5dd792b3
commit c8a8391dfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 19 deletions

View File

@ -102,6 +102,34 @@ and ensuring that the keepalive interval is shorter than any timeout that might
cause idle connections to be closed, or by setting `transport.ping_schedule` if cause idle connections to be closed, or by setting `transport.ping_schedule` if
keepalives cannot be configured. keepalives cannot be configured.
[float]
==== Transport Compression
[float]
===== Request Compresssion
By default, the `transport.compress` setting is `false` and network-level
request compression is disabled between nodes in the cluster. This default
normally makes sense for local cluster communication as compression has a
noticeable CPU cost and local clusters tend to be set up with fast network
connections between nodes.
The `transport.compress` setting always configures local cluster request
compression and is the fallback setting for remote cluster request compression.
If you want to configure remote request compression differently than local
request compression, you can set it on a per-remote cluster basis using the
<<remote-cluster-settings,`cluster.remote.${cluster_alias}.transport.compress` setting>>.
[float]
===== Response Compression
The compression settings do not configure compression for responses. {es} will
compress a response if the inbound request was compressed--even when compression
is not enabled. Similarly, {es} will not compress a response if the inbound
request was uncompressed--even when compression is enabled.
[float] [float]
=== Transport Tracer === Transport Tracer

View File

@ -141,7 +141,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// this lock is here to make sure we close this transport and disconnect all the client nodes // this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on // connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final boolean compressAllResponses;
private volatile BoundTransportAddress boundAddress; private volatile BoundTransportAddress boundAddress;
private final String transportName; private final String transportName;
@ -166,7 +165,6 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.pageCacheRecycler = pageCacheRecycler; this.pageCacheRecycler = pageCacheRecycler;
this.circuitBreakerService = circuitBreakerService; this.circuitBreakerService = circuitBreakerService;
this.namedWriteableRegistry = namedWriteableRegistry; this.namedWriteableRegistry = namedWriteableRegistry;
this.compressAllResponses = TransportSettings.TRANSPORT_COMPRESS.get(settings);
this.networkService = networkService; this.networkService = networkService;
this.transportName = transportName; this.transportName = transportName;
this.transportLogger = new TransportLogger(); this.transportLogger = new TransportLogger();
@ -826,14 +824,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final String action, final String action,
boolean compress, boolean compress,
byte status) throws IOException { byte status) throws IOException {
boolean compressMessage = compress || compressAllResponses;
status = TransportStatus.setResponse(status); status = TransportStatus.setResponse(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage); CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compress);
boolean addedReleaseListener = false; boolean addedReleaseListener = false;
try { try {
if (compressMessage) { if (compress) {
status = TransportStatus.setCompress(status); status = TransportStatus.setCompress(status);
} }
threadPool.getThreadContext().writeTo(stream); threadPool.getThreadContext().writeTo(stream);

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -34,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
@ -41,13 +43,14 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.io.StreamCorruptedException; import java.io.StreamCorruptedException;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Mockito.mock;
/** Unit tests for {@link TcpTransport} */ /** Unit tests for {@link TcpTransport} */
public class TcpTransportTests extends ESTestCase { public class TcpTransportTests extends ESTestCase {
@ -184,11 +187,12 @@ public class TcpTransportTests extends ESTestCase {
+ version.minimumCompatibilityVersion() + "]", ise.getMessage()); + version.minimumCompatibilityVersion() + "]", ise.getMessage());
} }
public void testCompressRequest() throws IOException { @SuppressForbidden(reason = "Allow accessing localhost")
public void testCompressRequestAndResponse() throws IOException {
final boolean compressed = randomBoolean(); final boolean compressed = randomBoolean();
Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100)); Req request = new Req(randomRealisticUnicodeOfLengthBetween(10, 100));
ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName()); ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName());
AtomicReference<BytesReference> messageCaptor = new AtomicReference<>(); AtomicReference<BytesReference> requestCaptor = new AtomicReference<>();
try { try {
TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool, TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool,
PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) { PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService(), null, null) {
@ -200,7 +204,7 @@ public class TcpTransportTests extends ESTestCase {
@Override @Override
protected FakeTcpChannel initiateChannel(DiscoveryNode node) throws IOException { protected FakeTcpChannel initiateChannel(DiscoveryNode node) throws IOException {
return new FakeTcpChannel(true, messageCaptor); return new FakeTcpChannel(false, requestCaptor);
} }
@Override @Override
@ -215,7 +219,7 @@ public class TcpTransportTests extends ESTestCase {
int numConnections = profile.getNumConnections(); int numConnections = profile.getNumConnections();
ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections); ArrayList<TcpChannel> fakeChannels = new ArrayList<>(numConnections);
for (int i = 0; i < numConnections; ++i) { for (int i = 0; i < numConnections; ++i) {
fakeChannels.add(new FakeTcpChannel(false, messageCaptor)); fakeChannels.add(new FakeTcpChannel(false, requestCaptor));
} }
listener.onResponse(new NodeChannels(node, fakeChannels, profile, Version.CURRENT)); listener.onResponse(new NodeChannels(node, fakeChannels, profile, Version.CURRENT));
return () -> CloseableChannel.closeChannels(fakeChannels, false); return () -> CloseableChannel.closeChannels(fakeChannels, false);
@ -233,11 +237,20 @@ public class TcpTransportTests extends ESTestCase {
transport.openConnection(node, profileBuilder.build(), future); transport.openConnection(node, profileBuilder.build(), future);
Transport.Connection connection = future.actionGet(); Transport.Connection connection = future.actionGet();
connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY); connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY);
transport.registerRequestHandler(new RequestHandlerRegistry<>("foobar", Req::new, mock(TaskManager.class),
(request1, channel, task) -> channel.sendResponse(TransportResponse.Empty.INSTANCE), ThreadPool.Names.SAME,
true, true));
BytesReference reference = messageCaptor.get(); BytesReference reference = requestCaptor.get();
assertNotNull(reference); assertNotNull(reference);
StreamInput streamIn = reference.streamInput(); AtomicReference<BytesReference> responseCaptor = new AtomicReference<>();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
FakeTcpChannel responseChannel = new FakeTcpChannel(true, address, address, responseCaptor);
transport.messageReceived(reference.slice(6, reference.length() - 6), responseChannel);
StreamInput streamIn = responseCaptor.get().streamInput();
streamIn.skip(TcpHeader.MARKER_BYTES_SIZE); streamIn.skip(TcpHeader.MARKER_BYTES_SIZE);
@SuppressWarnings("unused") @SuppressWarnings("unused")
int len = streamIn.readInt(); int len = streamIn.readInt();
@ -247,17 +260,14 @@ public class TcpTransportTests extends ESTestCase {
Version version = Version.fromId(streamIn.readInt()); Version version = Version.fromId(streamIn.readInt());
assertEquals(Version.CURRENT, version); assertEquals(Version.CURRENT, version);
assertEquals(compressed, TransportStatus.isCompress(status)); assertEquals(compressed, TransportStatus.isCompress(status));
assertFalse(TransportStatus.isRequest(status));
if (compressed) { if (compressed) {
final int bytesConsumed = TcpHeader.HEADER_SIZE; final int bytesConsumed = TcpHeader.HEADER_SIZE;
streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed)) streamIn = CompressorFactory.compressor(reference.slice(bytesConsumed, reference.length() - bytesConsumed))
.streamInput(streamIn); .streamInput(streamIn);
} }
threadPool.getThreadContext().readHeaders(streamIn); threadPool.getThreadContext().readHeaders(streamIn);
assertThat(streamIn.readStringArray(), equalTo(new String[0])); // features TransportResponse.Empty.INSTANCE.readFrom(streamIn);
assertEquals("foobar", streamIn.readString());
Req readReq = new Req("");
readReq.readFrom(streamIn);
assertEquals(request.value, readReq.value);
} finally { } finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
@ -297,6 +307,10 @@ public class TcpTransportTests extends ESTestCase {
this.value = value; this.value = value;
} }
private Req(StreamInput in) throws IOException {
value = in.readString();
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
value = in.readString(); value = in.readString();

View File

@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicReference;
public class FakeTcpChannel implements TcpChannel { public class FakeTcpChannel implements TcpChannel {
private final boolean isServer; private final boolean isServer;
private final InetSocketAddress localAddress;
private final InetSocketAddress remoteAddress;
private final String profile; private final String profile;
private final AtomicReference<BytesReference> messageCaptor; private final AtomicReference<BytesReference> messageCaptor;
private final ChannelStats stats = new ChannelStats(); private final ChannelStats stats = new ChannelStats();
@ -45,9 +47,21 @@ public class FakeTcpChannel implements TcpChannel {
this(isServer, "profile", messageCaptor); this(isServer, "profile", messageCaptor);
} }
public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress,
AtomicReference<BytesReference> messageCaptor) {
this(isServer, localAddress, remoteAddress,"profile", messageCaptor);
}
public FakeTcpChannel(boolean isServer, String profile, AtomicReference<BytesReference> messageCaptor) { public FakeTcpChannel(boolean isServer, String profile, AtomicReference<BytesReference> messageCaptor) {
this(isServer, null, null, profile, messageCaptor);
}
public FakeTcpChannel(boolean isServer, InetSocketAddress localAddress, InetSocketAddress remoteAddress, String profile,
AtomicReference<BytesReference> messageCaptor) {
this.isServer = isServer; this.isServer = isServer;
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
this.profile = profile; this.profile = profile;
this.messageCaptor = messageCaptor; this.messageCaptor = messageCaptor;
} }
@ -64,12 +78,12 @@ public class FakeTcpChannel implements TcpChannel {
@Override @Override
public InetSocketAddress getLocalAddress() { public InetSocketAddress getLocalAddress() {
return null; return localAddress;
} }
@Override @Override
public InetSocketAddress getRemoteAddress() { public InetSocketAddress getRemoteAddress() {
return null; return remoteAddress;
} }
@Override @Override