HBASE-6197 HRegion's append operation may lose data (Xing Shi)

Submitted by:	Xing Shi
Reviewed by:	Ted Yu


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1350099 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-06-14 06:09:33 +00:00
parent 0fdf8126c5
commit 12a1613508
2 changed files with 113 additions and 8 deletions

View File

@ -4529,8 +4529,8 @@ public class HRegion implements HeapSize { // , Writable{
boolean flush = false;
WALEdit walEdits = null;
List<KeyValue> allKVs = new ArrayList<KeyValue>(append.size());
List<KeyValue> kvs = new ArrayList<KeyValue>(append.size());
long now = EnvironmentEdgeManager.currentTimeMillis();
Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
long before = EnvironmentEdgeManager.currentTimeMillis();
long size = 0;
long txid = 0;
@ -4541,11 +4541,13 @@ public class HRegion implements HeapSize { // , Writable{
Integer lid = getLock(lockid, row, true);
this.updatesLock.readLock().lock();
try {
long now = EnvironmentEdgeManager.currentTimeMillis();
// Process each family
for (Map.Entry<byte[], List<KeyValue>> family : append.getFamilyMap()
.entrySet()) {
Store store = stores.get(family.getKey());
List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
// Get previous values for all columns in this family
Get get = new Get(row);
@ -4611,10 +4613,8 @@ public class HRegion implements HeapSize { // , Writable{
}
}
// Write the KVs for this family into the store
size += store.upsert(kvs);
allKVs.addAll(kvs);
kvs.clear();
//store the kvs to the temporary memstore before writing HLog
tempMemstore.put(store, kvs);
}
// Actually write to WAL now
@ -4624,9 +4624,16 @@ public class HRegion implements HeapSize { // , Writable{
// as a Put.
txid = this.log.appendNoSync(regionInfo,
this.htableDescriptor.getName(), walEdits,
HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
}
//Actually write to Memstore now
for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
Store store = entry.getKey();
size += store.upsert(entry.getValue());
allKVs.addAll(entry.getValue());
}
size = this.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
@ -4641,7 +4648,7 @@ public class HRegion implements HeapSize { // , Writable{
}
long after = EnvironmentEdgeManager.currentTimeMillis();
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - now);
this.opMetrics.updateAppendMetrics(append.getFamilyMap().keySet(), after - before);
if (flush) {

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -3611,6 +3612,103 @@ public class TestHRegion extends HBaseTestCase {
this.region = null;
}
/**
 * Worker thread that repeatedly appends a single character to a fixed
 * row/column, used to exercise HRegion.append() from many threads at once.
 */
private static class Appender implements Runnable {
  private final static byte[] appendRow = Bytes.toBytes("appendRow");
  private final static byte[] family = Bytes.toBytes("family");
  private final static byte[] qualifier = Bytes.toBytes("qualifier");
  private final static byte[] CHAR = Bytes.toBytes("a");
  private HRegion region;
  private int appendCounter;

  /**
   * @param region the region to append into
   * @param appendCounter how many appends this worker should issue
   */
  public Appender(HRegion region, int appendCounter) {
    this.region = region;
    this.appendCounter = appendCounter;
  }

  @Override
  public void run() {
    // Issue appendCounter appends of CHAR; stop early on the first failure.
    for (int done = 0; done < appendCounter; done++) {
      Append app = new Append(appendRow);
      app.add(family, qualifier, CHAR);
      try {
        region.append(app, null, true);
      } catch (IOException e) {
        e.printStackTrace();
        break;
      }
    }
  }
}
/**
 * Verifies that concurrent appends are not lost while the memstore is
 * being flushed in parallel: 20 threads each append "a" 100 times to the
 * same cell, so the latest version must hold exactly 2000 'a' bytes.
 * @throws Exception on region setup or read failure
 */
@Test
public void testParallelAppendWithMemStoreFlush() throws Exception {
  Configuration conf = HBaseConfiguration.create();
  String method = "testParallelAppendWithMemStoreFlush";
  byte[] tableName = Bytes.toBytes(method);
  byte[] family = Appender.family;
  this.region = initHRegion(tableName, method, conf, family);
  final HRegion region = this.region;
  final AtomicBoolean appendDone = new AtomicBoolean(false);
  // Flush the memstore in a tight loop until every appender has finished,
  // to maximize the chance of racing a flush against an in-flight append.
  Runnable flusher = new Runnable() {
    @Override
    public void run() {
      while (!appendDone.get()) {
        try {
          region.flushcache();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }
  };
  // After all appends finish, the cell value must be
  // threadNum * appendCounter repetitions of Appender.CHAR.
  int threadNum = 20;
  int appendCounter = 100;
  byte[] expected = new byte[threadNum * appendCounter];
  for (int i = 0; i < threadNum * appendCounter; i++) {
    System.arraycopy(Appender.CHAR, 0, expected, i, 1);
  }
  Thread[] appenders = new Thread[threadNum];
  Thread flushThread = new Thread(flusher);
  for (int i = 0; i < threadNum; i++) {
    appenders[i] = new Thread(new Appender(this.region, appendCounter));
    appenders[i].start();
  }
  flushThread.start();
  for (int i = 0; i < threadNum; i++) {
    appenders[i].join();
  }
  appendDone.set(true);
  flushThread.join();
  Get get = new Get(Appender.appendRow);
  get.addColumn(Appender.family, Appender.qualifier);
  get.setMaxVersions(1);
  Result res = this.region.get(get, null);
  List<KeyValue> kvs = res.getColumn(Appender.family,
      Appender.qualifier);
  // We requested only the latest version (expected value first).
  assertEquals(1, kvs.size());
  KeyValue kv = kvs.get(0);
  byte[] appendResult = new byte[kv.getValueLength()];
  System.arraycopy(kv.getBuffer(), kv.getValueOffset(), appendResult, 0, kv.getValueLength());
  // BUG FIX: assertEquals(Object, Object) on byte[] compares references,
  // not contents, so the original assertion never verified the value.
  // Compare contents explicitly instead.
  assertTrue(Bytes.equals(expected, appendResult));
  this.region = null;
}
private void putData(int startRow, int numRows, byte [] qf,
byte [] ...families)
throws IOException {