when multiple data locations are configured, write the local gateway state files to all of them

Shay Banon 2011-10-23 22:57:37 +02:00
parent b2b608f9dc
commit a360cc4c4a
7 changed files with 133 additions and 684 deletions
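In brief, the LocalGateway change below serializes each state document once into a pooled buffer and then writes the same bytes into the _state directory of every configured data location, fsyncing each file; the new state is only promoted (and stale versions cleaned up) once at least one location succeeded. A self-contained sketch of that write pattern, under simplified names (StateWriteSketch and writeToAllLocations are illustrative, not from the commit):

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class StateWriteSketch {
    // Write already-serialized state bytes to every data location;
    // returns true if at least one copy was persisted durably.
    public static boolean writeToAllLocations(byte[] state, int length, File[] dataLocations, String fileName) {
        boolean writtenAtLeastOnce = false;
        for (File dataLocation : dataLocations) {
            File stateLocation = new File(dataLocation, "_state");
            if (!stateLocation.exists()) {
                stateLocation.mkdirs();
            }
            File stateFile = new File(stateLocation, fileName);
            FileOutputStream fos = null;
            try {
                fos = new FileOutputStream(stateFile);
                fos.write(state, 0, length);
                fos.getChannel().force(true); // fsync so the file survives a crash
                writtenAtLeastOnce = true;
            } catch (Exception e) {
                // one failed location does not abort the others
            } finally {
                if (fos != null) {
                    try {
                        fos.close();
                    } catch (IOException e) {
                        // best-effort close
                    }
                }
            }
        }
        return writtenAtLeastOnce;
    }
}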

View File

@@ -1,198 +0,0 @@
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.InputStream;
public class LZFInputStream extends InputStream {
private final BufferRecycler _recycler;
/**
* stream to be decompressed
*/
protected final InputStream inputStream;
/**
* Flag that indicates if we have already called 'inputStream.close()'
* (to avoid calling it multiple times)
*/
protected boolean inputStreamClosed;
/**
* Flag that indicates whether we force full reads (reading of as many
* bytes as requested), or 'optimal' reads (up to as many as available,
* but at least one). Default is false, meaning that 'optimal' read
* is used.
*/
protected boolean cfgFullReads = false;
/* the current buffer of compressed bytes (from which to decode) */
private byte[] _inputBuffer;
/* the buffer of uncompressed bytes from which content is read */
private byte[] _decodedBytes;
/* The current position (next char to output) in the uncompressed bytes buffer. */
private int bufferPosition = 0;
/* Length of the current uncompressed bytes buffer */
private int bufferLength = 0;
/*
///////////////////////////////////////////////////////////////////////
// Construction
///////////////////////////////////////////////////////////////////////
*/
public LZFInputStream(final InputStream inputStream) throws IOException {
this(inputStream, false);
}
/**
* @param in Underlying input stream to use
* @param fullReads Whether {@link #read(byte[])} should try to read exactly
* as many bytes as requested (true); or just however many happen to be
* available (false)
*/
public LZFInputStream(final InputStream in, boolean fullReads) throws IOException {
super();
_recycler = BufferRecycler.instance();
inputStream = in;
inputStreamClosed = false;
cfgFullReads = fullReads;
_inputBuffer = _recycler.allocInputBuffer(LZFChunk.MAX_CHUNK_LEN);
_decodedBytes = _recycler.allocDecodeBuffer(LZFChunk.MAX_CHUNK_LEN);
}
/*
///////////////////////////////////////////////////////////////////////
// InputStream impl
///////////////////////////////////////////////////////////////////////
*/
/**
* Method is overridden to report number of bytes that can now be read
* from decoded data buffer, without reading bytes from the underlying
* stream.
* Never throws an exception; returns number of bytes available without
* further reads from underlying source; -1 if stream has been closed, or
* 0 if an actual read (and possible blocking) is needed to find out.
*/
@Override
public int available() {
// if closed, return -1;
if (inputStreamClosed) {
return -1;
}
int left = (bufferLength - bufferPosition);
return (left <= 0) ? 0 : left;
}
@Override
public int read() throws IOException {
if (!readyBuffer()) {
return -1;
}
return _decodedBytes[bufferPosition++] & 255;
}
@Override
public int read(final byte[] buffer) throws IOException {
return read(buffer, 0, buffer.length);
}
@Override
public int read(final byte[] buffer, int offset, int length) throws IOException {
if (length < 1) {
return 0;
}
if (!readyBuffer()) {
return -1;
}
// First let's read however much data we happen to have...
int chunkLength = Math.min(bufferLength - bufferPosition, length);
System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
bufferPosition += chunkLength;
if (chunkLength == length || !cfgFullReads) {
return chunkLength;
}
// Need more data, then
int totalRead = chunkLength;
do {
offset += chunkLength;
if (!readyBuffer()) {
break;
}
chunkLength = Math.min(bufferLength - bufferPosition, (length - totalRead));
System.arraycopy(_decodedBytes, bufferPosition, buffer, offset, chunkLength);
bufferPosition += chunkLength;
totalRead += chunkLength;
} while (totalRead < length);
return totalRead;
}
@Override
public void close() throws IOException {
bufferPosition = bufferLength = 0;
byte[] buf = _inputBuffer;
if (buf != null) {
_inputBuffer = null;
_recycler.releaseInputBuffer(buf);
}
buf = _decodedBytes;
if (buf != null) {
_decodedBytes = null;
_recycler.releaseDecodeBuffer(buf);
}
if (!inputStreamClosed) {
inputStreamClosed = true;
inputStream.close();
}
}
/*
///////////////////////////////////////////////////////////////////////
// Additional public accessors
///////////////////////////////////////////////////////////////////////
*/
/**
* Method that can be used to find underlying {@link InputStream} that
* we read from to get LZF encoded data to decode.
* Will never return null; although underlying stream may be closed
* (if this stream has been closed).
*
* @since 0.8
*/
public InputStream getUnderlyingInputStream() {
return inputStream;
}
/*
///////////////////////////////////////////////////////////////////////
// Internal methods
///////////////////////////////////////////////////////////////////////
*/
/**
* Fill the uncompressed bytes buffer by reading the underlying inputStream.
*
* @throws IOException if reading or decoding from the underlying stream fails
*/
protected boolean readyBuffer() throws IOException {
if (bufferPosition < bufferLength) {
return true;
}
if (inputStreamClosed) {
return false;
}
bufferLength = LZFDecoder.decompressChunk(inputStream, _inputBuffer, _decodedBytes);
if (bufferLength < 0) {
return false;
}
bufferPosition = 0;
return (bufferPosition < bufferLength);
}
}
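For reference, a minimal decode sketch using the stream removed above (the class name LZFDecodeExample is illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.elasticsearch.common.compress.lzf.LZFInputStream;

public class LZFDecodeExample {
    public static byte[] decode(byte[] compressed) throws IOException {
        LZFInputStream in = new LZFInputStream(new ByteArrayInputStream(compressed));
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            // with fullReads == false (the default), read() may return fewer bytes than requested
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } finally {
            in.close();
        }
    }
}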

View File

@@ -1,148 +0,0 @@
package org.elasticsearch.common.compress.lzf;
import java.io.IOException;
import java.io.OutputStream;
/**
* @author jon hartlaub
* @author tatu
*/
public class LZFOutputStream extends OutputStream {
private static final int OUTPUT_BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN;
private final ChunkEncoder _encoder;
private final BufferRecycler _recycler;
protected final OutputStream _outputStream;
protected byte[] _outputBuffer;
protected int _position = 0;
/**
* Flag that indicates if we have already called '_outputStream.close()'
* (to avoid calling it multiple times)
*/
protected boolean _outputStreamClosed;
/*
///////////////////////////////////////////////////////////////////////
// Construction
///////////////////////////////////////////////////////////////////////
*/
public LZFOutputStream(final OutputStream outputStream) {
_recycler = BufferRecycler.instance();
_encoder = new ChunkEncoder(OUTPUT_BUFFER_SIZE, _recycler);
_outputStream = outputStream;
_outputBuffer = _recycler.allocOutputBuffer(OUTPUT_BUFFER_SIZE);
_outputStreamClosed = false;
}
/*
///////////////////////////////////////////////////////////////////////
// OutputStream impl
///////////////////////////////////////////////////////////////////////
*/
@Override
public void write(final int singleByte) throws IOException {
if (_position >= _outputBuffer.length) {
writeCompressedBlock();
}
_outputBuffer[_position++] = (byte) singleByte;
}
@Override
public void write(final byte[] buffer, int offset, int length) throws IOException {
final int BUFFER_LEN = _outputBuffer.length;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - _position;
if (free >= length) {
System.arraycopy(buffer, offset, _outputBuffer, _position, length);
_position += length;
return;
}
// otherwise, copy whatever we can, flush
System.arraycopy(buffer, offset, _outputBuffer, _position, free);
offset += free;
length -= free;
_position += free;
writeCompressedBlock();
// then write intermediate full block, if any, without copying:
while (length >= BUFFER_LEN) {
_encoder.encodeAndWriteChunk(buffer, offset, BUFFER_LEN, _outputStream);
offset += BUFFER_LEN;
length -= BUFFER_LEN;
}
// and finally, copy leftovers in buffer, if any
if (length > 0) {
System.arraycopy(buffer, offset, _outputBuffer, 0, length);
}
_position = length;
}
@Override
public void flush() throws IOException {
if (_position > 0) {
writeCompressedBlock();
}
_outputStream.flush();
}
@Override
public void close() throws IOException {
flush();
_encoder.close();
byte[] buf = _outputBuffer;
if (buf != null) {
_outputBuffer = null;
_recycler.releaseOutputBuffer(buf);
}
if (!_outputStreamClosed) {
_outputStreamClosed = true;
_outputStream.close();
}
}
/*
///////////////////////////////////////////////////////////////////////
// Additional public accessors
///////////////////////////////////////////////////////////////////////
*/
/**
* Method that can be used to find underlying {@link OutputStream} that
* we write LZF encoded data into, after compressing it.
* Will never return null; although underlying stream may be closed
* (if this stream has been closed).
*
* @since 0.8
*/
public OutputStream getUnderlyingOutputStream() {
return _outputStream;
}
/*
///////////////////////////////////////////////////////////////////////
// Internal methods
///////////////////////////////////////////////////////////////////////
*/
/**
* Compress and write the current block to the OutputStream
*/
protected void writeCompressedBlock() throws IOException {
int left = _position;
_position = 0;
int offset = 0;
do {
int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
// encode from the running offset (not always 0); with the default buffer size
// the loop runs exactly once, but this keeps multi-chunk buffers correct
_encoder.encodeAndWriteChunk(_outputBuffer, offset, chunkLen, _outputStream);
offset += chunkLen;
left -= chunkLen;
} while (left > 0);
}
}
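And the matching encode side, as a minimal sketch (LZFEncodeExample is illustrative). Note that close() both flushes the final chunk and closes the underlying stream, so no separate flush() is needed:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.elasticsearch.common.compress.lzf.LZFOutputStream;

public class LZFEncodeExample {
    public static byte[] encode(byte[] plain) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        LZFOutputStream lzf = new LZFOutputStream(bos);
        lzf.write(plain);
        lzf.close(); // flushes the last chunk, then closes bos
        return bos.toByteArray();
    }
}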

View File

@@ -128,6 +128,7 @@ public class CachedStreamOutput {
Queue<Entry> ref = cache.get();
if (ref == null) {
ref = new LinkedTransferQueue<Entry>();
counter.set(0);
cache.set(ref);
}
if (counter.incrementAndGet() > COUNT_LIMIT) {
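The added counter.set(0) above resets the count whenever a fresh per-thread queue has to be allocated, so the COUNT_LIMIT check reflects the queue actually in use rather than one that was collected. A simplified, self-contained sketch of the recycling pattern, assuming placeholder names (ConcurrentLinkedQueue stands in for the LinkedTransferQueue used here; EntryCacheSketch and the limit value are illustrative):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class EntryCacheSketch<Entry> {
    private static final int COUNT_LIMIT = 100; // illustrative limit
    private final ThreadLocal<Queue<Entry>> cache = new ThreadLocal<Queue<Entry>>();
    private final AtomicInteger counter = new AtomicInteger();

    void release(Entry entry) {
        Queue<Entry> ref = cache.get();
        if (ref == null) {
            ref = new ConcurrentLinkedQueue<Entry>();
            counter.set(0); // the added line: a fresh queue starts counting from zero
            cache.set(ref);
        }
        if (counter.incrementAndGet() <= COUNT_LIMIT) {
            ref.add(entry); // cache the entry for reuse
        } else {
            counter.decrementAndGet(); // over the limit: let the entry be collected
        }
    }
}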

View File

@@ -31,11 +31,11 @@ import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFOutputStream;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -52,7 +52,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
/**
@@ -152,20 +151,24 @@ public abstract class BlobStoreGateway extends SharedStorageGateway {
@Override public void write(MetaData metaData) throws GatewayException {
final String newMetaData = "metadata-" + (currentIndex + 1);
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
FastByteArrayOutputStream out = new FastByteArrayOutputStream();
OutputStream os = out;
StreamOutput streamOutput;
if (compress) {
os = new LZFOutputStream(os);
streamOutput = cachedEntry.cachedLZFBytes();
} else {
streamOutput = cachedEntry.cachedBytes();
}
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, os);
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput);
builder.startObject();
MetaData.Builder.toXContent(metaData, builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
builder.close();
metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(out.underlyingBytes(), 0, out.size()), out.size());
metaDataBlobContainer.writeBlob(newMetaData, new ByteArrayInputStream(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size()), cachedEntry.bytes().size());
} catch (IOException e) {
throw new GatewayException("Failed to write metadata [" + newMetaData + "]", e);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
currentIndex++;
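The rewritten write() above follows a borrow/serialize/return discipline: pop a pooled entry, serialize through its cached (optionally LZF-compressing) stream, read the bytes back out, and always push the entry back in a finally block. A sketch of that shape (writePooled and serialize are placeholders; the CachedStreamOutput calls are those shown in the diff):

import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;

public abstract class PooledWriteSketch {
    // placeholder for the XContentBuilder work done in the actual method
    protected abstract void serialize(StreamOutput out) throws IOException;

    public void writePooled(boolean compress) throws IOException {
        CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
        try {
            StreamOutput streamOutput = compress ? cachedEntry.cachedLZFBytes() : cachedEntry.cachedBytes();
            serialize(streamOutput);
            // the serialized bytes are then at cachedEntry.bytes().underlyingBytes(),
            // with length cachedEntry.bytes().size()
        } finally {
            CachedStreamOutput.pushEntry(cachedEntry); // always return the entry to the pool
        }
    }
}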

View File

@@ -34,14 +34,16 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFOutputStream;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.Closeables;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.thread.LoggingRunnable;
import org.elasticsearch.common.xcontent.ToXContent;
@@ -60,7 +62,6 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -427,54 +428,74 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
final long version = event.state().metaData().version();
builder.version(version);
builder.metaData(event.state().metaData());
LocalGatewayMetaState stateToWrite = builder.build();
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
StreamOutput streamOutput;
try {
File stateLocation = new File(nodeEnv.nodeDataLocations()[0], "_state");
if (!stateLocation.exists()) {
FileSystemUtils.mkdirs(stateLocation);
try {
if (compress) {
streamOutput = cachedEntry.cachedLZFBytes();
} else {
streamOutput = cachedEntry.cachedBytes();
}
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
} catch (Exception e) {
logger.warn("failed to serialize local gateway state", e);
return;
}
File stateFile = new File(stateLocation, "metadata-" + version);
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
LocalGatewayMetaState stateToWrite = builder.build();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayMetaState.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();
FileSystemUtils.syncFile(stateFile);
currentMetaState = stateToWrite;
// delete all the other files
boolean serializedAtLeastOnce = false;
for (File dataLocation : nodeEnv.nodeDataLocations()) {
stateLocation = new File(dataLocation, "_state");
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
FileSystemUtils.mkdirs(stateLocation);
}
File[] files = stateLocation.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + version);
File stateFile = new File(stateLocation, "metadata-" + version);
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size());
fos.getChannel().force(true);
serializedAtLeastOnce = true;
} catch (Exception e) {
logger.warn("failed to write local gateway state to {}", e, stateFile);
} finally {
Closeables.closeQuietly(fos);
}
}
if (serializedAtLeastOnce) {
currentMetaState = stateToWrite;
metaDataPersistedAtLeastOnce = true;
// delete all the other files
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
}
});
if (files != null) {
for (File file : files) {
file.delete();
File[] files = stateLocation.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("metadata-") && !name.equals("metadata-" + version);
}
});
if (files != null) {
for (File file : files) {
file.delete();
}
}
}
}
} catch (IOException e) {
logger.warn("failed to write updated state", e);
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
metaDataPersistedAtLeastOnce = true;
}
}
@@ -488,52 +509,72 @@ public class LocalGateway extends AbstractLifecycleComponent<Gateway> implements
}
@Override public void run() {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
try {
File stateLocation = new File(nodeEnv.nodeDataLocations()[0], "_state");
if (!stateLocation.exists()) {
FileSystemUtils.mkdirs(stateLocation);
}
File stateFile = new File(stateLocation, "shards-" + event.state().version());
OutputStream fos = new FileOutputStream(stateFile);
if (compress) {
fos = new LZFOutputStream(fos);
}
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
fos.close();
FileSystemUtils.syncFile(stateFile);
currentStartedShards = stateToWrite;
} catch (IOException e) {
logger.warn("failed to write updated state", e);
return;
}
// delete all the other files
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
}
File[] files = stateLocation.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
StreamOutput streamOutput;
try {
if (compress) {
streamOutput = cachedEntry.cachedLZFBytes();
} else {
streamOutput = cachedEntry.cachedBytes();
}
});
if (files != null) {
for (File file : files) {
file.delete();
XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, streamOutput);
if (prettyPrint) {
xContentBuilder.prettyPrint();
}
xContentBuilder.startObject();
LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS);
xContentBuilder.endObject();
xContentBuilder.close();
} catch (Exception e) {
logger.warn("failed to serialize local gateway shard states", e);
return;
}
boolean serializedAtLeastOnce = false;
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
FileSystemUtils.mkdirs(stateLocation);
}
File stateFile = new File(stateLocation, "shards-" + event.state().version());
FileOutputStream fos = null;
try {
fos = new FileOutputStream(stateFile);
fos.write(cachedEntry.bytes().underlyingBytes(), 0, cachedEntry.bytes().size());
fos.getChannel().force(true);
serializedAtLeastOnce = true;
} catch (Exception e) {
logger.warn("failed to write local gateway shards state to {}", e, stateFile);
} finally {
Closeables.closeQuietly(fos);
}
}
if (serializedAtLeastOnce) {
currentStartedShards = stateToWrite;
// delete all the other files
for (File dataLocation : nodeEnv.nodeDataLocations()) {
File stateLocation = new File(dataLocation, "_state");
if (!stateLocation.exists()) {
continue;
}
File[] files = stateLocation.listFiles(new FilenameFilter() {
@Override public boolean accept(File dir, String name) {
return name.startsWith("shards-") && !name.equals("shards-" + event.state().version());
}
});
if (files != null) {
for (File file : files) {
file.delete();
}
}
}
}
} finally {
CachedStreamOutput.pushEntry(cachedEntry);
}
}
}

View File

@@ -1,154 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.*;
import java.security.SecureRandom;
import java.util.Random;
/**
* @author kimchy (shay.banon)
*/
public class LZFInputStreamTests {
private static final int BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN * 64;
private byte[] nonEncodableBytesToWrite = new byte[BUFFER_SIZE];
private byte[] bytesToWrite = new byte[BUFFER_SIZE];
private ByteArrayOutputStream nonCompressed;
private ByteArrayOutputStream compressed;
@BeforeClass
public void setUp() throws Exception {
SecureRandom.getInstance("SHA1PRNG").nextBytes(nonEncodableBytesToWrite);
String phrase = "all work and no play make Jack a dull boy";
byte[] bytes = phrase.getBytes();
int cursor = 0;
while (cursor <= bytesToWrite.length) {
System.arraycopy(bytes, 0, bytesToWrite, cursor, (bytes.length + cursor < bytesToWrite.length) ? bytes.length : bytesToWrite.length - cursor);
cursor += bytes.length;
}
nonCompressed = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(nonCompressed);
os.write(nonEncodableBytesToWrite);
os.close();
compressed = new ByteArrayOutputStream();
os = new LZFOutputStream(compressed);
os.write(bytesToWrite);
os.close();
}
@Test
public void testDecompressNonEncodableReadByte() throws IOException {
doDecompressReadByte(nonCompressed.toByteArray(), nonEncodableBytesToWrite);
}
@Test
public void testDecompressNonEncodableReadBlock() throws IOException {
doDecompressReadBlock(nonCompressed.toByteArray(), nonEncodableBytesToWrite);
}
@Test
public void testDecompressEncodableReadByte() throws IOException {
doDecompressReadByte(compressed.toByteArray(), bytesToWrite);
}
@Test
public void testDecompressEncodableReadBlock() throws IOException {
doDecompressReadBlock(compressed.toByteArray(), bytesToWrite);
}
// exercises the single-byte read() path (the *ReadBlock tests use doDecompressReadBlock)
private void doDecompressReadByte(byte[] bytes, byte[] reference) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
int outputBytes = 0;
InputStream is = new LZFInputStream(bis);
int val;
while ((val = is.read()) != -1) {
byte testVal = (byte) (val & 255);
Assert.assertTrue(testVal == reference[outputBytes]);
outputBytes++;
}
Assert.assertTrue(outputBytes == reference.length);
}
@Test public void testIncremental() throws IOException {
// first need to compress something...
String[] words = new String[]{"what", "ever", "some", "other", "words", "too"};
StringBuilder sb = new StringBuilder(258000);
Random rnd = new Random(123);
while (sb.length() < 256000) {
int i = (rnd.nextInt() & 31);
if (i < words.length) {
sb.append(words[i]);
} else {
sb.append(i);
}
}
byte[] uncomp = sb.toString().getBytes("UTF-8");
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
LZFOutputStream lzOut = new LZFOutputStream(bytes);
lzOut.write(uncomp);
lzOut.close();
byte[] comp = bytes.toByteArray();
// read back, in chunks
bytes = new ByteArrayOutputStream(uncomp.length);
byte[] buffer = new byte[64];
LZFInputStream lzIn = new LZFInputStream(new ByteArrayInputStream(comp));
while (true) {
int len = 1 + ((rnd.nextInt() & 0x7FFFFFFF) % buffer.length);
int offset = buffer.length - len;
int count = lzIn.read(buffer, offset, len);
if (count < 0) {
break;
}
if (count > len) {
Assert.fail("Requested " + len + " bytes (offset " + offset + ", array length " + buffer.length + "), got " + count);
}
bytes.write(buffer, offset, count);
}
byte[] result = bytes.toByteArray();
Assert.assertEquals(result.length, uncomp.length);
Assert.assertEquals(result, uncomp);
}
private void doDecompressReadBlock(byte[] bytes, byte[] reference) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
int outputBytes = 0;
InputStream is = new LZFInputStream(bis);
int val;
byte[] buffer = new byte[65536 + 23];
while ((val = is.read(buffer)) != -1) {
for (int i = 0; i < val; i++) {
byte testVal = buffer[i];
Assert.assertTrue(testVal == reference[outputBytes]);
outputBytes++;
}
}
Assert.assertTrue(outputBytes == reference.length);
}
}

View File

@@ -1,96 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.security.SecureRandom;
/**
* @author kimchy (shay.banon)
*/
public class LZFOutputStreamTests {
private static final int BUFFER_SIZE = LZFChunk.MAX_CHUNK_LEN * 64;
private byte[] nonEncodableBytesToWrite = new byte[BUFFER_SIZE];
private byte[] bytesToWrite = new byte[BUFFER_SIZE];
@BeforeClass
public void setUp() throws Exception {
SecureRandom.getInstance("SHA1PRNG").nextBytes(nonEncodableBytesToWrite);
String phrase = "all work and no play make Jack a dull boy";
byte[] bytes = phrase.getBytes();
int cursor = 0;
while (cursor <= bytesToWrite.length) {
System.arraycopy(bytes, 0, bytesToWrite, cursor, (bytes.length + cursor < bytesToWrite.length) ? bytes.length : bytesToWrite.length - cursor);
cursor += bytes.length;
}
}
@Test
public void testUnencodable() throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
os.write(nonEncodableBytesToWrite);
os.close();
Assert.assertTrue(bos.toByteArray().length > nonEncodableBytesToWrite.length);
}
@Test
public void testStreaming() throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
os.write(bytesToWrite);
os.close();
Assert.assertTrue(bos.toByteArray().length > 10);
Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
}
@Test
public void testSingleByte() throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
for (int idx = 0; idx < BUFFER_SIZE; idx++) {
os.write(bytesToWrite[idx]);
if (idx % 1023 == 0) {
os.flush();
}
}
os.close();
Assert.assertTrue(bos.toByteArray().length > 10);
Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
}
@Test
public void testPartialBuffer() throws Exception {
int offset = 255;
int len = 1 << 17;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
OutputStream os = new LZFOutputStream(bos);
os.write(bytesToWrite, offset, len);
os.close();
Assert.assertTrue(bos.toByteArray().length > 10);
Assert.assertTrue(bos.toByteArray().length < bytesToWrite.length * .5);
}
}