From 9b9e17abf7ab34707e8a9264cbddc3afd196b7a3 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 28 Jun 2016 17:51:33 +0200 Subject: [PATCH] Cleanup Compressor interface (#19125) Today we have several deprecated methods, leaking netty interfaces, support for multiple compressors on the compressor interface. The netty interface can simply be replaced by BytesReference which we already have an implementation for, all the others are not used and are removed in this commit. --- .../common/compress/CompressedIndexInput.java | 215 ------------------ .../compress/CompressedStreamInput.java | 174 -------------- .../common/compress/CompressedXContent.java | 4 +- .../common/compress/Compressor.java | 14 -- .../common/compress/CompressorFactory.java | 58 +---- .../{deflate => }/DeflateCompressor.java | 42 +--- .../publish/PublishClusterStateAction.java | 4 +- .../blobstore/ChecksumBlobStoreFormat.java | 2 +- .../netty/MessageChannelHandler.java | 3 +- .../transport/netty/NettyTransport.java | 2 +- .../netty/NettyTransportChannel.java | 2 +- ...estCase.java => DeflateCompressTests.java} | 8 +- ...va => DeflateCompressedXContentTests.java} | 52 ++--- .../deflate/DeflateCompressedStreamTests.java | 30 --- .../deflate/DeflateXContentTests.java | 30 --- .../mapper/binary/BinaryMappingTests.java | 2 +- .../snapshots/BlobStoreFormatIT.java | 2 +- 17 files changed, 40 insertions(+), 604 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/common/compress/CompressedIndexInput.java delete mode 100644 core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java rename core/src/main/java/org/elasticsearch/common/compress/{deflate => }/DeflateCompressor.java (80%) rename core/src/test/java/org/elasticsearch/common/compress/{AbstractCompressedStreamTestCase.java => DeflateCompressTests.java} (98%) rename core/src/test/java/org/elasticsearch/common/compress/{AbstractCompressedXContentTestCase.java => DeflateCompressedXContentTests.java} (63%) delete mode 100644 core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateCompressedStreamTests.java delete mode 100644 core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateXContentTests.java diff --git a/core/src/main/java/org/elasticsearch/common/compress/CompressedIndexInput.java b/core/src/main/java/org/elasticsearch/common/compress/CompressedIndexInput.java deleted file mode 100644 index 599eaeae337..00000000000 --- a/core/src/main/java/org/elasticsearch/common/compress/CompressedIndexInput.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.compress; - -import org.apache.lucene.store.IndexInput; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.LongArray; - -import java.io.EOFException; -import java.io.IOException; - -/** - * @deprecated Used only for backward comp. to read old compressed files, since we now use codec based compression - */ -@Deprecated -public abstract class CompressedIndexInput extends IndexInput { - - private IndexInput in; - - private int version; - private long totalUncompressedLength; - private LongArray offsets; - - private boolean closed; - - protected byte[] uncompressed; - protected int uncompressedLength; - private int position = 0; - private int valid = 0; - private int currentOffsetIdx; - private long currentUncompressedChunkPointer; - - public CompressedIndexInput(IndexInput in) throws IOException { - super("compressed(" + in.toString() + ")"); - this.in = in; - readHeader(in); - this.version = in.readInt(); - long metaDataPosition = in.readLong(); - long headerLength = in.getFilePointer(); - in.seek(metaDataPosition); - this.totalUncompressedLength = in.readVLong(); - int size = in.readVInt(); - offsets = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(size); - for (int i = 0; i < size; i++) { - offsets.set(i, in.readVLong()); - } - this.currentOffsetIdx = -1; - this.currentUncompressedChunkPointer = 0; - in.seek(headerLength); - } - - /** - * Method is overridden to report number of bytes that can now be read - * from decoded data buffer, without reading bytes from the underlying - * stream. - * Never throws an exception; returns number of bytes available without - * further reads from underlying source; -1 if stream has been closed, or - * 0 if an actual read (and possible blocking) is needed to find out. - */ - public int available() throws IOException { - // if closed, return -1; - if (closed) { - return -1; - } - int left = (valid - position); - return (left <= 0) ? 0 : left; - } - - @Override - public byte readByte() throws IOException { - if (!readyBuffer()) { - throw new EOFException(); - } - return uncompressed[position++]; - } - - public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException { - if (length < 1) { - return 0; - } - if (!readyBuffer()) { - return -1; - } - // First let's read however much data we happen to have... - int chunkLength = Math.min(valid - position, length); - System.arraycopy(uncompressed, position, buffer, offset, chunkLength); - position += chunkLength; - - if (chunkLength == length || !fullRead) { - return chunkLength; - } - // Need more data, then - int totalRead = chunkLength; - do { - offset += chunkLength; - if (!readyBuffer()) { - break; - } - chunkLength = Math.min(valid - position, (length - totalRead)); - System.arraycopy(uncompressed, position, buffer, offset, chunkLength); - position += chunkLength; - totalRead += chunkLength; - } while (totalRead < length); - - return totalRead; - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - int result = read(b, offset, len, true /* we want to have full reads, that's the contract... */); - if (result < len) { - throw new EOFException(); - } - } - - @Override - public long getFilePointer() { - return currentUncompressedChunkPointer + position; - } - - @Override - public void seek(long pos) throws IOException { - int idx = (int) (pos / uncompressedLength); - if (idx >= offsets.size()) { - // set the next "readyBuffer" to EOF - currentOffsetIdx = idx; - position = 0; - valid = 0; - return; - } - - // TODO: optimize so we won't have to readyBuffer on seek, can keep the position around, and set it on readyBuffer in this case - if (idx != currentOffsetIdx) { - long pointer = offsets.get(idx); - in.seek(pointer); - position = 0; - valid = 0; - currentOffsetIdx = idx - 1; // we are going to increase it in readyBuffer... - readyBuffer(); - } - position = (int) (pos % uncompressedLength); - } - - @Override - public long length() { - return totalUncompressedLength; - } - - @Override - public void close() throws IOException { - position = valid = 0; - if (!closed) { - closed = true; - doClose(); - in.close(); - } - } - - protected abstract void doClose() throws IOException; - - protected boolean readyBuffer() throws IOException { - if (position < valid) { - return true; - } - if (closed) { - return false; - } - // we reached the end... - if (currentOffsetIdx + 1 >= offsets.size()) { - return false; - } - valid = uncompress(in, uncompressed); - if (valid < 0) { - return false; - } - currentOffsetIdx++; - currentUncompressedChunkPointer = ((long) currentOffsetIdx) * uncompressedLength; - position = 0; - return (position < valid); - } - - protected abstract void readHeader(IndexInput in) throws IOException; - - /** - * Uncompress the data into the out array, returning the size uncompressed - */ - protected abstract int uncompress(IndexInput in, byte[] out) throws IOException; - - @Override - public IndexInput clone() { - // we clone and we need to make sure we keep the same positions! - CompressedIndexInput cloned = (CompressedIndexInput) super.clone(); - cloned.uncompressed = new byte[uncompressedLength]; - System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressedLength); - cloned.in = (IndexInput) cloned.in.clone(); - return cloned; - } -} diff --git a/core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java b/core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java deleted file mode 100644 index bdacbd4727b..00000000000 --- a/core/src/main/java/org/elasticsearch/common/compress/CompressedStreamInput.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.compress; - -import org.elasticsearch.Version; -import org.elasticsearch.common.io.stream.StreamInput; - -import java.io.EOFException; -import java.io.IOException; - -/** - */ -public abstract class CompressedStreamInput extends StreamInput { - - private final StreamInput in; - - private boolean closed; - - protected byte[] uncompressed; - private int position = 0; - private int valid = 0; - - public CompressedStreamInput(StreamInput in) throws IOException { - this.in = in; - super.setVersion(in.getVersion()); - readHeader(in); - } - - @Override - public void setVersion(Version version) { - in.setVersion(version); - super.setVersion(version); - } - - /** - * Method is overridden to report number of bytes that can now be read - * from decoded data buffer, without reading bytes from the underlying - * stream. - * Never throws an exception; returns number of bytes available without - * further reads from underlying source; -1 if stream has been closed, or - * 0 if an actual read (and possible blocking) is needed to find out. - */ - @Override - public int available() throws IOException { - // if closed, return -1; - if (closed) { - return -1; - } - int left = (valid - position); - return (left <= 0) ? 0 : left; - } - - @Override - public int read() throws IOException { - if (!readyBuffer()) { - return -1; - } - return uncompressed[position++] & 255; - } - - @Override - public byte readByte() throws IOException { - if (!readyBuffer()) { - throw new EOFException(); - } - return uncompressed[position++]; - } - - @Override - public int read(byte[] buffer, int offset, int length) throws IOException { - return read(buffer, offset, length, false); - } - - public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException { - if (length < 1) { - return 0; - } - if (!readyBuffer()) { - return -1; - } - // First let's read however much data we happen to have... - int chunkLength = Math.min(valid - position, length); - System.arraycopy(uncompressed, position, buffer, offset, chunkLength); - position += chunkLength; - - if (chunkLength == length || !fullRead) { - return chunkLength; - } - // Need more data, then - int totalRead = chunkLength; - do { - offset += chunkLength; - if (!readyBuffer()) { - break; - } - chunkLength = Math.min(valid - position, (length - totalRead)); - System.arraycopy(uncompressed, position, buffer, offset, chunkLength); - position += chunkLength; - totalRead += chunkLength; - } while (totalRead < length); - - return totalRead; - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - int result = read(b, offset, len, true /* we want to have full reads, that's the contract... */); - if (result < len) { - throw new EOFException(); - } - } - - @Override - public void reset() throws IOException { - this.position = 0; - this.valid = 0; - in.reset(); - } - - @Override - public void close() throws IOException { - position = valid = 0; - if (!closed) { - closed = true; - doClose(); - in.close(); - } - } - - protected abstract void doClose() throws IOException; - - /** - * Fill the uncompressed bytes buffer by reading the underlying inputStream. - */ - protected boolean readyBuffer() throws IOException { - if (position < valid) { - return true; - } - if (closed) { - return false; - } - valid = uncompress(in, uncompressed); - if (valid < 0) { - return false; - } - position = 0; - return (position < valid); - } - - protected abstract void readHeader(StreamInput in) throws IOException; - - /** - * Uncompress the data into the out array, returning the size uncompressed - */ - protected abstract int uncompress(StreamInput in, byte[] out) throws IOException; - -} diff --git a/core/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java b/core/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java index 462b91aeef0..3864befcc04 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java +++ b/core/src/main/java/org/elasticsearch/common/compress/CompressedXContent.java @@ -80,7 +80,7 @@ public final class CompressedXContent { */ public CompressedXContent(ToXContent xcontent, XContentType type, ToXContent.Params params) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - OutputStream compressedStream = CompressorFactory.defaultCompressor().streamOutput(bStream); + OutputStream compressedStream = CompressorFactory.COMPRESSOR.streamOutput(bStream); CRC32 crc32 = new CRC32(); OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32); try (XContentBuilder builder = XContentFactory.contentBuilder(type, checkedStream)) { @@ -105,7 +105,7 @@ public final class CompressedXContent { this.crc32 = crc32(new BytesArray(uncompressed())); } else { BytesStreamOutput out = new BytesStreamOutput(); - try (OutputStream compressedOutput = CompressorFactory.defaultCompressor().streamOutput(out)) { + try (OutputStream compressedOutput = CompressorFactory.COMPRESSOR.streamOutput(out)) { data.writeTo(compressedOutput); } this.bytes = out.bytes().toBytes(); diff --git a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java index 252fad09807..62ed1cb6edb 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/Compressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/Compressor.java @@ -33,21 +33,7 @@ public interface Compressor { boolean isCompressed(BytesReference bytes); - boolean isCompressed(ChannelBuffer buffer); - StreamInput streamInput(StreamInput in) throws IOException; StreamOutput streamOutput(StreamOutput out) throws IOException; - - /** - * @deprecated Used for backward comp. since we now use Lucene compressed codec. - */ - @Deprecated - boolean isCompressed(IndexInput in) throws IOException; - - /** - * @deprecated Used for backward comp. since we now use Lucene compressed codec. - */ - @Deprecated - CompressedIndexInput indexInput(IndexInput in) throws IOException; } diff --git a/core/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java b/core/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java index e6c43a524ca..82e049704cc 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java +++ b/core/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java @@ -19,16 +19,13 @@ package org.elasticsearch.common.compress; -import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.deflate.DeflateCompressor; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.jboss.netty.buffer.ChannelBuffer; import java.io.IOException; @@ -36,47 +33,21 @@ import java.io.IOException; */ public class CompressorFactory { - private static final Compressor[] compressors; - private static volatile Compressor defaultCompressor; - - static { - compressors = new Compressor[] { - new DeflateCompressor() - }; - defaultCompressor = new DeflateCompressor(); - } - - public static void setDefaultCompressor(Compressor defaultCompressor) { - CompressorFactory.defaultCompressor = defaultCompressor; - } - - public static Compressor defaultCompressor() { - return defaultCompressor; - } + public static final Compressor COMPRESSOR = new DeflateCompressor(); public static boolean isCompressed(BytesReference bytes) { return compressor(bytes) != null; } - /** - * @deprecated we don't compress lucene indexes anymore and rely on lucene codecs - */ - @Deprecated - public static boolean isCompressed(IndexInput in) throws IOException { - return compressor(in) != null; - } - @Nullable public static Compressor compressor(BytesReference bytes) { - for (Compressor compressor : compressors) { - if (compressor.isCompressed(bytes)) { + if (COMPRESSOR.isCompressed(bytes)) { // bytes should be either detected as compressed or as xcontent, // if we have bytes that can be either detected as compressed or // as a xcontent, we have a problem assert XContentFactory.xContentType(bytes) == null; - return compressor; + return COMPRESSOR; } - } XContentType contentType = XContentFactory.xContentType(bytes); if (contentType == null) { @@ -97,29 +68,6 @@ public class CompressorFactory { (bytes.get(2) == 0 || bytes.get(2) == 1); } - public static Compressor compressor(ChannelBuffer buffer) { - for (Compressor compressor : compressors) { - if (compressor.isCompressed(buffer)) { - return compressor; - } - } - throw new NotCompressedException(); - } - - /** - * @deprecated we don't compress lucene indexes anymore and rely on lucene codecs - */ - @Deprecated - @Nullable - public static Compressor compressor(IndexInput in) throws IOException { - for (Compressor compressor : compressors) { - if (compressor.isCompressed(in)) { - return compressor; - } - } - return null; - } - /** * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}. */ diff --git a/core/src/main/java/org/elasticsearch/common/compress/deflate/DeflateCompressor.java b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java similarity index 80% rename from core/src/main/java/org/elasticsearch/common/compress/deflate/DeflateCompressor.java rename to core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java index be396324788..42e2efa358c 100644 --- a/core/src/main/java/org/elasticsearch/common/compress/deflate/DeflateCompressor.java +++ b/core/src/main/java/org/elasticsearch/common/compress/DeflateCompressor.java @@ -17,17 +17,14 @@ * under the License. */ -package org.elasticsearch.common.compress.deflate; +package org.elasticsearch.common.compress; -import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.compress.CompressedIndexInput; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.jboss.netty.buffer.ChannelBuffer; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -35,6 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.Inflater; @@ -69,20 +67,6 @@ public class DeflateCompressor implements Compressor { return true; } - @Override - public boolean isCompressed(ChannelBuffer buffer) { - if (buffer.readableBytes() < HEADER.length) { - return false; - } - final int offset = buffer.readerIndex(); - for (int i = 0; i < HEADER.length; ++i) { - if (buffer.getByte(offset + i) != HEADER[i]) { - return false; - } - } - return true; - } - @Override public StreamInput streamInput(StreamInput in) throws IOException { final byte[] headerBytes = new byte[HEADER.length]; @@ -103,16 +87,14 @@ public class DeflateCompressor implements Compressor { InputStream decompressedIn = new InflaterInputStream(in, inflater, BUFFER_SIZE); decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE); return new InputStreamStreamInput(decompressedIn) { - private boolean closed = false; - + final AtomicBoolean closed = new AtomicBoolean(false); public void close() throws IOException { try { super.close(); } finally { - if (closed == false) { + if (closed.compareAndSet(false, true)) { // important to release native memory inflater.end(); - closed = true; } } } @@ -128,29 +110,17 @@ public class DeflateCompressor implements Compressor { OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush); compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE); return new OutputStreamStreamOutput(compressedOut) { - private boolean closed = false; - + final AtomicBoolean closed = new AtomicBoolean(false); public void close() throws IOException { try { super.close(); } finally { - if (closed == false) { + if (closed.compareAndSet(false, true)) { // important to release native memory deflater.end(); - closed = true; } } } }; } - - @Override - public boolean isCompressed(IndexInput in) throws IOException { - return false; - } - - @Override - public CompressedIndexInput indexInput(IndexInput in) throws IOException { - throw new UnsupportedOperationException(); - } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index c2333ff177d..48be9f64d5c 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -335,7 +335,7 @@ public class PublishClusterStateAction extends AbstractComponent { public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) { + try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) { stream.setVersion(nodeVersion); stream.writeBoolean(true); clusterState.writeTo(stream); @@ -345,7 +345,7 @@ public class PublishClusterStateAction extends AbstractComponent { public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException { BytesStreamOutput bStream = new BytesStreamOutput(); - try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) { + try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) { stream.setVersion(nodeVersion); stream.writeBoolean(false); diff.writeTo(stream); diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 6cea34cf679..37df2ddfb90 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -195,7 +195,7 @@ public class ChecksumBlobStoreFormat extends BlobStoreForm protected BytesReference write(T obj) throws IOException { try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { if (compress) { - try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) { + try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) { write(obj, compressedStreamOutput); } } else { diff --git a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index e45635e3349..55793384b1b 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty; import org.apache.lucene.util.IOUtils; import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.ChannelBufferBytesReference; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.compress.Compressor; import org.elasticsearch.common.compress.CompressorFactory; @@ -110,7 +111,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) { Compressor compressor; try { - compressor = CompressorFactory.compressor(buffer); + compressor = CompressorFactory.compressor(new ChannelBufferBytesReference(buffer)); } catch (NotCompressedException ex) { int maxToRead = Math.min(buffer.readableBytes(), 10); int offset = buffer.readerIndex(); diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index c9f02066836..53eb63c86c5 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -886,7 +886,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem // the header part is compressed, and the "body" can't be extracted as compressed if (options.compress() && (!(request instanceof BytesTransportRequest))) { status = TransportStatus.setCompress(status); - stream = CompressorFactory.defaultCompressor().streamOutput(stream); + stream = CompressorFactory.COMPRESSOR.streamOutput(stream); } // we pick the smallest of the 2, to support both backward and forward compatibility diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index 03856017c36..65ea00d75e5 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -99,7 +99,7 @@ public class NettyTransportChannel implements TransportChannel { StreamOutput stream = bStream; if (options.compress()) { status = TransportStatus.setCompress(status); - stream = CompressorFactory.defaultCompressor().streamOutput(stream); + stream = CompressorFactory.COMPRESSOR.streamOutput(stream); } stream.setVersion(version); response.writeTo(stream); diff --git a/core/src/test/java/org/elasticsearch/common/compress/AbstractCompressedStreamTestCase.java b/core/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java similarity index 98% rename from core/src/test/java/org/elasticsearch/common/compress/AbstractCompressedStreamTestCase.java rename to core/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java index 0e94f6eaf80..33d11aa23d8 100644 --- a/core/src/test/java/org/elasticsearch/common/compress/AbstractCompressedStreamTestCase.java +++ b/core/src/test/java/org/elasticsearch/common/compress/DeflateCompressTests.java @@ -37,13 +37,9 @@ import java.util.concurrent.CountDownLatch; /** * Test streaming compression (e.g. used for recovery) */ -public abstract class AbstractCompressedStreamTestCase extends ESTestCase { +public class DeflateCompressTests extends ESTestCase { - private final Compressor compressor; - - protected AbstractCompressedStreamTestCase(Compressor compressor) { - this.compressor = compressor; - } + private final Compressor compressor = new DeflateCompressor(); public void testRandom() throws IOException { Random r = random(); diff --git a/core/src/test/java/org/elasticsearch/common/compress/AbstractCompressedXContentTestCase.java b/core/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java similarity index 63% rename from core/src/test/java/org/elasticsearch/common/compress/AbstractCompressedXContentTestCase.java rename to core/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java index d1c862f8a69..72866d082ae 100644 --- a/core/src/test/java/org/elasticsearch/common/compress/AbstractCompressedXContentTestCase.java +++ b/core/src/test/java/org/elasticsearch/common/compress/DeflateCompressedXContentTests.java @@ -35,13 +35,9 @@ import static org.hamcrest.Matchers.not; /** * */ -public abstract class AbstractCompressedXContentTestCase extends ESTestCase { +public class DeflateCompressedXContentTests extends ESTestCase { - private final Compressor compressor; - - protected AbstractCompressedXContentTestCase(Compressor compressor) { - this.compressor = compressor; - } + private final Compressor compressor = new DeflateCompressor(); private void assertEquals(CompressedXContent s1, CompressedXContent s2) { Assert.assertEquals(s1, s2); @@ -50,38 +46,26 @@ public abstract class AbstractCompressedXContentTestCase extends ESTestCase { } public void simpleTests() throws IOException { - Compressor defaultCompressor = CompressorFactory.defaultCompressor(); - try { - CompressorFactory.setDefaultCompressor(compressor); - String str = "---\nf:this is a simple string"; - CompressedXContent cstr = new CompressedXContent(str); - assertThat(cstr.string(), equalTo(str)); - assertThat(new CompressedXContent(str), equalTo(cstr)); + String str = "---\nf:this is a simple string"; + CompressedXContent cstr = new CompressedXContent(str); + assertThat(cstr.string(), equalTo(str)); + assertThat(new CompressedXContent(str), equalTo(cstr)); - String str2 = "---\nf:this is a simple string 2"; - CompressedXContent cstr2 = new CompressedXContent(str2); - assertThat(cstr2.string(), not(equalTo(str))); - assertThat(new CompressedXContent(str2), not(equalTo(cstr))); - assertEquals(new CompressedXContent(str2), cstr2); - } finally { - CompressorFactory.setDefaultCompressor(defaultCompressor); - } + String str2 = "---\nf:this is a simple string 2"; + CompressedXContent cstr2 = new CompressedXContent(str2); + assertThat(cstr2.string(), not(equalTo(str))); + assertThat(new CompressedXContent(str2), not(equalTo(cstr))); + assertEquals(new CompressedXContent(str2), cstr2); } public void testRandom() throws IOException { - Compressor defaultCompressor = CompressorFactory.defaultCompressor(); - try { - CompressorFactory.setDefaultCompressor(compressor); - Random r = random(); - for (int i = 0; i < 1000; i++) { - String string = TestUtil.randomUnicodeString(r, 10000); - // hack to make it detected as YAML - string = "---\n" + string; - CompressedXContent compressedXContent = new CompressedXContent(string); - assertThat(compressedXContent.string(), equalTo(string)); - } - } finally { - CompressorFactory.setDefaultCompressor(defaultCompressor); + Random r = random(); + for (int i = 0; i < 1000; i++) { + String string = TestUtil.randomUnicodeString(r, 10000); + // hack to make it detected as YAML + string = "---\n" + string; + CompressedXContent compressedXContent = new CompressedXContent(string); + assertThat(compressedXContent.string(), equalTo(string)); } } diff --git a/core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateCompressedStreamTests.java b/core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateCompressedStreamTests.java deleted file mode 100644 index a6d33585dbc..00000000000 --- a/core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateCompressedStreamTests.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.compress.deflate; - -import org.elasticsearch.common.compress.AbstractCompressedStreamTestCase; - -public class DeflateCompressedStreamTests extends AbstractCompressedStreamTestCase { - - public DeflateCompressedStreamTests() { - super(new DeflateCompressor()); - } - -} diff --git a/core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateXContentTests.java b/core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateXContentTests.java deleted file mode 100644 index 359a582e169..00000000000 --- a/core/src/test/java/org/elasticsearch/common/compress/deflate/DeflateXContentTests.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.common.compress.deflate; - -import org.elasticsearch.common.compress.AbstractCompressedXContentTestCase; - -public class DeflateXContentTests extends AbstractCompressedXContentTestCase { - - public DeflateXContentTests() { - super(new DeflateCompressor()); - } - -} diff --git a/core/src/test/java/org/elasticsearch/index/mapper/binary/BinaryMappingTests.java b/core/src/test/java/org/elasticsearch/index/mapper/binary/BinaryMappingTests.java index 7be0cc8031b..fc8e2ba1872 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/binary/BinaryMappingTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/binary/BinaryMappingTests.java @@ -76,7 +76,7 @@ public class BinaryMappingTests extends ESSingleNodeTestCase { // case 2: a value that looks compressed: this used to fail in 1.x BytesStreamOutput out = new BytesStreamOutput(); - try (StreamOutput compressed = CompressorFactory.defaultCompressor().streamOutput(out)) { + try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(out)) { new BytesArray(binaryValue1).writeTo(compressed); } final byte[] binaryValue2 = out.bytes().toBytes(); diff --git a/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java index b2b9e780205..e1589b4cd2f 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java @@ -138,7 +138,7 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase { private BytesReference write(T obj) throws IOException { try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) { if (compress) { - try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) { + try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) { write(obj, compressedStreamOutput); } } else {