HBASE-2551 Forward port fixes that are in branch but not in trunk (part of the merge of old 0.20 into TRUNK task) -- part 1

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@944510 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-05-14 22:35:48 +00:00
parent 6448146e63
commit 8e95593f42
5 changed files with 189 additions and 56 deletions

CHANGES.txt

@@ -311,6 +311,13 @@ Release 0.21.0 - Unreleased
    HBASE-2544  Forward port branch 0.20 WAL to TRUNK
    HBASE-2546  Specify default filesystem in both the new and old way (needed
                if we are to run on 0.20 and 0.21 hadoop)
+   HBASE-1895  HConstants.MAX_ROW_LENGTH is incorrectly 64k, should be 32k
+   HBASE-1968  Give clients access to the write buffer
+   HBASE-2028  Add HTable.incrementColumnValue support to shell
+               (Lars George via Andrew Purtell)
+   HBASE-2138  unknown metrics type
+   HBASE-2551  Forward port fixes that are in branch but not in trunk (part of
+               the merge of old 0.20 into TRUNK task) -- part 1.

   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

HRegion.java

@@ -19,60 +19,57 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.DroppedSnapshotException;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.NotServingRegionException;
-import org.apache.hadoop.hbase.UnknownScannerException;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.Reference.Range;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
-import java.util.Set;
 import java.util.NavigableSet;
+import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Random;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DroppedSnapshotException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.Reference.Range;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;

 /**
  * HRegion stores data for a certain region of a table. It stores all columns
  * for each row. A given table consists of one or more HRegions.

@@ -110,7 +107,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * defines the keyspace for this HRegion.
  */
 public class HRegion implements HConstants, HeapSize { // , Writable{
-  static final Log LOG = LogFactory.getLog(HRegion.class);
+  public static final Log LOG = LogFactory.getLog(HRegion.class);
   static final String SPLITDIR = "splits";
   static final String MERGEDIR = "merges";
   final AtomicBoolean closed = new AtomicBoolean(false);

HRegionServer.java

@@ -115,7 +115,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class HRegionServer implements HConstants, HRegionInterface,
     HBaseRPCErrorHandler, Runnable, Watcher {
-  static final Log LOG = LogFactory.getLog(HRegionServer.class);
+  public static final Log LOG = LogFactory.getLog(HRegionServer.class);
   private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
   private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
   private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};

HLog.java

@@ -348,6 +348,11 @@ public class HLog implements HConstants, Syncable {
     return logSeqNum.get();
   }

+  // usage: see TestLogRolling.java
+  OutputStream getOutputStream() {
+    return this.hdfs_out;
+  }
+
   /**
    * Roll the log writer. That is, start writing log messages to a new file.
    *

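Aside, not part of the commit: the new package-private HLog.getOutputStream() accessor exposes the HDFS output stream behind the WAL so tests can poke at the write pipeline. The test below also relies on HLog.canGetCurReplicas() and HLog.getLogReplication(), which are not shown in this diff. A minimal sketch of how such helpers might probe the wrapped DFSOutputStream for the HDFS-826 getNumCurrentReplicas() method via reflection (helper names and structure are assumptions for illustration, not the committed code):

// Illustrative sketch only; assumed helpers, not code from this commit.
import java.io.OutputStream;
import java.lang.reflect.Method;

class ReplicationProbe {
  // Find DFSOutputStream#getNumCurrentReplicas() (HDFS-826) if the running
  // HDFS provides it; returns null on older releases.
  static Method findGetNumCurrentReplicas(OutputStream out) {
    try {
      Method m = out.getClass().getMethod("getNumCurrentReplicas");
      m.setAccessible(true);
      return m;
    } catch (NoSuchMethodException e) {
      return null;
    }
  }

  // A canGetCurReplicas()-style capability check.
  static boolean canGetCurReplicas(OutputStream out) {
    return findGetNumCurrentReplicas(out) != null;
  }

  // A getLogReplication()-style query: how many datanodes currently hold the
  // block being written, or -1 if the API is unavailable.
  static int getLogReplication(OutputStream out) throws Exception {
    Method m = findGetNumCurrentReplicas(out);
    return m == null ? -1 : ((Integer) m.invoke(out)).intValue();
  }
}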
TestLogRolling.java

@@ -19,11 +19,14 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

+import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -35,6 +38,13 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;

 /**
  * Test log deletion as logs are rolled.
@@ -46,6 +56,17 @@ public class TestLogRolling extends HBaseClusterTestCase {
   private String tableName;
   private byte[] value;

+  // verbose logging on classes that are touched in these tests
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   /**
    * constructor
    * @throws Exception
@@ -73,9 +94,10 @@
   }

   // Need to override this setup so we can edit the config before it gets sent
-  // to the cluster startup.
+  // to the HDFS & HBase cluster startup.
   @Override
-  protected void preHBaseClusterSetup() {
+  protected void setUp() throws Exception {
+    /**** configuration for testLogRolling ****/
     // Force a region split after every 768KB
     conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
@@ -94,6 +116,19 @@
     // Reduce thread wake frequency so that other threads can get
     // a chance to run.
     conf.setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
+
+    /**** configuration for testLogRollOnDatanodeDeath ****/
+    // make sure log.hflush() calls syncFs() to open a pipeline
+    conf.setBoolean("dfs.support.append", true);
+    // lower the namenode & datanode heartbeat so the namenode
+    // quickly detects datanode failures
+    conf.setInt("heartbeat.recheck.interval", 5000);
+    conf.setInt("dfs.heartbeat.interval", 1);
+    // the namenode might still try to choose the recently-dead datanode
+    // for a pipeline, so retry a new pipeline multiple times
+    conf.setInt("dfs.client.block.write.retries", 30);
+
+    super.setUp();
   }

   private void startAndWriteData() throws Exception {
@@ -154,4 +189,93 @@
         throw e;
       }
     }
   }
+
+  void writeData(HTable table, int rownum) throws Exception {
+    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", rownum)));
+    put.add(HConstants.CATALOG_FAMILY, null, value);
+    table.put(put);
+
+    // sleep to let the log roller run (if it needs to)
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // continue
+    }
+  }
+
+  /**
+   * Tests that logs are rolled upon detecting datanode death
+   * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200)
+   *
+   * @throws Exception
+   */
+  public void testLogRollOnDatanodeDeath() throws Exception {
+    assertTrue("This test requires HLog file replication.",
+        fs.getDefaultReplication() > 1);
+
+    // When the META table can be opened, the region servers are running
+    new HTable(conf, HConstants.META_TABLE_NAME);
+    this.server = cluster.getRegionServer(0);
+    this.log = server.getLog();
+
+    assertTrue("Need HDFS-826 for this test", log.canGetCurReplicas());
+    // don't run this test without append support (HDFS-200 & HDFS-142)
+    assertTrue("Need append support for this test", HLog.isAppend(conf));
+
+    // add up the datanode count, to ensure proper replication when we kill 1
+    dfsCluster.startDataNodes(conf, 1, true, null, null);
+    dfsCluster.waitActive();
+    assertTrue(dfsCluster.getDataNodes().size() >=
+        fs.getDefaultReplication() + 1);
+
+    // Create the test table and open it
+    String tableName = getName();
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    HTable table = new HTable(conf, tableName);
+    table.setAutoFlush(true);
+
+    long curTime = System.currentTimeMillis();
+    long oldFilenum = log.getFilenum();
+    assertTrue("Log should have a timestamp older than now",
+        curTime > oldFilenum && oldFilenum != -1);
+
+    // normal write
+    writeData(table, 1);
+    assertTrue("The log shouldn't have rolled yet",
+        oldFilenum == log.getFilenum());
+
+    // kill a datanode in the pipeline to force a log roll on the next sync()
+    OutputStream stm = log.getOutputStream();
+    Method getPipeline = null;
+    for (Method m : stm.getClass().getDeclaredMethods()) {
+      if (m.getName().endsWith("getPipeline")) {
+        getPipeline = m;
+        getPipeline.setAccessible(true);
+        break;
+      }
+    }
+    assertTrue("Need DFSOutputStream.getPipeline() for this test",
+        getPipeline != null);
+    Object repl = getPipeline.invoke(stm, new Object[] {} /* NO_ARGS */);
+    DatanodeInfo[] pipeline = (DatanodeInfo[]) repl;
+    assertTrue(pipeline.length == fs.getDefaultReplication());
+    DataNodeProperties dnprop = dfsCluster.stopDataNode(pipeline[0].getName());
+    assertTrue(dnprop != null);
+
+    // this write should succeed, but trigger a log roll
+    writeData(table, 2);
+    long newFilenum = log.getFilenum();
+    assertTrue("Missing datanode should've triggered a log roll",
+        newFilenum > oldFilenum && newFilenum > curTime);
+
+    // write some more log data (this should use a new hdfs_out)
+    writeData(table, 3);
+    assertTrue("The log should not roll again.",
+        log.getFilenum() == newFilenum);
+    assertTrue("New log file should have the default replication",
+        log.getLogReplication() == fs.getDefaultReplication());
+  }
 }
} }