HBASE-2868 Do some small cleanups in org.apache.hadoop.hbase.regionserver.wal

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@984433 13f79535-47bb-0310-9956-ffa450edef68

parent 8b5d7dbb1c
commit 92ffa8580c

CHANGES.txt
@@ -833,6 +833,8 @@ Release 0.21.0 - Unreleased
               (Chongxin Li via Stack)
    HBASE-2844 Capping the number of regions (Pranav Khaitan via Stack)
    HBASE-2870 Add Backup CLI Option to HMaster (Nicolas Spiegelberg via Stack)
+   HBASE-2868 Do some small cleanups in org.apache.hadoop.hbase.regionserver.wal
+              (Alex Newman via Stack)
 
 NEW FEATURES
    HBASE-1961 HBase EC2 scripts

HLog.java
@@ -28,6 +28,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;

@@ -1001,10 +1002,13 @@ public class HLog implements Syncable {
    * If the pipeline isn't started yet or is empty, you will get the default
    * replication factor. Therefore, if this function returns 0, it means you
    * are not properly running with the HDFS-826 patch.
+   * @throws InvocationTargetException
+   * @throws IllegalAccessException
+   * @throws IllegalArgumentException
    *
    * @throws Exception
    */
-  int getLogReplication() throws Exception {
+  int getLogReplication() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
     if(this.getNumCurrentReplicas != null && this.hdfs_out != null) {
       Object repl = this.getNumCurrentReplicas.invoke(this.hdfs_out, NO_ARGS);
       if (repl instanceof Integer) {

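A note on the hunk above: getNumCurrentReplicas() only exists on HDFS builds that carry HDFS-826, so HLog holds it as a java.lang.reflect.Method resolved at startup and invokes it lazily; the narrowed throws clause now lists exactly the failures a reflective call can raise instead of a blanket Exception. A minimal self-contained sketch of the pattern, assuming an hdfsOut stream; the ReplicationProbe wrapper and the lookup code are illustrative, not the committed HLog code:

  import java.io.OutputStream;
  import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Method;

  // Illustrative sketch, not the committed HLog code.
  class ReplicationProbe {
    private static final Object[] NO_ARGS = new Object[] {};
    private final OutputStream hdfsOut;
    private final Method getNumCurrentReplicas; // stays null on pre-HDFS-826 jars

    ReplicationProbe(OutputStream hdfsOut) {
      this.hdfsOut = hdfsOut;
      Method m = null;
      try {
        // Present only when the HDFS jar includes HDFS-826.
        m = hdfsOut.getClass().getMethod("getNumCurrentReplicas");
        m.setAccessible(true);
      } catch (NoSuchMethodException e) {
        // Older HDFS: leave the handle null and report 0 below.
      }
      this.getNumCurrentReplicas = m;
    }

    // Mirrors the patch's narrowed signature: only reflection failures escape.
    int getLogReplication()
        throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
      if (getNumCurrentReplicas != null && hdfsOut != null) {
        Object repl = getNumCurrentReplicas.invoke(hdfsOut, NO_ARGS);
        if (repl instanceof Integer) {
          return ((Integer) repl).intValue();
        }
      }
      return 0; // per the javadoc above: 0 means "running without HDFS-826"
    }
  }

Returning 0 rather than throwing keeps callers functional on clusters without the patch, which is why the javadoc calls out the 0 return value.
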
TestLogRolling.java
@@ -19,7 +19,9 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;

@@ -27,10 +29,13 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.client.HBaseAdmin;

@@ -39,22 +44,33 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test log deletion as logs are rolled.
  */
-public class TestLogRolling extends HBaseClusterTestCase {
+public class TestLogRolling {
   private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
   private HRegionServer server;
   private HLog log;
   private String tableName;
   private byte[] value;
+  private static FileSystem fs;
+  private static MiniDFSCluster dfsCluster;
+  private static HBaseAdmin admin;
+  private static MiniHBaseCluster cluster;
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   // verbose logging on classes that are touched in these tests
   {

@@ -71,10 +87,9 @@ public class TestLogRolling extends HBaseClusterTestCase {
    * constructor
    * @throws Exception
    */
-  public TestLogRolling() throws Exception {
+  public TestLogRolling() {
     // start one regionserver and a minidfs.
     super();
-    try {
     this.server = null;
     this.log = null;
     this.tableName = null;

@@ -86,63 +101,67 @@ public class TestLogRolling extends HBaseClusterTestCase {
       v.append(className);
     }
     value = Bytes.toBytes(v.toString());
-
-    } catch (Exception e) {
-      LOG.fatal("error in constructor", e);
-      throw e;
-    }
   }
 
   // Need to override this setup so we can edit the config before it gets sent
   // to the HDFS & HBase cluster startup.
-  @Override
-  protected void setUp() throws Exception {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
     /**** configuration for testLogRolling ****/
     // Force a region split after every 768KB
-    conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
+    TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize", 768L * 1024L);
 
     // We roll the log after every 32 writes
-    conf.setInt("hbase.regionserver.maxlogentries", 32);
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
 
     // For less frequently updated regions flush after every 2 flushes
-    conf.setInt("hbase.hregion.memstore.optionalflushcount", 2);
+    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
 
     // We flush the cache after every 8192 bytes
-    conf.setInt("hbase.hregion.memstore.flush.size", 8192);
+    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.flush.size", 8192);
 
     // Increase the amount of time between client retries
-    conf.setLong("hbase.client.pause", 15 * 1000);
+    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
 
     // Reduce thread wake frequency so that other threads can get
     // a chance to run.
-    conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
+    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
 
     /**** configuration for testLogRollOnDatanodeDeath ****/
     // make sure log.hflush() calls syncFs() to open a pipeline
-    conf.setBoolean("dfs.support.append", true);
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     // lower the namenode & datanode heartbeat so the namenode
     // quickly detects datanode failures
-    conf.setInt("heartbeat.recheck.interval", 5000);
-    conf.setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
     // the namenode might still try to choose the recently-dead datanode
     // for a pipeline, so try to a new pipeline multiple times
-    conf.setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.startMiniCluster(2);
 
-    super.setUp();
+    cluster = TEST_UTIL.getHBaseCluster();
+    dfsCluster = TEST_UTIL.getDFSCluster();
+    fs = TEST_UTIL.getTestFileSystem();
+    admin = TEST_UTIL.getHBaseAdmin();
   }
 
-  private void startAndWriteData() throws Exception {
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException {
+    TEST_UTIL.cleanupTestDir();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void startAndWriteData() throws IOException {
     // When the META table can be opened, the region servers are running
-    new HTable(conf, HConstants.META_TABLE_NAME);
+    new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
     this.log = server.getLog();
 
     // Create the test table and open it
     HTableDescriptor desc = new HTableDescriptor(tableName);
     desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(desc);
-    HTable table = new HTable(conf, tableName);
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
     for (int i = 1; i <= 256; i++) {    // 256 writes should cause 8 log rolls
       Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
       put.add(HConstants.CATALOG_FAMILY, null, value);

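Worth noting in the hunk above: every tuning call goes through TEST_UTIL.getConfiguration() before TEST_UTIL.startMiniCluster(2), since the mini DFS and HBase daemons read the Configuration when they boot; keys set afterwards would be ignored. Stripped to its skeleton, the class-level lifecycle the patch adopts looks roughly like this (the class name is illustrative; the config key is one from the hunk):

  import java.io.IOException;

  import org.apache.hadoop.hbase.HBaseTestingUtility;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;

  // Illustrative skeleton of the JUnit 4 lifecycle used by the patch.
  public class ExampleMiniClusterTest {
    // One shared mini cluster per test class, not one per test method.
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
      // Tune before startup: daemons snapshot the Configuration as they boot.
      TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
      TEST_UTIL.startMiniCluster(2); // two region servers
    }

    @AfterClass
    public static void tearDownAfterClass() throws IOException {
      TEST_UTIL.shutdownMiniCluster(); // also brings down the mini DFS
    }
  }

Compared with HBaseClusterTestCase, which set up and tore down a cluster around every test method, the @BeforeClass/@AfterClass pair amortizes one cluster across all @Test methods in the class.
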
@@ -160,12 +179,12 @@ public class TestLogRolling extends HBaseClusterTestCase {
 
   /**
    * Tests that logs are deleted
-   *
-   * @throws Exception
+   * @throws IOException
+   * @throws FailedLogCloseException
    */
-  public void testLogRolling() throws Exception {
+  @Test
+  public void testLogRolling() throws FailedLogCloseException, IOException {
     this.tableName = getName();
-    try {
     startAndWriteData();
     LOG.info("after writing there are " + log.getNumLogFiles() + " log files");
 

@@ -184,13 +203,13 @@ public class TestLogRolling extends HBaseClusterTestCase {
     LOG.info("after flushing all regions and rolling logs there are " +
       log.getNumLogFiles() + " log files");
     assertTrue(("actual count: " + count), count <= 2);
-    } catch (Exception e) {
-      LOG.fatal("unexpected exception", e);
-      throw e;
-    }
   }
 
-  void writeData(HTable table, int rownum) throws Exception {
+  private static String getName() {
+    return "TestLogRolling";
+  }
+
+  void writeData(HTable table, int rownum) throws IOException {
     Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum)));
     put.add(HConstants.CATALOG_FAMILY, null, value);
     table.put(put);

@@ -204,78 +223,99 @@ public class TestLogRolling extends HBaseClusterTestCase {
   }
 
   /**
-   * Tests that logs are rolled upon detecting datanode death
-   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
-   *
-   * @throws Exception
+   * Give me the HDFS pipeline for this log file
    */
-  public void testLogRollOnDatanodeDeath() throws Exception {
-    assertTrue("This test requires HLog file replication.",
-      fs.getDefaultReplication() > 1);
-
-    // When the META table can be opened, the region servers are running
-    new HTable(conf, HConstants.META_TABLE_NAME);
-    this.server = cluster.getRegionServer(0);
-    this.log = server.getLog();
-
-    assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
-    // don't run this test without append support (HDFS-200 & HDFS-142)
-    assertTrue("Need append support for this test", FSUtils.isAppendSupported(conf));
-
-    // add up the datanode count, to ensure proper replication when we kill 1
-    dfsCluster.startDataNodes(conf, 1, true, null, null);
-    dfsCluster.waitActive();
-    assertTrue(dfsCluster.getDataNodes().size() >=
-      fs.getDefaultReplication() + 1);
-
-    // Create the test table and open it
-    String tableName = getName();
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-    HBaseAdmin admin = new HBaseAdmin(conf);
-    admin.createTable(desc);
-    HTable table = new HTable(conf, tableName);
-    table.setAutoFlush(true);
-
-    long curTime = System.currentTimeMillis();
-    long oldFilenum = log.getFilenum();
-    assertTrue("Log should have a timestamp older than now",
-      curTime > oldFilenum && oldFilenum != -1);
-
-    // normal write
-    writeData(table, 1);
-    assertTrue("The log shouldn't have rolled yet",
-      oldFilenum == log.getFilenum());
-
+  @SuppressWarnings("null")
+  DatanodeInfo[] getPipeline(HLog log) throws IllegalArgumentException,
+      IllegalAccessException, InvocationTargetException {
     // kill a datanode in the pipeline to force a log roll on the next sync()
     OutputStream stm = log.getOutputStream();
     Method getPipeline = null;
     for (Method m : stm.getClass().getDeclaredMethods()) {
-      if(m.getName().endsWith("getPipeline")) {
+      if (m.getName().endsWith("getPipeline")) {
         getPipeline = m;
         getPipeline.setAccessible(true);
         break;
       }
     }
-    assertTrue("Need DFSOutputStream.getPipeline() for this test",
-      getPipeline != null);
-    Object repl = getPipeline.invoke(stm, new Object []{} /*NO_ARGS*/);
-    DatanodeInfo[] pipeline = (DatanodeInfo[]) repl;
-    assertTrue(pipeline.length == fs.getDefaultReplication());
-    DataNodeProperties dnprop = dfsCluster.stopDataNode(pipeline[0].getName());
-    assertTrue(dnprop != null);
-
+
+    assertTrue("Need DFSOutputStream.getPipeline() for this test",
+      null != getPipeline);
+    Object repl = getPipeline.invoke(stm, new Object[] {} /* NO_ARGS */);
+    return (DatanodeInfo[]) repl;
+  }
+
+  /**
+   * Tests that logs are rolled upon detecting datanode death
+   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws InvocationTargetException
+   * @throws IllegalAccessException
+   * @throws IllegalArgumentException
+   */
+  @Test
+  public void testLogRollOnDatanodeDeath() throws IOException,
+      InterruptedException, IllegalArgumentException, IllegalAccessException,
+      InvocationTargetException {
+    assertTrue("This test requires HLog file replication.", fs
+        .getDefaultReplication() > 1);
+    // When the META table can be opened, the region servers are running
+    new HTable(TEST_UTIL.getConfiguration(), HConstants.META_TABLE_NAME);
+    this.server = cluster.getRegionServer(0);
+    this.log = server.getLog();
+
+    assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
+    // don't run this test without append support (HDFS-200 & HDFS-142)
+    assertTrue("Need append support for this test", FSUtils
+        .isAppendSupported(TEST_UTIL.getConfiguration()));
+
+    // add up the datanode count, to ensure proper replication when we kill 1
+    dfsCluster
+        .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null);
+    dfsCluster.waitActive();
+    assertTrue(dfsCluster.getDataNodes().size() >= fs.getDefaultReplication() + 1);
+
+    // Create the test table and open it
+    String tableName = getName();
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    admin.createTable(desc);
+
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    writeData(table, 2);
+
+    table.setAutoFlush(true);
+
+    long curTime = System.currentTimeMillis();
+    long oldFilenum = log.getFilenum();
+    assertTrue("Log should have a timestamp older than now",
+      curTime > oldFilenum && oldFilenum != -1);
+
+    assertTrue("The log shouldn't have rolled yet", oldFilenum == log
+        .getFilenum());
+    DatanodeInfo[] pipeline = getPipeline(log);
+    assertTrue(pipeline.length == fs.getDefaultReplication());
+
+    assertTrue(dfsCluster.stopDataNode(pipeline[0].getName()) != null);
+    Thread.sleep(10000);
     // this write should succeed, but trigger a log roll
     writeData(table, 2);
     long newFilenum = log.getFilenum();
 
     assertTrue("Missing datanode should've triggered a log roll",
       newFilenum > oldFilenum && newFilenum > curTime);
 
     // write some more log data (this should use a new hdfs_out)
     writeData(table, 3);
-    assertTrue("The log should not roll again.",
-      log.getFilenum() == newFilenum);
-    assertTrue("New log file should have the default replication",
-      log.getLogReplication() == fs.getDefaultReplication());
+    assertTrue("The log should not roll again.", log.getFilenum() == newFilenum);
+    assertTrue("New log file should have the default replication", log
+        .getLogReplication() == fs.getDefaultReplication());
   }
 }
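
The getPipeline(HLog) helper extracted above relies on the same reflection technique as HLog: DFSOutputStream#getPipeline() is not public API in these HDFS versions, so the test scans the stream's declared methods and force-opens access. Isolated as a standalone sketch (the PipelineProbe wrapper and the null-return convention are illustrative; the lookup loop matches the patch):

  import java.io.OutputStream;
  import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Method;

  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

  // Illustrative sketch of the reflective pipeline lookup used by the test.
  final class PipelineProbe {
    /** Returns the current write pipeline, or null when this HDFS has no getPipeline(). */
    static DatanodeInfo[] probePipeline(OutputStream stm)
        throws IllegalAccessException, InvocationTargetException {
      Method getPipeline = null;
      for (Method m : stm.getClass().getDeclaredMethods()) {
        if (m.getName().endsWith("getPipeline")) {
          getPipeline = m;
          getPipeline.setAccessible(true); // the method is package-private in HDFS
          break;
        }
      }
      if (getPipeline == null) {
        return null; // caller should skip the test on such builds
      }
      return (DatanodeInfo[]) getPipeline.invoke(stm, new Object[] {});
    }
  }

Killing pipeline[0] and sleeping ten seconds then lines up with the setUpBeforeClass settings (heartbeat.recheck.interval=5000, dfs.heartbeat.interval=1), roughly the window the namenode needs to declare the datanode dead so the next write triggers the roll.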