HBASE-2345 Add Test in 0.20 to Check for proper HDFS-200 append/sync support
(Nicolas Spiegelberg via JD) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@957679 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d7c976d35d
commit
9dd7a6bccc
|
@ -741,6 +741,8 @@ Release 0.21.0 - Unreleased
|
|||
HBASE-2779 Build a -src tgz to sit beside our -bin tgz when you call
|
||||
maven assembly:assembly
|
||||
HBASE-2783 Quick edit of 'Getting Started' for development release 0.89.x
|
||||
HBASE-2345 Add Test in 0.20 to Check for proper HDFS-200 append/sync support
|
||||
(Nicolas Spiegelberg via JD)
|
||||
|
||||
NEW FEATURES
|
||||
HBASE-1961 HBase EC2 scripts
|
||||
|
|
|
@ -20,27 +20,50 @@
|
|||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.commons.logging.impl.Log4JLogger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestCase;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.log4j.Level;
|
||||
|
||||
/** JUnit test case for HLog */
|
||||
public class TestHLog extends HBaseTestCase {
|
||||
private static final Log LOG = LogFactory.getLog(TestHLog.class);
|
||||
{
|
||||
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
|
||||
((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
private Path dir;
|
||||
private Path oldLogDir;
|
||||
private MiniDFSCluster cluster;
|
||||
|
@ -50,6 +73,16 @@ public class TestHLog extends HBaseTestCase {
|
|||
// Make block sizes small.
|
||||
this.conf.setInt("dfs.blocksize", 1024 * 1024);
|
||||
this.conf.setInt("hbase.regionserver.flushlogentries", 1);
|
||||
// needed for testAppendClose()
|
||||
conf.setBoolean("dfs.support.append", true);
|
||||
// quicker heartbeat interval for faster DN death notification
|
||||
conf.setInt("heartbeat.recheck.interval", 5000);
|
||||
conf.setInt("dfs.heartbeat.interval", 1);
|
||||
conf.setInt("dfs.socket.timeout", 5000);
|
||||
// faster failover with cluster.shutdown();fs.close() idiom
|
||||
conf.setInt("ipc.client.connect.max.retries", 1);
|
||||
conf.setInt("dfs.client.block.recovery.retries", 1);
|
||||
|
||||
cluster = new MiniDFSCluster(conf, 3, true, (String[])null);
|
||||
// Set the hbase.rootdir to be the home directory in mini dfs.
|
||||
this.conf.set(HConstants.HBASE_DIR,
|
||||
|
@ -106,12 +139,15 @@ public class TestHLog extends HBaseTestCase {
|
|||
System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
log.hflush();
|
||||
log.rollWriter();
|
||||
}
|
||||
Configuration newConf = new Configuration(this.conf);
|
||||
// We disable appends here because the last file is still being
|
||||
// considered as under construction by the same DFSClient.
|
||||
newConf.setBoolean("dfs.support.append", false);
|
||||
Path splitsdir = new Path(this.dir, "splits");
|
||||
List<Path> splits =
|
||||
HLog.splitLog(splitsdir, logdir, this.oldLogDir, this.fs, this.conf);
|
||||
HLog.splitLog(splitsdir, logdir, this.oldLogDir, this.fs, newConf);
|
||||
verifySplits(splits, howmany);
|
||||
log = null;
|
||||
} finally {
|
||||
|
@ -260,6 +296,106 @@ public class TestHLog extends HBaseTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// For this test to pass, requires:
|
||||
// 1. HDFS-200 (append support)
|
||||
// 2. HDFS-988 (SafeMode should freeze file operations
|
||||
// [FSNamesystem.nextGenerationStampForBlock])
|
||||
// 3. HDFS-142 (on restart, maintain pendingCreates)
|
||||
public void testAppendClose() throws Exception {
|
||||
this.conf.setBoolean("dfs.support.append", true);
|
||||
byte [] tableName = Bytes.toBytes(getName());
|
||||
HRegionInfo regioninfo = new HRegionInfo(new HTableDescriptor(tableName),
|
||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false);
|
||||
Path subdir = new Path(this.dir, "hlogdir");
|
||||
Path archdir = new Path(this.dir, "hlogdir_archive");
|
||||
HLog wal = new HLog(this.fs, subdir, archdir, this.conf, null);
|
||||
final int total = 20;
|
||||
|
||||
for (int i = 0; i < total; i++) {
|
||||
WALEdit kvs = new WALEdit();
|
||||
kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
|
||||
wal.append(regioninfo, tableName, kvs, System.currentTimeMillis());
|
||||
}
|
||||
// Now call sync to send the data to HDFS datanodes
|
||||
wal.sync(true);
|
||||
final Path walPath = wal.computeFilename();
|
||||
|
||||
// Stop the cluster. (ensure restart since we're sharing MiniDFSCluster)
|
||||
try {
|
||||
this.cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
this.cluster.shutdown();
|
||||
try {
|
||||
// wal.writer.close() will throw an exception,
|
||||
// but still call this since it closes the LogSyncer thread first
|
||||
wal.close();
|
||||
} catch (IOException e) {
|
||||
LOG.info(e);
|
||||
}
|
||||
this.fs.close(); // closing FS last so DFSOutputStream can't call close
|
||||
LOG.info("STOPPED first instance of the cluster");
|
||||
} finally {
|
||||
// Restart the cluster
|
||||
this.cluster = new MiniDFSCluster(conf, 2, false, null);
|
||||
this.cluster.waitActive();
|
||||
this.fs = cluster.getFileSystem();
|
||||
LOG.info("START second instance.");
|
||||
}
|
||||
|
||||
// set the lease period to be 1 second so that the
|
||||
// namenode triggers lease recovery upon append request
|
||||
Method setLeasePeriod = this.cluster.getClass()
|
||||
.getDeclaredMethod("setLeasePeriod", new Class[]{Long.TYPE, Long.TYPE});
|
||||
setLeasePeriod.setAccessible(true);
|
||||
setLeasePeriod.invoke(cluster,
|
||||
new Object[]{new Long(1000), new Long(1000)});
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info(e);
|
||||
}
|
||||
|
||||
// Now try recovering the log, like the HMaster would do
|
||||
final FileSystem recoveredFs = this.fs;
|
||||
final Configuration rlConf = this.conf;
|
||||
|
||||
class RecoverLogThread extends Thread {
|
||||
public Exception exception = null;
|
||||
public void run() {
|
||||
try {
|
||||
FSUtils.recoverFileLease(recoveredFs, walPath, rlConf);
|
||||
} catch (IOException e) {
|
||||
exception = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RecoverLogThread t = new RecoverLogThread();
|
||||
t.start();
|
||||
// Timeout after 60 sec. Without correct patches, would be an infinite loop
|
||||
t.join(60 * 1000);
|
||||
if(t.isAlive()) {
|
||||
t.interrupt();
|
||||
throw new Exception("Timed out waiting for HLog.recoverLog()");
|
||||
}
|
||||
|
||||
if (t.exception != null)
|
||||
throw t.exception;
|
||||
|
||||
// Make sure you can read all the content
|
||||
SequenceFile.Reader reader
|
||||
= new SequenceFile.Reader(this.fs, walPath, this.conf);
|
||||
int count = 0;
|
||||
HLogKey key = HLog.newKey(this.conf);
|
||||
WALEdit val = new WALEdit();
|
||||
while (reader.next(key, val)) {
|
||||
count++;
|
||||
assertTrue("Should be one KeyValue per WALEdit",
|
||||
val.getKeyValues().size() == 1);
|
||||
}
|
||||
assertEquals(total, count);
|
||||
reader.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that we can write out an edit, close, and then read it back in again.
|
||||
|
|
Loading…
Reference in New Issue