HBASE-2283 row level atomicity

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@925508 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-03-20 00:52:51 +00:00
parent e9cc906a81
commit f106722774
16 changed files with 311 additions and 226 deletions

View File

@ -247,6 +247,7 @@ Release 0.21.0 - Unreleased
HBASE-2334 Slimming of Maven dependency tree - improves assembly build
speed (Paul Smith via Stack)
HBASE-2336 Fix build broken with HBASE-2334 (Lars Francke via Lars George)
HBASE-2283 row level atomicity (Kannan Muthukkaruppan via Stack)
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

View File

@ -13,7 +13,10 @@
</parent>
<modules>
<module>mdc_replication</module>
<!-- Commenting out for the moment. This is being rewritten. Will then pick up new WALEdit hlog value
<module>mdc_replication</module>
St.Ack Fri Mar 19 13:20:15 PDT 2010
-->
<module>stargate</module>
<module>transactional</module>
</modules>

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/**
* Add support for transactional operations to the regionserver's
@ -84,16 +85,17 @@ class THLog extends HLog {
*/
public void append(HRegionInfo regionInfo, long now, THLogKey.TrxOp txOp,
long transactionId) throws IOException {
THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo
.getTableDesc().getName(), -1, now, txOp, transactionId);
super.append(regionInfo, key, new KeyValue(new byte [0], 0, 0)); // Empty KeyValue
THLogKey key = new THLogKey(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), -1, now, txOp, transactionId);
WALEdit e = new WALEdit();
e.add(new KeyValue(new byte [0], 0, 0)); // Empty KeyValue
super.append(regionInfo, key, e, regionInfo.isMetaRegion());
}
/**
* Write a transactional update to the log.
*
* @param regionInfo
* @param now
* @param update
* @param transactionId
* @throws IOException
@ -108,7 +110,9 @@ class THLog extends HLog {
transactionId);
for (KeyValue value : convertToKeyValues(update)) {
super.append(regionInfo, key, value);
WALEdit e = new WALEdit();
e.add(value);
super.append(regionInfo, key, e, regionInfo.isMetaRegion());
}
}
@ -116,8 +120,7 @@ class THLog extends HLog {
* Write a transactional delete to the log.
*
* @param regionInfo
* @param now
* @param update
* @param delete
* @param transactionId
* @throws IOException
*/
@ -131,7 +134,9 @@ class THLog extends HLog {
transactionId);
for (KeyValue value : convertToKeyValues(delete)) {
super.append(regionInfo, key, value);
WALEdit e = new WALEdit();
e.add(value);
super.append(regionInfo, key, e, regionInfo.isMetaRegion());
}
}
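A minimal caller-side sketch of the WALEdit wrapping shown above, assuming the HLog.append(HRegionInfo, byte[], WALEdit, long) signature this patch introduces; the helper class and method name are hypothetical, not part of the commit:

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Hypothetical helper: batch every KeyValue for one logical write into a single
// WALEdit so the whole row travels in one log entry.
class WALEditAppendExample {
  static void appendRowEdits(final HLog log, final HRegionInfo info,
      final Iterable<KeyValue> kvs)
  throws IOException {
    WALEdit edit = new WALEdit();
    for (KeyValue kv : kvs) {
      edit.add(kv);
    }
    if (!edit.isEmpty()) {
      // One append per row; with this patch, append() also syncs the log.
      log.append(info, info.getTableDesc().getName(), edit,
        System.currentTimeMillis());
    }
  }
}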

View File

@ -37,10 +37,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;
@ -86,7 +87,7 @@ class THLogRecoveryManager {
* @throws UnsupportedEncodingException
* @throws IOException
*/
public Map<Long, List<KeyValue>> getCommitsFromLog(
public Map<Long, List<WALEdit>> getCommitsFromLog(
final Path reconstructionLog, final long maxSeqID,
final Progressable reporter) throws UnsupportedEncodingException,
IOException {
@ -102,7 +103,8 @@ class THLogRecoveryManager {
return null;
}
SortedMap<Long, List<KeyValue>> pendingTransactionsById = new TreeMap<Long, List<KeyValue>>();
SortedMap<Long, List<WALEdit>> pendingTransactionsById =
new TreeMap<Long, List<WALEdit>>();
Set<Long> commitedTransactions = new HashSet<Long>();
Set<Long> abortedTransactions = new HashSet<Long>();
@ -115,13 +117,16 @@ class THLogRecoveryManager {
long abortCount = 0;
long commitCount = 0;
// How many edits to apply before we send a progress report.
int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
2000);
HLog.Entry entry;
while ((entry = reader.next()) != null) {
THLogKey key = (THLogKey)entry.getKey();
KeyValue val = entry.getEdit();
WALEdit val = entry.getEdit();
if (LOG.isTraceEnabled()) {
LOG.trace("Processing edit: key: " + key.toString() + " val: "
+ val.toString());
@ -136,18 +141,18 @@ class THLogRecoveryManager {
}
long transactionId = key.getTransactionId();
List<KeyValue> updates = pendingTransactionsById.get(transactionId);
List<WALEdit> updates = pendingTransactionsById.get(transactionId);
switch (key.getTrxOp()) {
case OP:
if (updates == null) {
updates = new ArrayList<KeyValue>();
updates = new ArrayList<WALEdit>();
pendingTransactionsById.put(transactionId, updates);
startCount++;
}
updates.add(val);
val = new KeyValue();
val = new WALEdit();
writeCount++;
break;
@ -209,15 +214,16 @@ class THLogRecoveryManager {
return null;
}
private SortedMap<Long, List<KeyValue>> resolvePendingTransaction(
SortedMap<Long, List<KeyValue>> pendingTransactionsById
private SortedMap<Long, List<WALEdit>> resolvePendingTransaction(
SortedMap<Long, List<WALEdit>> pendingTransactionsById
) {
SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
SortedMap<Long, List<WALEdit>> commitedTransactionsById =
new TreeMap<Long, List<WALEdit>>();
LOG.info("Region log has " + pendingTransactionsById.size()
+ " unfinished transactions. Going to the transaction log to resolve");
for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById.entrySet()) {
for (Entry<Long, List<WALEdit>> entry : pendingTransactionsById.entrySet()) {
if (entry.getValue().isEmpty()) {
LOG.debug("Skipping resolving trx ["+entry.getKey()+"] has no writes.");
}

View File

@ -54,13 +54,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.*;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.*;
import org.apache.hadoop.util.Progressable;
/**
@ -142,20 +138,23 @@ public class TransactionalRegion extends HRegion {
}
THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this);
Map<Long, List<KeyValue>> commitedTransactionsById = recoveryManager
Map<Long, List<WALEdit>> commitedTransactionsById = recoveryManager
.getCommitsFromLog(oldLogFile, minSeqId, reporter);
if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
LOG.debug("found " + commitedTransactionsById.size()
+ " COMMITED transactions to recover.");
for (Entry<Long, List<KeyValue>> entry : commitedTransactionsById
for (Entry<Long, List<WALEdit>> entry : commitedTransactionsById
.entrySet()) {
LOG.debug("Writing " + entry.getValue().size()
+ " updates for transaction " + entry.getKey());
for (KeyValue b : entry.getValue()) {
Put put = new Put(b.getRow());
put.add(b);
for (WALEdit b : entry.getValue()) {
Put put = null;
for (KeyValue kv: b.getKeyValues()) {
if (put == null) put = new Put(kv.getRow());
put.add(kv);
}
super.put(put, true); // These are walled so they live forever
}
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -105,7 +106,7 @@ public class TestTHLog extends HBaseTestCase implements
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
filename, -1, null);
assertNull(commits);
@ -135,7 +136,7 @@ public class TestTHLog extends HBaseTestCase implements
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
filename, -1, null);
assertNull(commits);
@ -170,7 +171,7 @@ public class TestTHLog extends HBaseTestCase implements
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
-1, null);
assertNull(commits);
@ -205,7 +206,7 @@ public class TestTHLog extends HBaseTestCase implements
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
-1, null);
assertNull(commits);
@ -240,7 +241,7 @@ public class TestTHLog extends HBaseTestCase implements
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
-1, null);
assertNull(commits);

View File

@ -1777,14 +1777,22 @@ public class KeyValue implements Writable, HeapSize {
(2 * Bytes.SIZEOF_INT));
}
// Writable
public void readFields(final DataInput in) throws IOException {
this.length = in.readInt();
// this overload assumes that the length bytes have already been read,
// and it expects the length of the KeyValue to be explicitly passed
// to it.
public void readFields(int length, final DataInput in) throws IOException {
this.length = length;
this.offset = 0;
this.bytes = new byte[this.length];
in.readFully(this.bytes, 0, this.length);
}
// Writable
public void readFields(final DataInput in) throws IOException {
int length = in.readInt();
readFields(length, in);
}
public void write(final DataOutput out) throws IOException {
out.writeInt(this.length);
out.write(this.bytes, this.offset, this.length);
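A small sketch of how the two readFields overloads compose, assuming a caller that has already consumed the 4-byte length prefix; the KeyValueReadExample class is for illustration only:

import java.io.DataInput;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;

// Hypothetical caller: it reads the length prefix itself (e.g. as part of a
// larger record) and then hands the known length to the new overload.
class KeyValueReadExample {
  static KeyValue readOne(final DataInput in) throws IOException {
    int length = in.readInt();   // length bytes consumed by the caller
    KeyValue kv = new KeyValue();
    kv.readFields(length, in);   // new overload: length is passed explicitly
    return kv;
  }
}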

View File

@ -45,6 +45,7 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@ -58,6 +59,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.NavigableSet;
import java.util.TreeMap;
@ -68,7 +70,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegion stores data for a certain region of a table. It stores all columns
@ -986,7 +988,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(getRegionName(),
regionInfo.getTableDesc().getName(), completeSequenceId);
regionInfo.getTableDesc().getName(), completeSequenceId,
this.getRegionInfo().isMetaRegion());
// C. Finally notify anyone waiting on memstore to clear:
// e.g. checkResources().
@ -1141,10 +1144,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
}
}
for(Map.Entry<byte[], List<KeyValue>> e: delete.getFamilyMap().entrySet()) {
byte [] family = e.getKey();
delete(family, e.getValue(), writeToWAL);
}
// All edits for the given row (across all column families) must happen atomically.
delete(delete.getFamilyMap(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
splitsAndClosesLock.readLock().unlock();
@ -1153,12 +1155,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
/**
* @param family
* @param kvs
* @param familyMap map of family to edits for the given family.
* @param writeToWAL
* @throws IOException
*/
public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
throws IOException {
long now = System.currentTimeMillis();
byte [] byteNow = Bytes.toBytes(now);
@ -1166,46 +1167,69 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
this.updatesLock.readLock().lock();
try {
long size = 0;
Store store = getStore(family);
Iterator<KeyValue> kvIterator = kvs.iterator();
while(kvIterator.hasNext()) {
KeyValue kv = kvIterator.next();
// Check if time is LATEST, change to time of most recent addition if so
// This is expensive.
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
List<KeyValue> result = new ArrayList<KeyValue>(1);
Get g = new Get(kv.getRow());
NavigableSet<byte []> qualifiers =
new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
byte [] q = kv.getQualifier();
if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
qualifiers.add(q);
get(store, g, qualifiers, result);
if (result.isEmpty()) {
// Nothing to delete
kvIterator.remove();
continue;
}
if (result.size() > 1) {
throw new RuntimeException("Unexpected size: " + result.size());
}
KeyValue getkv = result.get(0);
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
} else {
kv.updateLatestStamp(byteNow);
}
if (writeToWAL) {
//
// write/sync to WAL should happen before we touch memstore.
//
// If order is reversed, i.e. we write to memstore first, and
// for some reason fail to write/sync to commit log, the memstore
// will contain uncommitted transactions.
//
// We must do this in this loop because it could affect
// the above get to find the next timestamp to remove.
// This is the case when there are multiple deletes for the same column.
size = this.memstoreSize.addAndGet(store.delete(kv));
// bunch up all edits across all column families into a
// single WALEdit.
WALEdit walEdit = new WALEdit();
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
List<KeyValue> kvs = e.getValue();
for (KeyValue kv : kvs) {
walEdit.add(kv);
}
}
// append the edit to WAL. The append also does the sync.
if (!walEdit.isEmpty()) {
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
}
}
if (writeToWAL) {
this.log.append(regionInfo,
regionInfo.getTableDesc().getName(), kvs, now);
long size = 0;
//
// Now make changes to the memstore.
//
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<KeyValue> kvs = e.getValue();
Store store = getStore(family);
for (KeyValue kv: kvs) {
// Check if time is LATEST, change to time of most recent addition if so
// This is expensive.
if (kv.isLatestTimestamp() && kv.isDeleteType()) {
List<KeyValue> result = new ArrayList<KeyValue>(1);
Get g = new Get(kv.getRow());
NavigableSet<byte []> qualifiers =
new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
byte [] q = kv.getQualifier();
if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
qualifiers.add(q);
get(store, g, qualifiers, result);
if (result.isEmpty()) {
// Nothing to delete
continue;
}
if (result.size() > 1) {
throw new RuntimeException("Unexpected size: " + result.size());
}
KeyValue getkv = result.get(0);
Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
} else {
kv.updateLatestStamp(byteNow);
}
size = this.memstoreSize.addAndGet(store.delete(kv));
}
}
flush = isFlushSize(size);
} finally {
@ -1270,15 +1294,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
Integer lid = getLock(lockid, row);
byte [] now = Bytes.toBytes(System.currentTimeMillis());
try {
for (Map.Entry<byte[], List<KeyValue>> entry:
put.getFamilyMap().entrySet()) {
byte [] family = entry.getKey();
checkFamily(family);
List<KeyValue> puts = entry.getValue();
if (updateKeys(puts, now)) {
put(family, puts, writeToWAL);
}
}
// All edits for the given row (across all column families) must happen atomically.
put(put.getFamilyMap(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@ -1337,16 +1354,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
matches = Bytes.equals(expectedValue, actualValue);
}
//If matches put the new put
if(matches) {
for(Map.Entry<byte[], List<KeyValue>> entry :
put.getFamilyMap().entrySet()) {
byte [] fam = entry.getKey();
checkFamily(fam);
List<KeyValue> puts = entry.getValue();
if(updateKeys(puts, now)) {
put(fam, puts, writeToWAL);
}
}
if (matches) {
// All edits for the given row (across all column families) must happen atomically.
put(put.getFamilyMap(), writeToWAL);
return true;
}
return false;
@ -1456,34 +1466,74 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
*/
private void put(final byte [] family, final List<KeyValue> edits)
throws IOException {
this.put(family, edits, true);
Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
familyMap.put(family, edits);
this.put(familyMap, true);
}
/**
* Add updates first to the hlog (if writeToWal) and then add values to memstore.
* Warning: Assumption is caller has lock on passed in row.
* @param family
* @param edits
* @param familyMap map of family to edits for the given family.
* @param writeToWAL if true, then we should write to the log
* @throws IOException
*/
private void put(final byte [] family, final List<KeyValue> edits,
boolean writeToWAL) throws IOException {
if (edits == null || edits.isEmpty()) {
return;
}
private void put(final Map<byte [], List<KeyValue>> familyMap,
boolean writeToWAL)
throws IOException {
long now = System.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
boolean flush = false;
this.updatesLock.readLock().lock();
try {
if (writeToWAL) {
long now = System.currentTimeMillis();
this.log.append(regionInfo,
regionInfo.getTableDesc().getName(), edits, now);
WALEdit walEdit = new WALEdit();
// check if column families are valid;
// check if any timestampupdates are needed;
// and if writeToWAL is set, then also collapse edits into a single list.
for (Map.Entry<byte[], List<KeyValue>> e: familyMap.entrySet()) {
List<KeyValue> edits = e.getValue();
byte[] family = e.getKey();
// is this a valid column family?
checkFamily(family);
// update timestamp on keys if required.
if (updateKeys(edits, byteNow)) {
if (writeToWAL) {
// bunch up all edits across all column families into a
// single WALEdit.
for (KeyValue kv : edits) {
walEdit.add(kv);
}
}
}
}
// append to and sync WAL
if (!walEdit.isEmpty()) {
//
// write/sync to WAL should happen before we touch memstore.
//
// If order is reversed, i.e. we write to memstore first, and
// for some reason fail to write/sync to commit log, the memstore
// will contain uncommitted transactions.
//
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
}
long size = 0;
Store store = getStore(family);
for (KeyValue kv: edits) {
size = this.memstoreSize.addAndGet(store.add(kv));
// now make changes to the memstore
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
byte[] family = e.getKey();
List<KeyValue> edits = e.getValue();
Store store = getStore(family);
for (KeyValue kv: edits) {
size = this.memstoreSize.addAndGet(store.add(kv));
}
}
flush = isFlushSize(size);
} finally {
@ -2402,10 +2452,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
// now log it:
if (writeToWAL) {
long now = System.currentTimeMillis();
List<KeyValue> edits = new ArrayList<KeyValue>(1);
edits.add(newKv);
this.log.append(regionInfo,
regionInfo.getTableDesc().getName(), edits, now);
WALEdit walEdit = new WALEdit();
walEdit.add(newKv);
this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
walEdit, now);
}
// Now request the ICV to the store, this will set the timestamp
@ -2460,7 +2510,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
(5 * Bytes.SIZEOF_BOOLEAN)) +
(3 * ClassSize.REENTRANT_LOCK));
@Override
public long heapSize() {
long heapSize = DEEP_OVERHEAD;
for(Store store : this.stores.values()) {
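Condensed sketch of the ordering HRegion.put() and delete() now follow above: collapse the per-family edits into one WALEdit, make that single entry durable, and only then touch the memstore. The class and the appendAndSync/applyToMemstore methods are hypothetical stand-ins for this.log.append(...) and store.add()/store.delete():

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Sketch only: WAL before memstore, one WALEdit per row.
abstract class RowAtomicityExample {
  void putRow(final Map<byte [], List<KeyValue>> familyMap, final boolean writeToWAL)
  throws IOException {
    WALEdit walEdit = new WALEdit();
    for (List<KeyValue> edits : familyMap.values()) {
      for (KeyValue kv : edits) {
        walEdit.add(kv);              // every family's edits ride in one log entry
      }
    }
    if (writeToWAL && !walEdit.isEmpty()) {
      appendAndSync(walEdit);         // durable first: a crash here leaves nothing half-applied
    }
    for (Map.Entry<byte [], List<KeyValue>> e : familyMap.entrySet()) {
      applyToMemstore(e.getKey(), e.getValue());   // only now does the row become readable
    }
  }
  // Hypothetical stand-ins for this.log.append(...) and store.add()/store.delete().
  abstract void appendAndSync(WALEdit edit) throws IOException;
  abstract void applyToMemstore(byte [] family, List<KeyValue> kvs);
}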

View File

@ -1686,10 +1686,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
boolean writeToWAL = put.getWriteToWAL();
region.put(put, getLockFromId(put.getLockId()), writeToWAL);
if (writeToWAL) {
this.syncWal(region);
}
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
@ -1723,11 +1719,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
// All have been processed successfully.
if (writeToWAL) {
this.syncWal(region);
}
return -1;
}
@ -1758,7 +1749,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
boolean retval = region.checkAndPut(row, family, qualifier, value, put,
getLockFromId(put.getLockId()), true);
this.syncWal(region);
return retval;
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
@ -1912,7 +1902,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
Integer lid = getLockFromId(delete.getLockId());
region.delete(delete, lid, writeToWAL);
this.syncWal(region);
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
@ -1944,8 +1933,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
this.syncWal(region);
return -1;
}
@ -2382,14 +2369,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
long retval = region.incrementColumnValue(row, family, qualifier, amount,
writeToWAL);
if (writeToWAL) {
syncWal(region);
}
return retval;
return region.incrementColumnValue(row, family, qualifier, amount,
writeToWAL);
} catch (IOException e) {
checkFileSystem();
throw e;
@ -2411,13 +2392,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
return serverInfo;
}
// Sync the WAL if the table permits it
private void syncWal(HRegion region) {
if(!region.getTableDesc().isDeferredLogFlush()) {
this.hlog.sync(region.getRegionInfo().isMetaRegion());
}
}
/**
* Interval at which threads should run
* @return the interval

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@ -294,7 +295,7 @@ public class Store implements HConstants, HeapSize {
*
* We can ignore any log message that has a sequence ID that's equal to or
* lower than maxSeqID. (Because we know such log messages are already
* reflected in the MapFiles.)
* reflected in the HFiles.)
*
* @return the new max sequence id as per the log, or -1 if no log recovered
*/
@ -324,9 +325,17 @@ public class Store implements HConstants, HeapSize {
int reportInterval =
this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
HLog.Entry entry;
// TBD: Need to add an exception handler around logReader.next.
//
// A transaction now appears as a single edit. If logReader.next()
// throws an exception, then it must be an incomplete/partial
// transaction at the end of the file. Rather than bubble up
// the exception, we should catch it and simply ignore the
// partial transaction during this recovery phase.
//
while ((entry = logReader.next()) != null) {
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
WALEdit val = entry.getEdit();
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
@ -335,27 +344,28 @@ public class Store implements HConstants, HeapSize {
skippedEdits++;
continue;
}
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (val.matchingFamily(HLog.METAFAMILY) ||
!Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
!val.matchingFamily(family.getName())) {
continue;
}
if (val.isDelete()) {
this.memstore.delete(val);
} else {
this.memstore.add(val);
}
editsCount++;
for (KeyValue kv : val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
!Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
!kv.matchingFamily(family.getName())) {
continue;
}
if (kv.isDelete()) {
this.memstore.delete(kv);
} else {
this.memstore.add(kv);
}
editsCount++;
}
// Every 2k edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3 minutes to complete.
if (reporter != null && (editsCount % reportInterval) == 0) {
reporter.progress();
}
// Instantiate a new KeyValue to perform Writable on
val = new KeyValue();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
@ -856,7 +866,6 @@ public class Store implements HConstants, HeapSize {
/**
* Do a minor/major compaction. Uses the scan infrastructure to make it easy.
*
* @param writer output writer
* @param filesToCompact which files to compact
* @param majorCompaction true to major compact (prune all deletes, max versions, etc)
* @param maxId Readers maximum sequence id.
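Relating to the runReconstruction() loop above and its TBD note about partial transactions, a hedged reader-side sketch: one log entry now carries a whole row's edits, so a read failure at the tail of the file can be skipped rather than bubbled up. The ReplayExample class, the EOFException choice, and the memstore call site are assumptions, not part of this patch:

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Hypothetical replay loop: each entry is a whole-row WALEdit, so an exception
// from next() at the end of the file can be treated as a partial transaction
// and ignored, as the TBD comment in runReconstruction() suggests.
class ReplayExample {
  static long countEditsForFamily(final HLog.Reader reader, final byte [] family)
  throws IOException {
    long applied = 0;
    HLog.Entry entry;
    while (true) {
      try {
        entry = reader.next();
      } catch (EOFException partialTransaction) {
        break;               // incomplete trailing edit; skip it instead of failing
      }
      if (entry == null) break;
      WALEdit edit = entry.getEdit();
      for (KeyValue kv : edit.getKeyValues()) {
        if (!kv.matchingFamily(family)) continue;  // not for this store
        applied++;           // a real caller would add/delete kv in the memstore here
      }
    }
    return applied;
  }
}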

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
@ -425,7 +426,8 @@ public class HLog implements HConstants, Syncable {
*/
@SuppressWarnings("unchecked")
public static Writer createWriter(final FileSystem fs,
final Path path, Configuration conf) throws IOException {
final Path path, Configuration conf)
throws IOException {
try {
Class c = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl",
SequenceFileLogWriter.class.getCanonicalName()));
@ -628,12 +630,13 @@ public class HLog implements HConstants, Syncable {
* @param now Time of this edit write.
* @throws IOException
*/
public void append(HRegionInfo regionInfo, KeyValue logEdit,
final long now)
public void append(HRegionInfo regionInfo, WALEdit logEdit,
final long now,
final boolean isMetaRegion)
throws IOException {
byte [] regionName = regionInfo.getRegionName();
byte [] tableName = regionInfo.getTableDesc().getName();
this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit, isMetaRegion);
}
/**
@ -655,7 +658,8 @@ public class HLog implements HConstants, Syncable {
* @param logKey
* @throws IOException
*/
public void append(HRegionInfo regionInfo, HLogKey logKey, KeyValue logEdit)
public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
final boolean isMetaRegion)
throws IOException {
if (this.closed) {
throw new IOException("Cannot append; log is closed");
@ -674,6 +678,10 @@ public class HLog implements HConstants, Syncable {
this.unflushedEntries.incrementAndGet();
this.numEntries.incrementAndGet();
}
// sync txn to file system
this.sync(isMetaRegion);
if (this.editsSize.get() > this.logrollsize) {
if (listener != null) {
listener.logRollRequested();
@ -704,31 +712,31 @@ public class HLog implements HConstants, Syncable {
* @param now
* @throws IOException
*/
public void append(HRegionInfo info, byte [] tableName, List<KeyValue> edits,
public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
final long now)
throws IOException {
byte[] regionName = info.getRegionName();
if (this.closed) {
throw new IOException("Cannot append; log is closed");
}
long seqNum [] = obtainSeqNum(edits.size());
long seqNum = obtainSeqNum();
synchronized (this.updateLock) {
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region (i.e. the first edit added to the particular
// memstore). When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
this.lastSeqWritten.putIfAbsent(regionName, seqNum);
int counter = 0;
for (KeyValue kv: edits) {
HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
doWrite(info, logKey, kv);
this.numEntries.incrementAndGet();
}
HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
doWrite(info, logKey, edits);
this.numEntries.incrementAndGet();
// Only count 1 row as an unflushed entry.
this.unflushedEntries.incrementAndGet();
}
// sync txn to file system
this.sync(info.isMetaRegion());
if (this.editsSize.get() > this.logrollsize) {
requestLogRoll();
}
@ -869,7 +877,7 @@ public class HLog implements HConstants, Syncable {
}
}
protected void doWrite(HRegionInfo info, HLogKey logKey, KeyValue logEdit)
protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
throws IOException {
if (!this.enabled) {
return;
@ -931,8 +939,9 @@ public class HLog implements HConstants, Syncable {
* completion of a cache-flush. Otherwise the log-seq-id for the flush will
* not appear in the correct logfile.
*
* @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long)}
* @see #completeCacheFlush(byte[], byte[], long)
* @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)}
* @see #completeCacheFlush(byte[], byte[], long, boolean)
* @see #abortCacheFlush()
*/
public long startCacheFlush() {
@ -951,7 +960,8 @@ public class HLog implements HConstants, Syncable {
* @throws IOException
*/
public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
final long logSeqId)
final long logSeqId,
final boolean isMetaRegion)
throws IOException {
try {
if (this.closed) {
@ -959,9 +969,10 @@ public class HLog implements HConstants, Syncable {
}
synchronized (updateLock) {
long now = System.currentTimeMillis();
WALEdit edits = completeCacheFlushLogEdit();
this.writer.append(new HLog.Entry(
makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
completeCacheFlushLogEdit()));
edits));
writeTime += System.currentTimeMillis() - now;
writeOps++;
this.numEntries.incrementAndGet();
@ -970,14 +981,20 @@ public class HLog implements HConstants, Syncable {
this.lastSeqWritten.remove(regionName);
}
}
// sync txn to file system
this.sync(isMetaRegion);
} finally {
this.cacheFlushLock.unlock();
}
}
private KeyValue completeCacheFlushLogEdit() {
return new KeyValue(METAROW, METAFAMILY, null,
private WALEdit completeCacheFlushLogEdit() {
KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
WALEdit e = new WALEdit();
e.add(kv);
return e;
}
/**
@ -1278,11 +1295,11 @@ public class HLog implements HConstants, Syncable {
* Only used when splitting logs
*/
public static class Entry implements Writable {
private KeyValue edit;
private WALEdit edit;
private HLogKey key;
public Entry() {
edit = new KeyValue();
edit = new WALEdit();
key = new HLogKey();
}
@ -1291,7 +1308,7 @@ public class HLog implements HConstants, Syncable {
* @param edit log's edit
* @param key log's key
*/
public Entry(HLogKey key, KeyValue edit) {
public Entry(HLogKey key, WALEdit edit) {
super();
this.key = key;
this.edit = edit;
@ -1300,7 +1317,7 @@ public class HLog implements HConstants, Syncable {
* Gets the edit
* @return edit
*/
public KeyValue getEdit() {
public WALEdit getEdit() {
return edit;
}
/**

View File

@ -6,8 +6,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.io.SequenceFile;
public class SequenceFileLogReader implements HLog.Reader {
@ -97,7 +97,7 @@ public class SequenceFileLogReader implements HLog.Reader {
public HLog.Entry next(HLog.Entry reuse) throws IOException {
if (reuse == null) {
HLogKey key = HLog.newKey(conf);
KeyValue val = new KeyValue();
WALEdit val = new WALEdit();
if (reader.next(key, val)) {
return new HLog.Entry(key, val);
}

View File

@ -24,7 +24,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
writer = SequenceFile.createWriter(fs, conf, path,
HLog.getKeyClass(conf), KeyValue.class,
HLog.getKeyClass(conf), WALEdit.class,
fs.getConf().getInt("io.file.buffer.size", 4096),
(short) conf.getInt("hbase.regionserver.hlog.replication",
fs.getDefaultReplication()),

View File

@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@ -374,7 +376,9 @@ public class TestHRegion extends HBaseTestCase {
//testing existing family
byte [] family = fam2;
try {
region.delete(family, kvs, true);
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
deleteMap.put(family, kvs);
region.delete(deleteMap, true);
} catch (Exception e) {
assertTrue("Family " +new String(family)+ " does not exist", false);
}
@ -383,7 +387,9 @@ public class TestHRegion extends HBaseTestCase {
boolean ok = false;
family = fam4;
try {
region.delete(family, kvs, true);
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
deleteMap.put(family, kvs);
region.delete(deleteMap, true);
} catch (Exception e) {
ok = true;
}
@ -605,7 +611,9 @@ public class TestHRegion extends HBaseTestCase {
kvs.add(new KeyValue(row1, fam1, col2, null));
kvs.add(new KeyValue(row1, fam1, col3, null));
region.delete(fam1, kvs, true);
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
deleteMap.put(fam1, kvs);
region.delete(deleteMap, true);
// extract the key values out the memstore:
// This is kinda hacky, but better than nothing...

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.*;
@ -111,24 +112,24 @@ public class TestStoreReconstruction {
final byte[] regionName = info.getRegionName();
// Add 10 000 edits to HLog on the good family
List<KeyValue> edit = new ArrayList<KeyValue>();
for (int j = 0; j < TOTAL_EDITS; j++) {
byte[] qualifier = Bytes.toBytes(Integer.toString(j));
byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifier,
System.currentTimeMillis(), column));
log.append(info, tableName, edit,
System.currentTimeMillis());
edit.clear();
}
// Add a cache flush, shouldn't have any effect
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
System.currentTimeMillis(), rowName));
log.append(info, tableName, edit,
log.append(info, tableName, edit,
System.currentTimeMillis());
log.sync();

View File

@ -91,7 +91,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
for (int i = 0; i < howmany; i++) {
for (int j = 0; j < howmany; j++) {
List<KeyValue> edit = new ArrayList<KeyValue>();
WALEdit edit = new WALEdit();
byte [] family = Bytes.toBytes("column");
byte [] qualifier = Bytes.toBytes(Integer.toString(j));
byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
@ -142,7 +142,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
null,null, false);
for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
wal.append(info, bytes, kvs, System.currentTimeMillis());
}
@ -160,7 +160,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
// Add test that checks to see that an open of a Reader works on a file
// that has had a sync done on it.
for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
wal.append(info, bytes, kvs, System.currentTimeMillis());
}
@ -179,7 +179,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
// especially that we return good length on file.
final byte [] value = new byte[1025 * 1024]; // Make a 1M value.
for (int i = 0; i < total; i++) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
wal.append(info, bytes, kvs, System.currentTimeMillis());
}
@ -236,7 +236,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
HLog.Entry entry = new HLog.Entry();
while((entry = reader.next(entry)) != null) {
HLogKey key = entry.getKey();
KeyValue kv = entry.getEdit();
WALEdit kv = entry.getEdit();
String region = Bytes.toString(key.getRegionName());
// Assert that all edits are for same region.
if (previousRegion != null) {
@ -245,7 +245,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
assertTrue(seqno < key.getLogSeqNum());
seqno = key.getLogSeqNum();
previousRegion = region;
System.out.println(key + " " + kv);
count++;
}
assertEquals(howmany * howmany, count);
@ -269,44 +268,49 @@ public class TestHLog extends HBaseTestCase implements HConstants {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
long timestamp = System.currentTimeMillis();
List<KeyValue> cols = new ArrayList<KeyValue>();
WALEdit cols = new WALEdit();
for (int i = 0; i < COL_COUNT; i++) {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[] { (byte)(i + '0') }));
}
HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
final byte [] regionName = info.getRegionName();
log.append(info, tableName, cols, System.currentTimeMillis());
long logSeqId = log.startCacheFlush();
log.completeCacheFlush(regionName, tableName, logSeqId);
log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
log.close();
Path filename = log.computeFilename(log.getFilenum());
log = null;
// Now open a reader on the log and assert append worked.
reader = HLog.getReader(fs, filename, conf);
HLog.Entry entry = new HLog.Entry();
for (int i = 0; i < COL_COUNT; i++) {
reader.next(entry);
// Above we added all columns on a single row so we only read one
// entry in the below... thats why we have '1'.
for (int i = 0; i < 1; i++) {
HLog.Entry entry = reader.next(null);
if (entry == null) break;
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
WALEdit val = entry.getEdit();
assertTrue(Bytes.equals(regionName, key.getRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));
assertTrue(Bytes.equals(row, val.getRow()));
assertEquals((byte)(i + '0'), val.getValue()[0]);
KeyValue kv = val.getKeyValues().get(0);
assertTrue(Bytes.equals(row, kv.getRow()));
assertEquals((byte)(i + '0'), kv.getValue()[0]);
System.out.println(key + " " + val);
}
HLog.Entry entry = null;
while ((entry = reader.next(null)) != null) {
HLogKey key = entry.getKey();
KeyValue val = entry.getEdit();
WALEdit val = entry.getEdit();
// Assert only one more row... the meta flushed row.
assertTrue(Bytes.equals(regionName, key.getRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));
assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
KeyValue kv = val.getKeyValues().get(0);
assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH,
val.getValue()));
val.getKeyValues().get(0).getValue()));
System.out.println(key + " " + val);
}
} finally {