HBASE-8615 HLog Compression may fail due to Hadoop fs input stream returning partial bytes

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1512133 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-08-09 04:05:24 +00:00
parent db1e061c7d
commit 1236068fd0
2 changed files with 62 additions and 12 deletions

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.IOUtils;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
@ -219,7 +220,7 @@ public class WALCellCodec implements Codec {
pos += elemLen;
// the rest
in.read(backingArray, pos, length - pos);
IOUtils.readFully(in, backingArray, pos, length - pos);
return new KeyValue(backingArray);
}
@ -229,7 +230,7 @@ public class WALCellCodec implements Codec {
// status byte indicating that data to be read is not in dictionary.
// if this isn't in the dictionary, we need to add to the dictionary.
int length = StreamUtils.readRawVarint32(in);
in.read(to, offset, length);
IOUtils.readFully(in, to, offset, length);
dict.addEntry(to, offset, length);
return length;
} else {

View File

@ -18,9 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -29,7 +26,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@ -44,18 +41,23 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import static org.junit.Assert.*;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@Category(MediumTests.class)
@Category(LargeTests.class)
@RunWith(Parameterized.class)
public class TestReplicationHLogReaderManager {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static HBaseTestingUtility TEST_UTIL;
private static Configuration conf;
private static Path hbaseDir;
private static FileSystem fs;
@ -70,13 +72,44 @@ public class TestReplicationHLogReaderManager {
private HLog log;
private ReplicationHLogReaderManager logManager;
private PathWatcher pathWatcher;
private int nbRows;
private int walEditKVs;
@Parameters
public static Collection<Object[]> parameters() {
// Try out different combinations of row count and KeyValue count
int[] NB_ROWS = { 1500, 60000 };
int[] NB_KVS = { 1, 100 };
// whether compression is used
Boolean[] BOOL_VALS = { false, true };
List<Object[]> parameters = new ArrayList<Object[]>();
for (int nbRows : NB_ROWS) {
for (int walEditKVs : NB_KVS) {
for (boolean b : BOOL_VALS) {
Object[] arr = new Object[3];
arr[0] = nbRows;
arr[1] = walEditKVs;
arr[2] = b;
parameters.add(arr);
}
}
}
return parameters;
}
public TestReplicationHLogReaderManager(int nbRows, int walEditKVs, boolean enableCompression) {
this.nbRows = nbRows;
this.walEditKVs = walEditKVs;
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
enableCompression);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniDFSCluster(3);
conf = TEST_UTIL.getConfiguration();
hbaseDir = TEST_UTIL.createRootDir();
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
@ -156,16 +189,32 @@ public class TestReplicationHLogReaderManager {
fail();
} catch (EOFException ex) {}
for (int i = 0; i < nbRows; i++) { appendToLogPlus(walEditKVs); }
log.rollWriter();
logManager.openReader(path);
logManager.seek();
for (int i = 0; i < nbRows; i++) {
HLog.Entry e = logManager.readNextAndSetPosition(entriesArray, 0);
if (e == null) {
fail("Should have enough entries");
}
}
}
private void appendToLog() throws IOException {
log.append(info, tableName, getWALEdit(), System.currentTimeMillis(), htd);
appendToLogPlus(1);
}
private WALEdit getWALEdit() {
private void appendToLogPlus(int count) throws IOException {
log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd);
}
private WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
for (int i = 0; i < count; i++) {
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
System.currentTimeMillis(), qualifier));
}
return edit;
}