Cleanup Compressor interface (#19125)
Today we have several deprecated methods, leaking netty interfaces, support for multiple compressors on the compressor interface. The netty interface can simply be replaced by BytesReference which we already have an implementation for, all the others are not used and are removed in this commit.
This commit is contained in:
parent
3cc2251e33
commit
9b9e17abf7
|
@ -1,215 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.compress;
|
||||
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.LongArray;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @deprecated Used only for backward comp. to read old compressed files, since we now use codec based compression
|
||||
*/
|
||||
@Deprecated
|
||||
public abstract class CompressedIndexInput extends IndexInput {
|
||||
|
||||
private IndexInput in;
|
||||
|
||||
private int version;
|
||||
private long totalUncompressedLength;
|
||||
private LongArray offsets;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
protected byte[] uncompressed;
|
||||
protected int uncompressedLength;
|
||||
private int position = 0;
|
||||
private int valid = 0;
|
||||
private int currentOffsetIdx;
|
||||
private long currentUncompressedChunkPointer;
|
||||
|
||||
public CompressedIndexInput(IndexInput in) throws IOException {
|
||||
super("compressed(" + in.toString() + ")");
|
||||
this.in = in;
|
||||
readHeader(in);
|
||||
this.version = in.readInt();
|
||||
long metaDataPosition = in.readLong();
|
||||
long headerLength = in.getFilePointer();
|
||||
in.seek(metaDataPosition);
|
||||
this.totalUncompressedLength = in.readVLong();
|
||||
int size = in.readVInt();
|
||||
offsets = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
offsets.set(i, in.readVLong());
|
||||
}
|
||||
this.currentOffsetIdx = -1;
|
||||
this.currentUncompressedChunkPointer = 0;
|
||||
in.seek(headerLength);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method is overridden to report number of bytes that can now be read
|
||||
* from decoded data buffer, without reading bytes from the underlying
|
||||
* stream.
|
||||
* Never throws an exception; returns number of bytes available without
|
||||
* further reads from underlying source; -1 if stream has been closed, or
|
||||
* 0 if an actual read (and possible blocking) is needed to find out.
|
||||
*/
|
||||
public int available() throws IOException {
|
||||
// if closed, return -1;
|
||||
if (closed) {
|
||||
return -1;
|
||||
}
|
||||
int left = (valid - position);
|
||||
return (left <= 0) ? 0 : left;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws IOException {
|
||||
if (!readyBuffer()) {
|
||||
throw new EOFException();
|
||||
}
|
||||
return uncompressed[position++];
|
||||
}
|
||||
|
||||
public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException {
|
||||
if (length < 1) {
|
||||
return 0;
|
||||
}
|
||||
if (!readyBuffer()) {
|
||||
return -1;
|
||||
}
|
||||
// First let's read however much data we happen to have...
|
||||
int chunkLength = Math.min(valid - position, length);
|
||||
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
|
||||
position += chunkLength;
|
||||
|
||||
if (chunkLength == length || !fullRead) {
|
||||
return chunkLength;
|
||||
}
|
||||
// Need more data, then
|
||||
int totalRead = chunkLength;
|
||||
do {
|
||||
offset += chunkLength;
|
||||
if (!readyBuffer()) {
|
||||
break;
|
||||
}
|
||||
chunkLength = Math.min(valid - position, (length - totalRead));
|
||||
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
|
||||
position += chunkLength;
|
||||
totalRead += chunkLength;
|
||||
} while (totalRead < length);
|
||||
|
||||
return totalRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] b, int offset, int len) throws IOException {
|
||||
int result = read(b, offset, len, true /* we want to have full reads, that's the contract... */);
|
||||
if (result < len) {
|
||||
throw new EOFException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFilePointer() {
|
||||
return currentUncompressedChunkPointer + position;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seek(long pos) throws IOException {
|
||||
int idx = (int) (pos / uncompressedLength);
|
||||
if (idx >= offsets.size()) {
|
||||
// set the next "readyBuffer" to EOF
|
||||
currentOffsetIdx = idx;
|
||||
position = 0;
|
||||
valid = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: optimize so we won't have to readyBuffer on seek, can keep the position around, and set it on readyBuffer in this case
|
||||
if (idx != currentOffsetIdx) {
|
||||
long pointer = offsets.get(idx);
|
||||
in.seek(pointer);
|
||||
position = 0;
|
||||
valid = 0;
|
||||
currentOffsetIdx = idx - 1; // we are going to increase it in readyBuffer...
|
||||
readyBuffer();
|
||||
}
|
||||
position = (int) (pos % uncompressedLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long length() {
|
||||
return totalUncompressedLength;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
position = valid = 0;
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
doClose();
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doClose() throws IOException;
|
||||
|
||||
protected boolean readyBuffer() throws IOException {
|
||||
if (position < valid) {
|
||||
return true;
|
||||
}
|
||||
if (closed) {
|
||||
return false;
|
||||
}
|
||||
// we reached the end...
|
||||
if (currentOffsetIdx + 1 >= offsets.size()) {
|
||||
return false;
|
||||
}
|
||||
valid = uncompress(in, uncompressed);
|
||||
if (valid < 0) {
|
||||
return false;
|
||||
}
|
||||
currentOffsetIdx++;
|
||||
currentUncompressedChunkPointer = ((long) currentOffsetIdx) * uncompressedLength;
|
||||
position = 0;
|
||||
return (position < valid);
|
||||
}
|
||||
|
||||
protected abstract void readHeader(IndexInput in) throws IOException;
|
||||
|
||||
/**
|
||||
* Uncompress the data into the out array, returning the size uncompressed
|
||||
*/
|
||||
protected abstract int uncompress(IndexInput in, byte[] out) throws IOException;
|
||||
|
||||
@Override
|
||||
public IndexInput clone() {
|
||||
// we clone and we need to make sure we keep the same positions!
|
||||
CompressedIndexInput cloned = (CompressedIndexInput) super.clone();
|
||||
cloned.uncompressed = new byte[uncompressedLength];
|
||||
System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressedLength);
|
||||
cloned.in = (IndexInput) cloned.in.clone();
|
||||
return cloned;
|
||||
}
|
||||
}
|
|
@ -1,174 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.compress;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class CompressedStreamInput extends StreamInput {
|
||||
|
||||
private final StreamInput in;
|
||||
|
||||
private boolean closed;
|
||||
|
||||
protected byte[] uncompressed;
|
||||
private int position = 0;
|
||||
private int valid = 0;
|
||||
|
||||
public CompressedStreamInput(StreamInput in) throws IOException {
|
||||
this.in = in;
|
||||
super.setVersion(in.getVersion());
|
||||
readHeader(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setVersion(Version version) {
|
||||
in.setVersion(version);
|
||||
super.setVersion(version);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method is overridden to report number of bytes that can now be read
|
||||
* from decoded data buffer, without reading bytes from the underlying
|
||||
* stream.
|
||||
* Never throws an exception; returns number of bytes available without
|
||||
* further reads from underlying source; -1 if stream has been closed, or
|
||||
* 0 if an actual read (and possible blocking) is needed to find out.
|
||||
*/
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
// if closed, return -1;
|
||||
if (closed) {
|
||||
return -1;
|
||||
}
|
||||
int left = (valid - position);
|
||||
return (left <= 0) ? 0 : left;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if (!readyBuffer()) {
|
||||
return -1;
|
||||
}
|
||||
return uncompressed[position++] & 255;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte readByte() throws IOException {
|
||||
if (!readyBuffer()) {
|
||||
throw new EOFException();
|
||||
}
|
||||
return uncompressed[position++];
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] buffer, int offset, int length) throws IOException {
|
||||
return read(buffer, offset, length, false);
|
||||
}
|
||||
|
||||
public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException {
|
||||
if (length < 1) {
|
||||
return 0;
|
||||
}
|
||||
if (!readyBuffer()) {
|
||||
return -1;
|
||||
}
|
||||
// First let's read however much data we happen to have...
|
||||
int chunkLength = Math.min(valid - position, length);
|
||||
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
|
||||
position += chunkLength;
|
||||
|
||||
if (chunkLength == length || !fullRead) {
|
||||
return chunkLength;
|
||||
}
|
||||
// Need more data, then
|
||||
int totalRead = chunkLength;
|
||||
do {
|
||||
offset += chunkLength;
|
||||
if (!readyBuffer()) {
|
||||
break;
|
||||
}
|
||||
chunkLength = Math.min(valid - position, (length - totalRead));
|
||||
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
|
||||
position += chunkLength;
|
||||
totalRead += chunkLength;
|
||||
} while (totalRead < length);
|
||||
|
||||
return totalRead;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readBytes(byte[] b, int offset, int len) throws IOException {
|
||||
int result = read(b, offset, len, true /* we want to have full reads, that's the contract... */);
|
||||
if (result < len) {
|
||||
throw new EOFException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() throws IOException {
|
||||
this.position = 0;
|
||||
this.valid = 0;
|
||||
in.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
position = valid = 0;
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
doClose();
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void doClose() throws IOException;
|
||||
|
||||
/**
|
||||
* Fill the uncompressed bytes buffer by reading the underlying inputStream.
|
||||
*/
|
||||
protected boolean readyBuffer() throws IOException {
|
||||
if (position < valid) {
|
||||
return true;
|
||||
}
|
||||
if (closed) {
|
||||
return false;
|
||||
}
|
||||
valid = uncompress(in, uncompressed);
|
||||
if (valid < 0) {
|
||||
return false;
|
||||
}
|
||||
position = 0;
|
||||
return (position < valid);
|
||||
}
|
||||
|
||||
protected abstract void readHeader(StreamInput in) throws IOException;
|
||||
|
||||
/**
|
||||
* Uncompress the data into the out array, returning the size uncompressed
|
||||
*/
|
||||
protected abstract int uncompress(StreamInput in, byte[] out) throws IOException;
|
||||
|
||||
}
|
|
@ -80,7 +80,7 @@ public final class CompressedXContent {
|
|||
*/
|
||||
public CompressedXContent(ToXContent xcontent, XContentType type, ToXContent.Params params) throws IOException {
|
||||
BytesStreamOutput bStream = new BytesStreamOutput();
|
||||
OutputStream compressedStream = CompressorFactory.defaultCompressor().streamOutput(bStream);
|
||||
OutputStream compressedStream = CompressorFactory.COMPRESSOR.streamOutput(bStream);
|
||||
CRC32 crc32 = new CRC32();
|
||||
OutputStream checkedStream = new CheckedOutputStream(compressedStream, crc32);
|
||||
try (XContentBuilder builder = XContentFactory.contentBuilder(type, checkedStream)) {
|
||||
|
@ -105,7 +105,7 @@ public final class CompressedXContent {
|
|||
this.crc32 = crc32(new BytesArray(uncompressed()));
|
||||
} else {
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
try (OutputStream compressedOutput = CompressorFactory.defaultCompressor().streamOutput(out)) {
|
||||
try (OutputStream compressedOutput = CompressorFactory.COMPRESSOR.streamOutput(out)) {
|
||||
data.writeTo(compressedOutput);
|
||||
}
|
||||
this.bytes = out.bytes().toBytes();
|
||||
|
|
|
@ -33,21 +33,7 @@ public interface Compressor {
|
|||
|
||||
boolean isCompressed(BytesReference bytes);
|
||||
|
||||
boolean isCompressed(ChannelBuffer buffer);
|
||||
|
||||
StreamInput streamInput(StreamInput in) throws IOException;
|
||||
|
||||
StreamOutput streamOutput(StreamOutput out) throws IOException;
|
||||
|
||||
/**
|
||||
* @deprecated Used for backward comp. since we now use Lucene compressed codec.
|
||||
*/
|
||||
@Deprecated
|
||||
boolean isCompressed(IndexInput in) throws IOException;
|
||||
|
||||
/**
|
||||
* @deprecated Used for backward comp. since we now use Lucene compressed codec.
|
||||
*/
|
||||
@Deprecated
|
||||
CompressedIndexInput indexInput(IndexInput in) throws IOException;
|
||||
}
|
||||
|
|
|
@ -19,16 +19,13 @@
|
|||
|
||||
package org.elasticsearch.common.compress;
|
||||
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.compress.deflate.DeflateCompressor;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -36,47 +33,21 @@ import java.io.IOException;
|
|||
*/
|
||||
public class CompressorFactory {
|
||||
|
||||
private static final Compressor[] compressors;
|
||||
private static volatile Compressor defaultCompressor;
|
||||
|
||||
static {
|
||||
compressors = new Compressor[] {
|
||||
new DeflateCompressor()
|
||||
};
|
||||
defaultCompressor = new DeflateCompressor();
|
||||
}
|
||||
|
||||
public static void setDefaultCompressor(Compressor defaultCompressor) {
|
||||
CompressorFactory.defaultCompressor = defaultCompressor;
|
||||
}
|
||||
|
||||
public static Compressor defaultCompressor() {
|
||||
return defaultCompressor;
|
||||
}
|
||||
public static final Compressor COMPRESSOR = new DeflateCompressor();
|
||||
|
||||
public static boolean isCompressed(BytesReference bytes) {
|
||||
return compressor(bytes) != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated we don't compress lucene indexes anymore and rely on lucene codecs
|
||||
*/
|
||||
@Deprecated
|
||||
public static boolean isCompressed(IndexInput in) throws IOException {
|
||||
return compressor(in) != null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public static Compressor compressor(BytesReference bytes) {
|
||||
for (Compressor compressor : compressors) {
|
||||
if (compressor.isCompressed(bytes)) {
|
||||
if (COMPRESSOR.isCompressed(bytes)) {
|
||||
// bytes should be either detected as compressed or as xcontent,
|
||||
// if we have bytes that can be either detected as compressed or
|
||||
// as a xcontent, we have a problem
|
||||
assert XContentFactory.xContentType(bytes) == null;
|
||||
return compressor;
|
||||
return COMPRESSOR;
|
||||
}
|
||||
}
|
||||
|
||||
XContentType contentType = XContentFactory.xContentType(bytes);
|
||||
if (contentType == null) {
|
||||
|
@ -97,29 +68,6 @@ public class CompressorFactory {
|
|||
(bytes.get(2) == 0 || bytes.get(2) == 1);
|
||||
}
|
||||
|
||||
public static Compressor compressor(ChannelBuffer buffer) {
|
||||
for (Compressor compressor : compressors) {
|
||||
if (compressor.isCompressed(buffer)) {
|
||||
return compressor;
|
||||
}
|
||||
}
|
||||
throw new NotCompressedException();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated we don't compress lucene indexes anymore and rely on lucene codecs
|
||||
*/
|
||||
@Deprecated
|
||||
@Nullable
|
||||
public static Compressor compressor(IndexInput in) throws IOException {
|
||||
for (Compressor compressor : compressors) {
|
||||
if (compressor.isCompressed(in)) {
|
||||
return compressor;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(BytesReference)}.
|
||||
*/
|
||||
|
|
|
@ -17,17 +17,14 @@
|
|||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.compress.deflate;
|
||||
package org.elasticsearch.common.compress;
|
||||
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.compress.CompressedIndexInput;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
|
||||
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
|
@ -35,6 +32,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.zip.Deflater;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.Inflater;
|
||||
|
@ -69,20 +67,6 @@ public class DeflateCompressor implements Compressor {
|
|||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCompressed(ChannelBuffer buffer) {
|
||||
if (buffer.readableBytes() < HEADER.length) {
|
||||
return false;
|
||||
}
|
||||
final int offset = buffer.readerIndex();
|
||||
for (int i = 0; i < HEADER.length; ++i) {
|
||||
if (buffer.getByte(offset + i) != HEADER[i]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StreamInput streamInput(StreamInput in) throws IOException {
|
||||
final byte[] headerBytes = new byte[HEADER.length];
|
||||
|
@ -103,16 +87,14 @@ public class DeflateCompressor implements Compressor {
|
|||
InputStream decompressedIn = new InflaterInputStream(in, inflater, BUFFER_SIZE);
|
||||
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
|
||||
return new InputStreamStreamInput(decompressedIn) {
|
||||
private boolean closed = false;
|
||||
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
super.close();
|
||||
} finally {
|
||||
if (closed == false) {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
// important to release native memory
|
||||
inflater.end();
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -128,29 +110,17 @@ public class DeflateCompressor implements Compressor {
|
|||
OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
|
||||
compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE);
|
||||
return new OutputStreamStreamOutput(compressedOut) {
|
||||
private boolean closed = false;
|
||||
|
||||
final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
public void close() throws IOException {
|
||||
try {
|
||||
super.close();
|
||||
} finally {
|
||||
if (closed == false) {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
// important to release native memory
|
||||
deflater.end();
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCompressed(IndexInput in) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompressedIndexInput indexInput(IndexInput in) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
|
@ -335,7 +335,7 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
|
||||
public static BytesReference serializeFullClusterState(ClusterState clusterState, Version nodeVersion) throws IOException {
|
||||
BytesStreamOutput bStream = new BytesStreamOutput();
|
||||
try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) {
|
||||
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
|
||||
stream.setVersion(nodeVersion);
|
||||
stream.writeBoolean(true);
|
||||
clusterState.writeTo(stream);
|
||||
|
@ -345,7 +345,7 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
|
||||
public static BytesReference serializeDiffClusterState(Diff diff, Version nodeVersion) throws IOException {
|
||||
BytesStreamOutput bStream = new BytesStreamOutput();
|
||||
try (StreamOutput stream = CompressorFactory.defaultCompressor().streamOutput(bStream)) {
|
||||
try (StreamOutput stream = CompressorFactory.COMPRESSOR.streamOutput(bStream)) {
|
||||
stream.setVersion(nodeVersion);
|
||||
stream.writeBoolean(false);
|
||||
diff.writeTo(stream);
|
||||
|
|
|
@ -195,7 +195,7 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
|
|||
protected BytesReference write(T obj) throws IOException {
|
||||
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
|
||||
if (compress) {
|
||||
try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) {
|
||||
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
|
||||
write(obj, compressedStreamOutput);
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty;
|
|||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
|
@ -110,7 +111,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
|
||||
Compressor compressor;
|
||||
try {
|
||||
compressor = CompressorFactory.compressor(buffer);
|
||||
compressor = CompressorFactory.compressor(new ChannelBufferBytesReference(buffer));
|
||||
} catch (NotCompressedException ex) {
|
||||
int maxToRead = Math.min(buffer.readableBytes(), 10);
|
||||
int offset = buffer.readerIndex();
|
||||
|
|
|
@ -886,7 +886,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
// the header part is compressed, and the "body" can't be extracted as compressed
|
||||
if (options.compress() && (!(request instanceof BytesTransportRequest))) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
|
||||
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
|
||||
}
|
||||
|
||||
// we pick the smallest of the 2, to support both backward and forward compatibility
|
||||
|
|
|
@ -99,7 +99,7 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
StreamOutput stream = bStream;
|
||||
if (options.compress()) {
|
||||
status = TransportStatus.setCompress(status);
|
||||
stream = CompressorFactory.defaultCompressor().streamOutput(stream);
|
||||
stream = CompressorFactory.COMPRESSOR.streamOutput(stream);
|
||||
}
|
||||
stream.setVersion(version);
|
||||
response.writeTo(stream);
|
||||
|
|
|
@ -37,13 +37,9 @@ import java.util.concurrent.CountDownLatch;
|
|||
/**
|
||||
* Test streaming compression (e.g. used for recovery)
|
||||
*/
|
||||
public abstract class AbstractCompressedStreamTestCase extends ESTestCase {
|
||||
public class DeflateCompressTests extends ESTestCase {
|
||||
|
||||
private final Compressor compressor;
|
||||
|
||||
protected AbstractCompressedStreamTestCase(Compressor compressor) {
|
||||
this.compressor = compressor;
|
||||
}
|
||||
private final Compressor compressor = new DeflateCompressor();
|
||||
|
||||
public void testRandom() throws IOException {
|
||||
Random r = random();
|
|
@ -35,13 +35,9 @@ import static org.hamcrest.Matchers.not;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractCompressedXContentTestCase extends ESTestCase {
|
||||
public class DeflateCompressedXContentTests extends ESTestCase {
|
||||
|
||||
private final Compressor compressor;
|
||||
|
||||
protected AbstractCompressedXContentTestCase(Compressor compressor) {
|
||||
this.compressor = compressor;
|
||||
}
|
||||
private final Compressor compressor = new DeflateCompressor();
|
||||
|
||||
private void assertEquals(CompressedXContent s1, CompressedXContent s2) {
|
||||
Assert.assertEquals(s1, s2);
|
||||
|
@ -50,38 +46,26 @@ public abstract class AbstractCompressedXContentTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void simpleTests() throws IOException {
|
||||
Compressor defaultCompressor = CompressorFactory.defaultCompressor();
|
||||
try {
|
||||
CompressorFactory.setDefaultCompressor(compressor);
|
||||
String str = "---\nf:this is a simple string";
|
||||
CompressedXContent cstr = new CompressedXContent(str);
|
||||
assertThat(cstr.string(), equalTo(str));
|
||||
assertThat(new CompressedXContent(str), equalTo(cstr));
|
||||
String str = "---\nf:this is a simple string";
|
||||
CompressedXContent cstr = new CompressedXContent(str);
|
||||
assertThat(cstr.string(), equalTo(str));
|
||||
assertThat(new CompressedXContent(str), equalTo(cstr));
|
||||
|
||||
String str2 = "---\nf:this is a simple string 2";
|
||||
CompressedXContent cstr2 = new CompressedXContent(str2);
|
||||
assertThat(cstr2.string(), not(equalTo(str)));
|
||||
assertThat(new CompressedXContent(str2), not(equalTo(cstr)));
|
||||
assertEquals(new CompressedXContent(str2), cstr2);
|
||||
} finally {
|
||||
CompressorFactory.setDefaultCompressor(defaultCompressor);
|
||||
}
|
||||
String str2 = "---\nf:this is a simple string 2";
|
||||
CompressedXContent cstr2 = new CompressedXContent(str2);
|
||||
assertThat(cstr2.string(), not(equalTo(str)));
|
||||
assertThat(new CompressedXContent(str2), not(equalTo(cstr)));
|
||||
assertEquals(new CompressedXContent(str2), cstr2);
|
||||
}
|
||||
|
||||
public void testRandom() throws IOException {
|
||||
Compressor defaultCompressor = CompressorFactory.defaultCompressor();
|
||||
try {
|
||||
CompressorFactory.setDefaultCompressor(compressor);
|
||||
Random r = random();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
String string = TestUtil.randomUnicodeString(r, 10000);
|
||||
// hack to make it detected as YAML
|
||||
string = "---\n" + string;
|
||||
CompressedXContent compressedXContent = new CompressedXContent(string);
|
||||
assertThat(compressedXContent.string(), equalTo(string));
|
||||
}
|
||||
} finally {
|
||||
CompressorFactory.setDefaultCompressor(defaultCompressor);
|
||||
Random r = random();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
String string = TestUtil.randomUnicodeString(r, 10000);
|
||||
// hack to make it detected as YAML
|
||||
string = "---\n" + string;
|
||||
CompressedXContent compressedXContent = new CompressedXContent(string);
|
||||
assertThat(compressedXContent.string(), equalTo(string));
|
||||
}
|
||||
}
|
||||
|
|
@ -1,30 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.compress.deflate;
|
||||
|
||||
import org.elasticsearch.common.compress.AbstractCompressedStreamTestCase;
|
||||
|
||||
public class DeflateCompressedStreamTests extends AbstractCompressedStreamTestCase {
|
||||
|
||||
public DeflateCompressedStreamTests() {
|
||||
super(new DeflateCompressor());
|
||||
}
|
||||
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.compress.deflate;
|
||||
|
||||
import org.elasticsearch.common.compress.AbstractCompressedXContentTestCase;
|
||||
|
||||
public class DeflateXContentTests extends AbstractCompressedXContentTestCase {
|
||||
|
||||
public DeflateXContentTests() {
|
||||
super(new DeflateCompressor());
|
||||
}
|
||||
|
||||
}
|
|
@ -76,7 +76,7 @@ public class BinaryMappingTests extends ESSingleNodeTestCase {
|
|||
|
||||
// case 2: a value that looks compressed: this used to fail in 1.x
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
try (StreamOutput compressed = CompressorFactory.defaultCompressor().streamOutput(out)) {
|
||||
try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(out)) {
|
||||
new BytesArray(binaryValue1).writeTo(compressed);
|
||||
}
|
||||
final byte[] binaryValue2 = out.bytes().toBytes();
|
||||
|
|
|
@ -138,7 +138,7 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
|
|||
private BytesReference write(T obj) throws IOException {
|
||||
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
|
||||
if (compress) {
|
||||
try (StreamOutput compressedStreamOutput = CompressorFactory.defaultCompressor().streamOutput(bytesStreamOutput)) {
|
||||
try (StreamOutput compressedStreamOutput = CompressorFactory.COMPRESSOR.streamOutput(bytesStreamOutput)) {
|
||||
write(obj, compressedStreamOutput);
|
||||
}
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue