HBASE-1887 Update hbase trunk to latest on hadoop 0.21 branch so we can all test sync/append; add fix for non-pathological case

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@823790 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2009-10-10 04:58:37 +00:00
parent 7ffccd0e46
commit 9a08fcfdf1
2 changed files with 55 additions and 43 deletions

HLog.java (View File)

@@ -370,16 +370,20 @@ public class HLog implements HConstants, Syncable {
* this.end = in.getPos() + length;
*/
private static class WALReader extends SequenceFile.Reader {
private long length;
WALReader(final FileSystem fs, final Path p, final Configuration c)
throws IOException {
super(fs, p, c);
}
@Override
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length)
throws IOException {
- return new WALReaderFSDataInputStream(super.openFile(fs, file, bufferSize, length));
+ return new WALReaderFSDataInputStream(super.openFile(fs, file, bufferSize,
+ length), length);
}
/**
@@ -387,10 +391,12 @@ public class HLog implements HConstants, Syncable {
*/
static class WALReaderFSDataInputStream extends FSDataInputStream {
private boolean firstGetPosInvocation = true;
+ private long length;
- WALReaderFSDataInputStream(final FSDataInputStream is)
+ WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
throws IOException {
super(is);
+ this.length = l;
}
@Override
@@ -401,7 +407,12 @@ public class HLog implements HConstants, Syncable {
// SequenceFile.Reader constructor comes out with the correct length
// on the file:
// this.end = in.getPos() + length;
- return this.in.available();
+ //
+ long available = this.in.available();
+ // The SequenceFile.Reader constructor adds the passed length to this
+ // return value (this.end = in.getPos() + length), so subtract the length
+ // back out here. If available < this.length, return this.length; it is
+ // not clear what else to do in that case.
+ return available >= this.length ? available - this.length : this.length;
}
return super.getPos();
}
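
The hunk above makes more sense next to the constructor line quoted in the comment at the top of this diff, this.end = in.getPos() + length: the SequenceFile.Reader constructor computes its end offset by adding the (possibly stale) reported file length to whatever getPos() returns, so handing back available - length on the first call makes end land on the live byte count of the still-open WAL. A standalone sketch of that arithmetic (plain JDK classes and hypothetical names only, not HBase code):

import java.io.ByteArrayInputStream;
import java.io.IOException;

/** Illustration only: why the intercepted getPos() returns available - length. */
public class GetPosInterceptDemo {
  public static void main(String[] args) throws IOException {
    // Suppose the namenode reported 64 bytes at open time, but 128 bytes
    // have actually been synced to the still-open log file.
    long reportedLength = 64;
    ByteArrayInputStream in = new ByteArrayInputStream(new byte[128]);

    long available = in.available();   // 128, the live byte count
    // The patched getPos(): return available - length, because the
    // SequenceFile.Reader constructor adds length straight back.
    long pos = available >= reportedLength
        ? available - reportedLength
        : reportedLength;              // the pathological case: punt
    long end = pos + reportedLength;   // this.end = in.getPos() + length

    System.out.println(end);           // 128: the reader sees every synced edit
  }
}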
@@ -988,7 +999,8 @@ public class HLog implements HConstants, Syncable {
SequenceFile.Reader in = null;
int count = 0;
try {
- in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
+ long len = fs.getFileStatus(logfiles[i].getPath()).getLen();
+ in = HLog.getReader(fs, logfiles[i].getPath(), conf);
try {
HLogKey key = newKey(conf);
KeyValue val = new KeyValue();

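The splitLog hunk above stops constructing SequenceFile.Reader directly and goes through HLog.getReader, so log splitting also picks up the length-correcting WALReader. The factory presumably amounts to little more than the following sketch (the getReader name and call site appear in the hunk; this body and the imports are an assumption, not the committed source):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Inside HLog: route every caller, splits included, through the WALReader
// subclass instead of handing out a bare SequenceFile.Reader.
public static SequenceFile.Reader getReader(final FileSystem fs, final Path p,
    final Configuration c) throws IOException {
  return new WALReader(fs, p, c);
}
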
TestHLog.java (View File)

@@ -64,6 +64,45 @@ public class TestHLog extends HBaseTestCase implements HConstants {
super.tearDown();
}
+ /**
+ * Just write multiple logs then split. Before fix for HADOOP-2283, this
+ * would fail.
+ * @throws IOException
+ */
+ public void testSplit() throws IOException {
+ final byte [] tableName = Bytes.toBytes(getName());
+ final byte [] rowName = tableName;
+ HLog log = new HLog(this.fs, this.dir, this.conf, null);
+ final int howmany = 3;
+ // Add edits for three regions.
+ try {
+ for (int ii = 0; ii < howmany; ii++) {
+ for (int i = 0; i < howmany; i++) {
+ for (int j = 0; j < howmany; j++) {
+ List<KeyValue> edit = new ArrayList<KeyValue>();
+ byte [] family = Bytes.toBytes("column");
+ byte [] qualifier = Bytes.toBytes(Integer.toString(j));
+ byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
+ edit.add(new KeyValue(rowName, family, qualifier,
+ System.currentTimeMillis(), column));
+ System.out.println("Region " + i + ": " + edit);
+ log.append(Bytes.toBytes("" + i), tableName, edit,
+ false, System.currentTimeMillis());
+ }
+ }
+ log.rollWriter();
+ }
+ List<Path> splits =
+ HLog.splitLog(this.testDir, this.dir, this.fs, this.conf);
+ verifySplits(splits, howmany);
+ log = null;
+ } finally {
+ if (log != null) {
+ log.closeAndDelete();
+ }
+ }
+ }
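With howmany = 3, the loop above writes three edits to each of three regions per iteration and rolls the writer after each of the three iterations, so 27 edits spread over several rolled logs feed into HLog.splitLog; verifySplits then checks the result (it is asserted below to contain one split per region, and presumably each split carries all nine of its region's edits).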
/**
* Test new HDFS-265 sync.
* @throws Exception
@@ -123,45 +162,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
assertEquals(total * 2, count);
}
- /**
- * Just write multiple logs then split. Before fix for HADOOP-2283, this
- * would fail.
- * @throws IOException
- */
- public void testSplit() throws IOException {
- final byte [] tableName = Bytes.toBytes(getName());
- final byte [] rowName = tableName;
- HLog log = new HLog(this.fs, this.dir, this.conf, null);
- final int howmany = 3;
- // Add edits for three regions.
- try {
- for (int ii = 0; ii < howmany; ii++) {
- for (int i = 0; i < howmany; i++) {
- for (int j = 0; j < howmany; j++) {
- List<KeyValue> edit = new ArrayList<KeyValue>();
- byte [] family = Bytes.toBytes("column");
- byte [] qualifier = Bytes.toBytes(Integer.toString(j));
- byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
- edit.add(new KeyValue(rowName, family, qualifier,
- System.currentTimeMillis(), column));
- System.out.println("Region " + i + ": " + edit);
- log.append(Bytes.toBytes("" + i), tableName, edit,
- false, System.currentTimeMillis());
- }
- }
- log.rollWriter();
- }
- List<Path> splits =
- HLog.splitLog(this.testDir, this.dir, this.fs, this.conf);
- verifySplits(splits, howmany);
- log = null;
- } finally {
- if (log != null) {
- log.closeAndDelete();
- }
- }
- }
private void verifySplits(List<Path> splits, final int howmany)
throws IOException {
assertEquals(howmany, splits.size());
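
verifySplits is cut off at the section boundary. A plausible continuation, sketched under the assumption that each region's split file carries all howmany * howmany of its edits; HLog.getReader, HLogKey, and KeyValue are used as in the splitLog hunk above, but the loop itself is illustrative, not the committed test:

// Hypothetical continuation (illustration, not the original source).
for (Path p : splits) {
  SequenceFile.Reader in = HLog.getReader(this.fs, p, this.conf);
  try {
    int count = 0;
    HLogKey key = new HLogKey();
    KeyValue val = new KeyValue();
    while (in.next(key, val)) {
      count++;
    }
    // Each region wrote howmany edits per roll across howmany rolls.
    assertEquals(howmany * howmany, count);
  } finally {
    in.close();
  }
}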