From f1067227748fb2190d0e728f8e4bf5c206730eb1 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 20 Mar 2010 00:52:51 +0000 Subject: [PATCH] HBASE-2283 row level atomicity git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@925508 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + contrib/pom.xml | 5 +- .../regionserver/transactional/THLog.java | 21 +- .../transactional/THLogRecoveryManager.java | 28 ++- .../transactional/TransactionalRegion.java | 21 +- .../regionserver/transactional/TestTHLog.java | 11 +- .../org/apache/hadoop/hbase/KeyValue.java | 14 +- .../hadoop/hbase/regionserver/HRegion.java | 225 +++++++++++------- .../hbase/regionserver/HRegionServer.java | 30 +-- .../hadoop/hbase/regionserver/Store.java | 45 ++-- .../hadoop/hbase/regionserver/wal/HLog.java | 65 +++-- .../wal/SequenceFileLogReader.java | 4 +- .../wal/SequenceFileLogWriter.java | 2 +- .../hbase/regionserver/TestHRegion.java | 14 +- .../regionserver/TestStoreReconstruction.java | 9 +- .../hbase/regionserver/wal/TestHLog.java | 42 ++-- 16 files changed, 311 insertions(+), 226 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 741b5984335..f6a1e2c187f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -247,6 +247,7 @@ Release 0.21.0 - Unreleased HBASE-2334 Slimming of Maven dependency tree - improves assembly build speed (Paul Smith via Stack) HBASE-2336 Fix build broken with HBASE-2334 (Lars Francke via Lars George) + HBASE-2283 row level atomicity (Kannan Muthukkaruppan via Stack) IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/contrib/pom.xml b/contrib/pom.xml index 617ba935b33..43296270591 100644 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -13,7 +13,10 @@ - mdc_replication + stargate transactional diff --git a/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java b/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java index 38ca4329f49..cddf47e2406 100644 --- a/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java +++ b/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.LogRollListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; /** * Add support for transactional operations to the regionserver's @@ -84,16 +85,17 @@ class THLog extends HLog { */ public void append(HRegionInfo regionInfo, long now, THLogKey.TrxOp txOp, long transactionId) throws IOException { - THLogKey key = new THLogKey(regionInfo.getRegionName(), regionInfo - .getTableDesc().getName(), -1, now, txOp, transactionId); - super.append(regionInfo, key, new KeyValue(new byte [0], 0, 0)); // Empty KeyValue + THLogKey key = new THLogKey(regionInfo.getRegionName(), + regionInfo.getTableDesc().getName(), -1, now, txOp, transactionId); + WALEdit e = new WALEdit(); + e.add(new KeyValue(new byte [0], 0, 0)); // Empty KeyValue + super.append(regionInfo, key, e, regionInfo.isMetaRegion()); } /** * Write a transactional update to the log. 
   *
   * @param regionInfo
-  * @param now
   * @param update
   * @param transactionId
   * @throws IOException
   */
@@ -108,7 +110,9 @@ class THLog extends HLog {
         transactionId);

    for (KeyValue value : convertToKeyValues(update)) {
-      super.append(regionInfo, key, value);
+      WALEdit e = new WALEdit();
+      e.add(value);
+      super.append(regionInfo, key, e, regionInfo.isMetaRegion());
    }
  }

@@ -116,8 +120,7 @@ class THLog extends HLog {
   * Write a transactional delete to the log.
   *
   * @param regionInfo
-  * @param now
-  * @param update
+  * @param delete
   * @param transactionId
   * @throws IOException
   */
@@ -131,7 +134,9 @@ class THLog extends HLog {
         transactionId);

    for (KeyValue value : convertToKeyValues(delete)) {
-      super.append(regionInfo, key, value);
+      WALEdit e = new WALEdit();
+      e.add(value);
+      super.append(regionInfo, key, e, regionInfo.isMetaRegion());
    }
  }

diff --git a/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java b/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
index 48a0591f76b..485d868e828 100644
--- a/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
+++ b/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
@@ -37,10 +37,10 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.Progressable;
@@ -86,7 +87,7 @@ class THLogRecoveryManager {
   * @throws UnsupportedEncodingException
   * @throws IOException
   */
-  public Map<Long, List<KeyValue>> getCommitsFromLog(
+  public Map<Long, List<WALEdit>> getCommitsFromLog(
      final Path reconstructionLog, final long maxSeqID,
      final Progressable reporter) throws UnsupportedEncodingException,
      IOException {
@@ -102,7 +103,8 @@ class THLogRecoveryManager {
      return null;
    }

-    SortedMap<Long, List<KeyValue>> pendingTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    SortedMap<Long, List<WALEdit>> pendingTransactionsById =
+      new TreeMap<Long, List<WALEdit>>();
    Set<Long> commitedTransactions = new HashSet<Long>();
    Set<Long> abortedTransactions = new HashSet<Long>();

    long skippedEdits = 0;
    long startCount = 0;
    long writeCount = 0;
    long abortCount = 0;
    long commitCount = 0;
    // How many edits to apply before we send a progress report.
+
+
+
    int reportInterval =
        conf.getInt("hbase.hstore.report.interval.edits", 2000);

    HLog.Entry entry;
    while ((entry = reader.next()) != null) {
      THLogKey key = (THLogKey)entry.getKey();
-      KeyValue val = entry.getEdit();
+      WALEdit val = entry.getEdit();
      if (LOG.isTraceEnabled()) {
        LOG.trace("Processing edit: key: " + key.toString() + " val: "
            + val.toString());
      }
@@ -136,18 +141,18 @@ class THLogRecoveryManager {
      }

      long transactionId = key.getTransactionId();
-      List<KeyValue> updates = pendingTransactionsById.get(transactionId);
+      List<WALEdit> updates = pendingTransactionsById.get(transactionId);
      switch (key.getTrxOp()) {

      case OP:
        if (updates == null) {
-          updates = new ArrayList<KeyValue>();
+          updates = new ArrayList<WALEdit>();
          pendingTransactionsById.put(transactionId, updates);
          startCount++;
        }

        updates.add(val);
-        val = new KeyValue();
+        val = new WALEdit();
        writeCount++;
        break;

@@ -209,15 +214,16 @@ class THLogRecoveryManager {
    return null;
  }

-  private SortedMap<Long, List<KeyValue>> resolvePendingTransaction(
-      SortedMap<Long, List<KeyValue>> pendingTransactionsById
+  private SortedMap<Long, List<WALEdit>> resolvePendingTransaction(
+      SortedMap<Long, List<WALEdit>> pendingTransactionsById
  ) {
-    SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
+    SortedMap<Long, List<WALEdit>> commitedTransactionsById =
+      new TreeMap<Long, List<WALEdit>>();

    LOG.info("Region log has " + pendingTransactionsById.size()
        + " unfinished transactions. Going to the transaction log to resolve");

-    for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById.entrySet()) {
+    for (Entry<Long, List<WALEdit>> entry : pendingTransactionsById.entrySet()) {
      if (entry.getValue().isEmpty()) {
        LOG.debug("Skipping resolving trx ["+entry.getKey()+"] has no writes.");
      }
diff --git a/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java b/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
index d0a409d63ca..23ef6213e39 100644
--- a/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
+++ b/contrib/transactional/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
@@ -54,13 +54,9 @@
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
 import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
-import org.apache.hadoop.hbase.regionserver.FlushRequester;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.*;
 import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.*;
 import org.apache.hadoop.util.Progressable;

 /**
@@ -142,20 +138,23 @@ public class TransactionalRegion extends HRegion {
    }

    THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this);
-    Map<Long, List<KeyValue>> commitedTransactionsById = recoveryManager
+    Map<Long, List<WALEdit>> commitedTransactionsById = recoveryManager
        .getCommitsFromLog(oldLogFile, minSeqId, reporter);

    if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
      LOG.debug("found " + commitedTransactionsById.size()
          + " COMMITED transactions to recover.");

-      for (Entry<Long, List<KeyValue>> entry : commitedTransactionsById
+      for (Entry<Long, List<WALEdit>> entry : commitedTransactionsById
          .entrySet()) {
        LOG.debug("Writing "
            + entry.getValue().size() + " updates for transaction "
            + entry.getKey());
-        for (KeyValue b : entry.getValue()) {
-          Put put = new Put(b.getRow());
-          put.add(b);
+        for (WALEdit b : entry.getValue()) {
+          Put put = null;
+          for (KeyValue kv: b.getKeyValues()) {
+            if (put == null) put = new Put(kv.getRow());
+            put.add(kv);
+          }
          super.put(put, true); // These are WALed, so they live forever
        }
      }
diff --git a/contrib/transactional/src/test/java/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java b/contrib/transactional/src/test/java/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
index 944bcc7d157..1729aab1284 100644
--- a/contrib/transactional/src/test/java/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
+++ b/contrib/transactional/src/test/java/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;

@@ -105,7 +106,7 @@ public class TestTHLog extends HBaseTestCase implements
    log.close();
    Path filename = log.computeFilename(log.getFilenum());

-    Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
+    Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
        filename, -1, null);

    assertNull(commits);
@@ -135,7 +136,7 @@ public class TestTHLog extends HBaseTestCase implements
    log.close();
    Path filename = log.computeFilename(log.getFilenum());

-    Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
+    Map<Long, List<WALEdit>> commits = logRecoveryMangaer.getCommitsFromLog(
        filename, -1, null);

    assertNull(commits);
@@ -170,7 +171,7 @@ public class TestTHLog extends HBaseTestCase implements
    log.close();
    Path filename = log.computeFilename(log.getFilenum());

-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
        -1, null);

    assertNull(commits);
@@ -205,7 +206,7 @@ public class TestTHLog extends HBaseTestCase implements
    log.close();
    Path filename = log.computeFilename(log.getFilenum());

-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
        -1, null);

    assertNull(commits);
@@ -240,7 +241,7 @@ public class TestTHLog extends HBaseTestCase implements
    log.close();
    Path filename = log.computeFilename(log.getFilenum());

-    Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
+    Map<Long, List<WALEdit>> commits = logMangaer.getCommitsFromLog(filename,
        -1, null);

    assertNull(commits);
diff --git a/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index ed6997c7287..be9d11de85d 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -1777,14 +1777,22 @@ public class KeyValue implements Writable, HeapSize {
        (2 * Bytes.SIZEOF_INT));
  }

-  // Writable
-  public void readFields(final DataInput in) throws IOException {
-    this.length = in.readInt();
+  // This overload assumes that the length prefix has already been read,
+  // and expects the length of the KeyValue to be passed in explicitly.
+  public void readFields(int length, final DataInput in) throws IOException {
+    this.length = length;
    this.offset = 0;
    this.bytes = new byte[this.length];
    in.readFully(this.bytes, 0, this.length);
  }

+  // Writable
+  public void readFields(final DataInput in) throws IOException {
+    int length = in.readInt();
+    readFields(length, in);
+  }
+
  public void write(final DataOutput out) throws IOException {
    out.writeInt(this.length);
    out.write(this.bytes, this.offset, this.length);
  }
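Editorial note: the WALEdit class that the rest of this patch leans on is referenced throughout, but its source file is not part of this diff. Below is a minimal sketch of the shape such a container presumably has, inferred from the call sites in this patch (add, isEmpty, getKeyValues) and from the new KeyValue.readFields(int, DataInput) overload above. The class name, the version-marker trick, and the serialization details are assumptions for illustration, not the committed source.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.io.Writable;

    // Hypothetical reconstruction of a WALEdit-like container: one Writable
    // carrying every KeyValue of a row transaction, so HLog can append and
    // sync them as a single atomic entry.
    public class WALEditSketch implements Writable {
      private static final int VERSION_2 = -1;   // assumed marker value

      private final List<KeyValue> kvs = new ArrayList<KeyValue>();

      public void add(KeyValue kv) { kvs.add(kv); }
      public boolean isEmpty() { return kvs.isEmpty(); }
      public List<KeyValue> getKeyValues() { return kvs; }

      public void write(DataOutput out) throws IOException {
        out.writeInt(VERSION_2);
        out.writeInt(kvs.size());
        for (KeyValue kv : kvs) {
          kv.write(out);                         // writes its own length prefix
        }
      }

      public void readFields(DataInput in) throws IOException {
        kvs.clear();
        int versionOrLength = in.readInt();
        if (versionOrLength == VERSION_2) {
          // New-style entry: a whole list of KeyValues.
          int numEdits = in.readInt();
          for (int i = 0; i < numEdits; i++) {
            KeyValue kv = new KeyValue();
            kv.readFields(in.readInt(), in);     // the new overload from above
            kvs.add(kv);
          }
        } else {
          // Old-style log entry: the int just read was the length of a lone
          // KeyValue -- which is why KeyValue.readFields(int, DataInput) exists.
          KeyValue kv = new KeyValue();
          kv.readFields(versionOrLength, in);
          kvs.add(kv);
        }
      }
    }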
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d5abae77cd7..68a247b171a 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -45,6 +45,7 @@ package org.apache.hadoop.hbase.regionserver;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -58,6 +59,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+ import java.util.HashMap;
 import java.util.Set;
 import java.util.NavigableSet;
 import java.util.TreeMap;
@@ -68,7 +70,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
- import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 /**
  * HRegion stores data for a certain region of a table. It stores all columns
@@ -986,7 +988,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    //     and that all updates to the log for this regionName that have lower
    //     log-sequence-ids can be safely ignored.
    this.log.completeCacheFlush(getRegionName(),
-        regionInfo.getTableDesc().getName(), completeSequenceId);
+        regionInfo.getTableDesc().getName(), completeSequenceId,
+        this.getRegionInfo().isMetaRegion());

    // C. Finally notify anyone waiting on memstore to clear:
    // e.g. checkResources().
@@ -1140,11 +1143,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
          checkFamily(family);
        }
      }
-
-      for(Map.Entry<byte [], List<KeyValue>> e: delete.getFamilyMap().entrySet()) {
-        byte [] family = e.getKey();
-        delete(family, e.getValue(), writeToWAL);
-      }
+
+      // All edits for the given row (across all column families) must happen atomically.
+      delete(delete.getFamilyMap(), writeToWAL);
+
    } finally {
      if(lockid == null) releaseRowLock(lid);
      splitsAndClosesLock.readLock().unlock();
@@ -1153,12 +1155,11 @@ public class HRegion implements HConstants, HeapSize { // , Writable{

  /**
-   * @param family
-   * @param kvs
+   * @param familyMap map of family to edits for the given family.
   * @param writeToWAL
   * @throws IOException
   */
-  public void delete(byte [] family, List<KeyValue> kvs, boolean writeToWAL)
+  public void delete(Map<byte [], List<KeyValue>> familyMap, boolean writeToWAL)
  throws IOException {
    long now = System.currentTimeMillis();
    byte [] byteNow = Bytes.toBytes(now);
    boolean flush = false;
    this.updatesLock.readLock().lock();

    try {
-      long size = 0;
-      Store store = getStore(family);
-      Iterator<KeyValue> kvIterator = kvs.iterator();
-      while(kvIterator.hasNext()) {
-        KeyValue kv = kvIterator.next();
-        // Check if time is LATEST, change to time of most recent addition if so
-        // This is expensive.
-        if (kv.isLatestTimestamp() && kv.isDeleteType()) {
-          List<KeyValue> result = new ArrayList<KeyValue>(1);
-          Get g = new Get(kv.getRow());
-          NavigableSet<byte []> qualifiers =
-            new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
-          byte [] q = kv.getQualifier();
-          if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
-          qualifiers.add(q);
-          get(store, g, qualifiers, result);
-          if (result.isEmpty()) {
-            // Nothing to delete
-            kvIterator.remove();
-            continue;
-          }
-          if (result.size() > 1) {
-            throw new RuntimeException("Unexpected size: " + result.size());
-          }
-          KeyValue getkv = result.get(0);
-          Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
-            getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
-        } else {
-          kv.updateLatestStamp(byteNow);
-        }
-
-        // We must do this in this loop because it could affect
-        // the above get to find the next timestamp to remove.
-        // This is the case when there are multiple deletes for the same column.
-        size = this.memstoreSize.addAndGet(store.delete(kv));
-      }
      if (writeToWAL) {
-        this.log.append(regionInfo,
-            regionInfo.getTableDesc().getName(), kvs, now);
+        //
+        // write/sync to WAL should happen before we touch memstore.
+        //
+        // If order is reversed, i.e. we write to memstore first, and
+        // for some reason fail to write/sync to commit log, the memstore
+        // will contain uncommitted transactions.
+        //
+
+        // bunch up all edits across all column families into a
+        // single WALEdit.
+        WALEdit walEdit = new WALEdit();
+        for (Map.Entry<byte [], List<KeyValue>> e : familyMap.entrySet()) {
+          List<KeyValue> kvs = e.getValue();
+          for (KeyValue kv : kvs) {
+            walEdit.add(kv);
+          }
+        }
+        // append the edit to WAL. The append also does the sync.
+        if (!walEdit.isEmpty()) {
+          this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+              walEdit, now);
+        }
      }
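Editorial note: with delete() taking the whole familyMap, a row mutation that spans column families becomes a single log entry. A hedged sketch of how that could be verified, in the style of the tests further down in this patch (fixtures such as r, fs, filename, fam1/fam2, and the kvsForFam lists are invented for illustration and assume the log has been closed for reading):

    // A delete touching two families should produce ONE log entry whose
    // WALEdit carries every KeyValue of the row transaction.
    Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
    familyMap.put(fam1, kvsForFam1);
    familyMap.put(fam2, kvsForFam2);
    r.delete(familyMap, true);   // writeToWAL = true

    HLog.Reader reader = HLog.getReader(fs, filename, conf);
    HLog.Entry entry = reader.next(null);
    assertEquals(kvsForFam1.size() + kvsForFam2.size(),
        entry.getEdit().getKeyValues().size());
    assertNull(reader.next(null));   // and no second entry for the row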
+
+      long size = 0;
+
+      //
+      // Now make changes to the memstore.
+      //
+      for (Map.Entry<byte [], List<KeyValue>> e : familyMap.entrySet()) {
+
+        byte[] family = e.getKey();
+        List<KeyValue> kvs = e.getValue();
+
+        Store store = getStore(family);
+        for (KeyValue kv: kvs) {
+          // Check if time is LATEST, change to time of most recent addition if so
+          // This is expensive.
+          if (kv.isLatestTimestamp() && kv.isDeleteType()) {
+            List<KeyValue> result = new ArrayList<KeyValue>(1);
+            Get g = new Get(kv.getRow());
+            NavigableSet<byte []> qualifiers =
+              new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
+            byte [] q = kv.getQualifier();
+            if(q == null) q = HConstants.EMPTY_BYTE_ARRAY;
+            qualifiers.add(q);
+            get(store, g, qualifiers, result);
+            if (result.isEmpty()) {
+              // Nothing to delete
+              continue;
+            }
+            if (result.size() > 1) {
+              throw new RuntimeException("Unexpected size: " + result.size());
+            }
+            KeyValue getkv = result.get(0);
+            Bytes.putBytes(kv.getBuffer(), kv.getTimestampOffset(),
+              getkv.getBuffer(), getkv.getTimestampOffset(), Bytes.SIZEOF_LONG);
+          } else {
+            kv.updateLatestStamp(byteNow);
+          }
+          size = this.memstoreSize.addAndGet(store.delete(kv));
+        }
+      }
      flush = isFlushSize(size);
    } finally {
@@ -1270,15 +1294,8 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    Integer lid = getLock(lockid, row);
    byte [] now = Bytes.toBytes(System.currentTimeMillis());
    try {
-      for (Map.Entry<byte [], List<KeyValue>> entry:
-          put.getFamilyMap().entrySet()) {
-        byte [] family = entry.getKey();
-        checkFamily(family);
-        List<KeyValue> puts = entry.getValue();
-        if (updateKeys(puts, now)) {
-          put(family, puts, writeToWAL);
-        }
-      }
+      // All edits for the given row (across all column families) must happen atomically.
+      put(put.getFamilyMap(), writeToWAL);
    } finally {
      if(lockid == null) releaseRowLock(lid);
    }
@@ -1337,16 +1354,9 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
        matches = Bytes.equals(expectedValue, actualValue);
      }
      //If matches put the new put
-      if(matches) {
-        for(Map.Entry<byte [], List<KeyValue>> entry :
-            put.getFamilyMap().entrySet()) {
-          byte [] fam = entry.getKey();
-          checkFamily(fam);
-          List<KeyValue> puts = entry.getValue();
-          if(updateKeys(puts, now)) {
-            put(fam, puts, writeToWAL);
-          }
-        }
+      if (matches) {
+        // All edits for the given row (across all column families) must happen atomically.
+        put(put.getFamilyMap(), writeToWAL);
        return true;
      }
      return false;
@@ -1456,34 +1466,74 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
   */
  private void put(final byte [] family, final List<KeyValue> edits)
  throws IOException {
-    this.put(family, edits, true);
+    Map<byte [], List<KeyValue>> familyMap = new HashMap<byte [], List<KeyValue>>();
+    familyMap.put(family, edits);
+    this.put(familyMap, true);
  }

  /**
   * Add updates first to the hlog (if writeToWal) and then add values to memstore.
   * Warning: Assumption is caller has lock on passed in row.
-   * @param family
-   * @param edits
+   * @param familyMap map of family to edits for the given family.
   * @param writeToWAL if true, then we should write to the log
   * @throws IOException
   */
-  private void put(final byte [] family, final List<KeyValue> edits,
-      boolean writeToWAL) throws IOException {
-    if (edits == null || edits.isEmpty()) {
-      return;
-    }
+  private void put(final Map<byte [], List<KeyValue>> familyMap,
+      boolean writeToWAL)
+  throws IOException {
+    long now = System.currentTimeMillis();
+    byte[] byteNow = Bytes.toBytes(now);
    boolean flush = false;
    this.updatesLock.readLock().lock();
    try {
-      if (writeToWAL) {
-        long now = System.currentTimeMillis();
-        this.log.append(regionInfo,
-            regionInfo.getTableDesc().getName(), edits, now);
+      WALEdit walEdit = new WALEdit();
+
+      // check if column families are valid;
+      // check if any timestamp updates are needed;
+      // and if writeToWAL is set, then also collapse edits into a single list.
+      for (Map.Entry<byte [], List<KeyValue>> e: familyMap.entrySet()) {
+        List<KeyValue> edits = e.getValue();
+        byte[] family = e.getKey();
+
+        // is this a valid column family?
+        checkFamily(family);
+
+        // update timestamp on keys if required.
+        if (updateKeys(edits, byteNow)) {
+          if (writeToWAL) {
+            // bunch up all edits across all column families into a
+            // single WALEdit.
+            for (KeyValue kv : edits) {
+              walEdit.add(kv);
+            }
+          }
+        }
+      }
+
+      // append to and sync WAL
+      if (!walEdit.isEmpty()) {
+        //
+        // write/sync to WAL should happen before we touch memstore.
+        //
+        // If order is reversed, i.e. we write to memstore first, and
+        // for some reason fail to write/sync to commit log, the memstore
+        // will contain uncommitted transactions.
+        //
+
+        this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+            walEdit, now);
+      }
+
      long size = 0;
-      Store store = getStore(family);
-      for (KeyValue kv: edits) {
-        size = this.memstoreSize.addAndGet(store.add(kv));
+      // now make changes to the memstore
+      for (Map.Entry<byte [], List<KeyValue>> e : familyMap.entrySet()) {
+        byte[] family = e.getKey();
+        List<KeyValue> edits = e.getValue();
+
+        Store store = getStore(family);
+        for (KeyValue kv: edits) {
+          size = this.memstoreSize.addAndGet(store.add(kv));
+        }
      }
      flush = isFlushSize(size);
    } finally {
@@ -2402,10 +2452,10 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
    // now log it:
    if (writeToWAL) {
      long now = System.currentTimeMillis();
-      List<KeyValue> edits = new ArrayList<KeyValue>(1);
-      edits.add(newKv);
-      this.log.append(regionInfo,
-          regionInfo.getTableDesc().getName(), edits, now);
+      WALEdit walEdit = new WALEdit();
+      walEdit.add(newKv);
+      this.log.append(regionInfo, regionInfo.getTableDesc().getName(),
+          walEdit, now);
    }

    // Now request the ICV to the store, this will set the timestamp
@@ -2460,7 +2510,6 @@ public class HRegion implements HConstants, HeapSize { // , Writable{
      (5 * Bytes.SIZEOF_BOOLEAN)) +
      (3 * ClassSize.REENTRANT_LOCK));

-  @Override
  public long heapSize() {
    long heapSize = DEEP_OVERHEAD;
    for(Store store : this.stores.values()) {
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3c8957a3d74..84ac3ad2572 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1686,10 +1686,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
      }
      boolean writeToWAL = put.getWriteToWAL();
      region.put(put, getLockFromId(put.getLockId()), writeToWAL);
-
-      if (writeToWAL) {
-        this.syncWal(region);
-      }
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
@@ -1723,11 +1719,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
-    // All have been processed successfully.
-
-    if (writeToWAL) {
-      this.syncWal(region);
-    }
    return -1;
  }
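Editorial note: the syncWal() deletions in this file follow from the HLog changes further down in the patch, where append() now syncs the transaction itself before returning. A condensed sketch of the calling contract before and after (method bodies elided; not compilable on its own):

    // Before this patch: every caller had to pair the operation with a sync.
    region.put(put, getLockFromId(put.getLockId()), writeToWAL);
    if (writeToWAL) {
      this.syncWal(region);   // consulted isDeferredLogFlush() before hlog.sync()
    }

    // After this patch: HLog.append() writes the row's WALEdit and syncs it
    // before returning, so the edit is durable once region.put() completes.
    // Note that the deferred-log-flush check from syncWal() has no visible
    // counterpart in the new append() path as shown in this diff.
    region.put(put, getLockFromId(put.getLockId()), writeToWAL);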
@@ -1758,7 +1749,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
      }
      boolean retval = region.checkAndPut(row, family, qualifier, value, put,
        getLockFromId(put.getLockId()), true);
-      this.syncWal(region);
      return retval;
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
@@ -1912,7 +1902,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
      }
      Integer lid = getLockFromId(delete.getLockId());
      region.delete(delete, lid, writeToWAL);
-      this.syncWal(region);
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
@@ -1944,8 +1933,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
-
-    this.syncWal(region);
    return -1;
  }

@@ -2382,14 +2369,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
    requestCount.incrementAndGet();
    try {
      HRegion region = getRegion(regionName);
-      long retval = region.incrementColumnValue(row, family, qualifier, amount,
-          writeToWAL);
-
-      if (writeToWAL) {
-        syncWal(region);
-      }
-
-      return retval;
+      return region.incrementColumnValue(row, family, qualifier, amount,
+          writeToWAL);
    } catch (IOException e) {
      checkFileSystem();
      throw e;
@@ -2411,13 +2392,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
    return serverInfo;
  }

-  // Sync the WAL if the table permits it
-  private void syncWal(HRegion region) {
-    if(!region.getTableDesc().isDeferredLogFlush()) {
-      this.hlog.sync(region.getRegionInfo().isMetaRegion());
-    }
-  }
-
  /**
   * Interval at which threads should run
   * @return the interval
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 61f2c897475..68435b26ef4 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -294,7 +295,7 @@ public class Store implements HConstants, HeapSize {
   *
   * We can ignore any log message that has a sequence ID that's equal to or
   * lower than maxSeqID.  (Because we know such log messages are already
-   * reflected in the MapFiles.)
+   * reflected in the HFiles.)
   *
   * @return the new max sequence id as per the log, or -1 if no log recovered
   */
@@ -324,9 +325,17 @@ public class Store implements HConstants, HeapSize {
      int reportInterval =
        this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
      HLog.Entry entry;
+      // TBD: Need to add an exception handler around logReader.next.
+      //
+      // A transaction now appears as a single edit. If logReader.next()
+      // returns an exception, then it must be an incomplete/partial
+      // transaction at the end of the file. Rather than bubble up
+      // the exception, we should catch it and simply ignore the
+      // partial transaction during this recovery phase.
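Editorial note: one way the TBD above might be addressed; a sketch only, not part of this patch, assuming a truncated trailing transaction surfaces as an IOException from logReader.next():

    HLog.Entry entry = null;
    while (true) {
      try {
        entry = logReader.next();
      } catch (IOException e) {
        // Presumably a partial WALEdit at the tail of the log, written just
        // before a crash: stop replaying at the last complete transaction
        // instead of failing the whole region open.
        LOG.warn("Ignoring incomplete transaction at end of reconstruction log", e);
        break;
      }
      if (entry == null) break;   // clean end of log
      // ... apply the entry exactly as in the loop below ...
    }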
+ // while ((entry = logReader.next()) != null) { HLogKey key = entry.getKey(); - KeyValue val = entry.getEdit(); + WALEdit val = entry.getEdit(); if (firstSeqIdInLog == -1) { firstSeqIdInLog = key.getLogSeqNum(); } @@ -335,27 +344,28 @@ public class Store implements HConstants, HeapSize { skippedEdits++; continue; } - // Check this edit is for me. Also, guard against writing the special - // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (val.matchingFamily(HLog.METAFAMILY) || - !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) || - !val.matchingFamily(family.getName())) { - continue; - } - if (val.isDelete()) { - this.memstore.delete(val); - } else { - this.memstore.add(val); - } - editsCount++; + for (KeyValue kv : val.getKeyValues()) { + // Check this edit is for me. Also, guard against writing the special + // METACOLUMN info such as HBASE::CACHEFLUSH entries + if (kv.matchingFamily(HLog.METAFAMILY) || + !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) || + !kv.matchingFamily(family.getName())) { + continue; + } + if (kv.isDelete()) { + this.memstore.delete(kv); + } else { + this.memstore.add(kv); + } + editsCount++; + } + // Every 2k edits, tell the reporter we're making progress. // Have seen 60k edits taking 3minutes to complete. if (reporter != null && (editsCount % reportInterval) == 0) { reporter.progress(); } - // Instantiate a new KeyValue to perform Writable on - val = new KeyValue(); } if (LOG.isDebugEnabled()) { LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + @@ -856,7 +866,6 @@ public class Store implements HConstants, HeapSize { /** * Do a minor/major compaction. Uses the scan infrastructure to make it easy. * - * @param writer output writer * @param filesToCompact which files to compact * @param majorCompaction true to major compact (prune all deletes, max versions, etc) * @param maxId Readers maximum sequence id. diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 1af8bab7ffe..09c9c0a400e 100644 --- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.io.Writable; import java.io.DataInput; @@ -425,7 +426,8 @@ public class HLog implements HConstants, Syncable { */ @SuppressWarnings("unchecked") public static Writer createWriter(final FileSystem fs, - final Path path, Configuration conf) throws IOException { + final Path path, Configuration conf) + throws IOException { try { Class c = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl", SequenceFileLogWriter.class.getCanonicalName())); @@ -628,12 +630,13 @@ public class HLog implements HConstants, Syncable { * @param now Time of this edit write. 
   * @throws IOException
   */
-  public void append(HRegionInfo regionInfo, KeyValue logEdit,
-    final long now)
+  public void append(HRegionInfo regionInfo, WALEdit logEdit,
+    final long now,
+    final boolean isMetaRegion)
  throws IOException {
    byte [] regionName = regionInfo.getRegionName();
    byte [] tableName = regionInfo.getTableDesc().getName();
-    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit);
+    this.append(regionInfo, makeKey(regionName, tableName, -1, now), logEdit, isMetaRegion);
  }

  /**
@@ -655,7 +658,8 @@ public class HLog implements HConstants, Syncable {
   * @param logKey
   * @throws IOException
   */
-  public void append(HRegionInfo regionInfo, HLogKey logKey, KeyValue logEdit)
+  public void append(HRegionInfo regionInfo, HLogKey logKey, WALEdit logEdit,
+    final boolean isMetaRegion)
  throws IOException {
    if (this.closed) {
      throw new IOException("Cannot append; log is closed");
    }
@@ -674,6 +678,10 @@ public class HLog implements HConstants, Syncable {
      this.unflushedEntries.incrementAndGet();
      this.numEntries.incrementAndGet();
    }
+
+    // sync txn to file system
+    this.sync(isMetaRegion);
+
    if (this.editsSize.get() > this.logrollsize) {
      if (listener != null) {
        listener.logRollRequested();
      }
@@ -704,31 +712,30 @@ public class HLog implements HConstants, Syncable {
   * @param now
   * @throws IOException
   */
-  public void append(HRegionInfo info, byte [] tableName, List<KeyValue> edits,
+  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
    final long now)
  throws IOException {
    byte[] regionName = info.getRegionName();
    if (this.closed) {
      throw new IOException("Cannot append; log is closed");
    }
-    long seqNum [] = obtainSeqNum(edits.size());
+    long seqNum = obtainSeqNum();
    synchronized (this.updateLock) {
      // The 'lastSeqWritten' map holds the sequence number of the oldest
      // write for each region (i.e. the first edit added to the particular
      // memstore). When the cache is flushed, the entry for the
      // region being flushed is removed if the sequence number of the flush
      // is greater than or equal to the value in lastSeqWritten.
-      this.lastSeqWritten.putIfAbsent(regionName, Long.valueOf(seqNum[0]));
-      int counter = 0;
-      for (KeyValue kv: edits) {
-        HLogKey logKey = makeKey(regionName, tableName, seqNum[counter++], now);
-        doWrite(info, logKey, kv);
-        this.numEntries.incrementAndGet();
-      }
+      this.lastSeqWritten.putIfAbsent(regionName, seqNum);
+      HLogKey logKey = makeKey(regionName, tableName, seqNum, now);
+      doWrite(info, logKey, edits);
+      this.numEntries.incrementAndGet();
      // Only count 1 row as an unflushed entry.
      this.unflushedEntries.incrementAndGet();
    }
+    // sync txn to file system
+    this.sync(info.isMetaRegion());
    if (this.editsSize.get() > this.logrollsize) {
      requestLogRoll();
    }
@@ -869,7 +877,7 @@ public class HLog implements HConstants, Syncable {
    }
  }

-  protected void doWrite(HRegionInfo info, HLogKey logKey, KeyValue logEdit)
+  protected void doWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
  throws IOException {
    if (!this.enabled) {
      return;
    }
@@ -931,8 +939,8 @@ public class HLog implements HConstants, Syncable {
   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
   * not appear in the correct logfile.
   *
-   * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long)}
-   * @see #completeCacheFlush(byte[], byte[], long)
+   * @return sequence ID to pass {@link #completeCacheFlush(byte[], byte[], long, boolean)}
+   * @see #completeCacheFlush(byte[], byte[], long, boolean)
   * @see #abortCacheFlush()
   */
  public long startCacheFlush() {
@@ -951,7 +960,8 @@ public class HLog implements HConstants, Syncable {
   * @throws IOException
   */
  public void completeCacheFlush(final byte [] regionName, final byte [] tableName,
-    final long logSeqId)
+    final long logSeqId,
+    final boolean isMetaRegion)
  throws IOException {
    try {
      if (this.closed) {
        return;
      }
      synchronized (updateLock) {
        long now = System.currentTimeMillis();
+        WALEdit edits = completeCacheFlushLogEdit();
        this.writer.append(new HLog.Entry(
          makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
-          completeCacheFlushLogEdit()));
+          edits));
        writeTime += System.currentTimeMillis() - now;
        writeOps++;
        this.numEntries.incrementAndGet();
        this.lastSeqWritten.remove(regionName);
      }
+      // sync txn to file system
+      this.sync(isMetaRegion);
+
    } finally {
      this.cacheFlushLock.unlock();
    }
  }

-  private KeyValue completeCacheFlushLogEdit() {
-    return new KeyValue(METAROW, METAFAMILY, null,
+  private WALEdit completeCacheFlushLogEdit() {
+    KeyValue kv = new KeyValue(METAROW, METAFAMILY, null,
      System.currentTimeMillis(), COMPLETE_CACHE_FLUSH);
+    WALEdit e = new WALEdit();
+    e.add(kv);
+    return e;
  }

  /**
@@ -1278,11 +1295,11 @@ public class HLog implements HConstants, Syncable {
   * Only used when splitting logs
   */
  public static class Entry implements Writable {
-    private KeyValue edit;
+    private WALEdit edit;
    private HLogKey key;

    public Entry() {
-      edit = new KeyValue();
+      edit = new WALEdit();
      key = new HLogKey();
    }

@@ -1291,7 +1308,7 @@ public class HLog implements HConstants, Syncable {
     * @param edit log's edit
     * @param key log's key
     */
-    public Entry(HLogKey key, KeyValue edit) {
+    public Entry(HLogKey key, WALEdit edit) {
      super();
      this.key = key;
      this.edit = edit;
    }
    /**
     * Gets the edit
     * @return edit
     */
-    public KeyValue getEdit() {
+    public WALEdit getEdit() {
      return edit;
    }
    /**
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
index b27f43233af..7239d01b009 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
@@ -6,8 +6,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.io.SequenceFile;

 public class SequenceFileLogReader implements HLog.Reader {
@@ -97,7 +97,7 @@ public class SequenceFileLogReader implements HLog.Reader {
  public HLog.Entry next(HLog.Entry reuse) throws IOException {
    if (reuse == null) {
      HLogKey key = HLog.newKey(conf);
-      KeyValue val = new KeyValue();
+      WALEdit val = new WALEdit();
      if (reader.next(key, val)) {
        return new HLog.Entry(key, val);
      }
diff --git a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
index 52fb31efcfb..7808d95c035 100644
--- a/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
+++ b/core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
@@ -24,7 +24,7 @@ public class SequenceFileLogWriter implements HLog.Writer {
  public void init(FileSystem fs, Path path, Configuration conf)
  throws IOException {
    writer = SequenceFile.createWriter(fs, conf, path,
-      HLog.getKeyClass(conf), KeyValue.class,
+      HLog.getKeyClass(conf), WALEdit.class,
      fs.getConf().getInt("io.file.buffer.size", 4096),
      (short) conf.getInt("hbase.regionserver.hlog.replication",
        fs.getDefaultReplication()),
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 6908b3293bf..ac5f7537dbb 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.hbase.regionserver;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TreeMap;

 import org.apache.commons.logging.Log;
@@ -374,7 +376,9 @@ public class TestHRegion extends HBaseTestCase {
    //testing existing family
    byte [] family = fam2;
    try {
-      region.delete(family, kvs, true);
+      Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+      deleteMap.put(family, kvs);
+      region.delete(deleteMap, true);
    } catch (Exception e) {
      assertTrue("Family " +new String(family)+ " does not exist", false);
    }
@@ -383,7 +387,9 @@ public class TestHRegion extends HBaseTestCase {
    boolean ok = false;
    family = fam4;
    try {
-      region.delete(family, kvs, true);
+      Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+      deleteMap.put(family, kvs);
+      region.delete(deleteMap, true);
    } catch (Exception e) {
      ok = true;
    }
@@ -605,7 +611,9 @@ public class TestHRegion extends HBaseTestCase {
    kvs.add(new KeyValue(row1, fam1, col2, null));
    kvs.add(new KeyValue(row1, fam1, col3, null));

-    region.delete(fam1, kvs, true);
+    Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
+    deleteMap.put(fam1, kvs);
+    region.delete(deleteMap, true);

    // extract the key values out the memstore:
    // This is kinda hacky, but better than nothing...
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
index 556d1e65c11..5a449b85798 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreReconstruction.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.*;
@@ -111,24 +112,24 @@ public class TestStoreReconstruction {
    final byte[] regionName = info.getRegionName();

    // Add 10 000 edits to HLog on the good family
-    List<KeyValue> edit = new ArrayList<KeyValue>();
    for (int j = 0; j < TOTAL_EDITS; j++) {
      byte[] qualifier = Bytes.toBytes(Integer.toString(j));
      byte[] column = Bytes.toBytes("column:" + Integer.toString(j));
+      WALEdit edit = new WALEdit();
      edit.add(new KeyValue(rowName, family, qualifier,
          System.currentTimeMillis(), column));
      log.append(info, tableName, edit,
        System.currentTimeMillis());
-      edit.clear();
    }

    // Add a cache flush, shouldn't have any effect
    long logSeqId = log.startCacheFlush();
-    log.completeCacheFlush(regionName, tableName, logSeqId);
+    log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());

    // Add an edit to another family, should be skipped.
+    WALEdit edit = new WALEdit();
    edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
        System.currentTimeMillis(), rowName));
-    log.append(info, tableName, edit,
+    log.append(info, tableName, edit,
      System.currentTimeMillis());
    log.sync();
diff --git a/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index bc20ed751a7..49f519ffe4b 100644
--- a/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ b/core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -91,7 +91,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {

    for (int i = 0; i < howmany; i++) {
      for (int j = 0; j < howmany; j++) {
-        List<KeyValue> edit = new ArrayList<KeyValue>();
+        WALEdit edit = new WALEdit();
        byte [] family = Bytes.toBytes("column");
        byte [] qualifier = Bytes.toBytes(Integer.toString(j));
        byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
@@ -142,7 +142,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
        null,null, false);

    for (int i = 0; i < total; i++) {
-      List<KeyValue> kvs = new ArrayList<KeyValue>();
+      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
      wal.append(info, bytes, kvs, System.currentTimeMillis());
    }
@@ -160,7 +160,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
    // Add test that checks to see that an open of a Reader works on a file
    // that has had a sync done on it.
    for (int i = 0; i < total; i++) {
-      List<KeyValue> kvs = new ArrayList<KeyValue>();
+      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
      wal.append(info, bytes, kvs, System.currentTimeMillis());
    }
@@ -179,7 +179,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
    // especially that we return good length on file.
    final byte [] value = new byte[1025 * 1024];  // Make a 1M value.
    for (int i = 0; i < total; i++) {
-      List<KeyValue> kvs = new ArrayList<KeyValue>();
+      WALEdit kvs = new WALEdit();
      kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
      wal.append(info, bytes, kvs, System.currentTimeMillis());
    }
@@ -236,7 +236,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
        HLog.Entry entry = new HLog.Entry();
        while((entry = reader.next(entry)) != null) {
          HLogKey key = entry.getKey();
-          KeyValue kv = entry.getEdit();
+          WALEdit kv = entry.getEdit();
          String region = Bytes.toString(key.getRegionName());
          // Assert that all edits are for same region.
          if (previousRegion != null) {
@@ -245,7 +245,6 @@ public class TestHLog extends HBaseTestCase implements HConstants {
          assertTrue(seqno < key.getLogSeqNum());
          seqno = key.getLogSeqNum();
          previousRegion = region;
-          System.out.println(key + " " + kv);
          count++;
        }
        assertEquals(howmany * howmany, count);
@@ -269,44 +268,49 @@ public class TestHLog extends HBaseTestCase implements HConstants {
      // Write columns named 1, 2, 3, etc. and then values of single byte
      // 1, 2, 3...
      long timestamp = System.currentTimeMillis();
-      List<KeyValue> cols = new ArrayList<KeyValue>();
+      WALEdit cols = new WALEdit();
      for (int i = 0; i < COL_COUNT; i++) {
        cols.add(new KeyValue(row, Bytes.toBytes("column"),
          Bytes.toBytes(Integer.toString(i)),
          timestamp, new byte[] { (byte)(i + '0') }));
      }
      HRegionInfo info = new HRegionInfo(new HTableDescriptor(tableName),
-        row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
+        row, Bytes.toBytes(Bytes.toString(row) + "1"), false);
      final byte [] regionName = info.getRegionName();
      log.append(info, tableName, cols, System.currentTimeMillis());
      long logSeqId = log.startCacheFlush();
-      log.completeCacheFlush(regionName, tableName, logSeqId);
+      log.completeCacheFlush(regionName, tableName, logSeqId, info.isMetaRegion());
      log.close();
      Path filename = log.computeFilename(log.getFilenum());
      log = null;
      // Now open a reader on the log and assert append worked.
      reader = HLog.getReader(fs, filename, conf);
-      HLog.Entry entry = new HLog.Entry();
-      for (int i = 0; i < COL_COUNT; i++) {
-        reader.next(entry);
+      // Above we added all columns on a single row so we only read one
+      // entry in the below... that's why we have '1'.
+      for (int i = 0; i < 1; i++) {
+        HLog.Entry entry = reader.next(null);
+        if (entry == null) break;
        HLogKey key = entry.getKey();
-        KeyValue val = entry.getEdit();
+        WALEdit val = entry.getEdit();
        assertTrue(Bytes.equals(regionName, key.getRegionName()));
        assertTrue(Bytes.equals(tableName, key.getTablename()));
-        assertTrue(Bytes.equals(row, val.getRow()));
-        assertEquals((byte)(i + '0'), val.getValue()[0]);
+        KeyValue kv = val.getKeyValues().get(0);
+        assertTrue(Bytes.equals(row, kv.getRow()));
+        assertEquals((byte)(i + '0'), kv.getValue()[0]);
        System.out.println(key + " " + val);
      }
+      HLog.Entry entry = null;
      while ((entry = reader.next(null)) != null) {
        HLogKey key = entry.getKey();
-        KeyValue val = entry.getEdit();
+        WALEdit val = entry.getEdit();
        // Assert only one more row... the meta flushed row.
assertTrue(Bytes.equals(regionName, key.getRegionName())); assertTrue(Bytes.equals(tableName, key.getTablename())); - assertTrue(Bytes.equals(HLog.METAROW, val.getRow())); - assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily())); + KeyValue kv = val.getKeyValues().get(0); + assertTrue(Bytes.equals(HLog.METAROW, kv.getRow())); + assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily())); assertEquals(0, Bytes.compareTo(HLog.COMPLETE_CACHE_FLUSH, - val.getValue())); + val.getKeyValues().get(0).getValue())); System.out.println(key + " " + val); } } finally {
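Editorial note, summarizing the reader-side contract after this patch: every HLog.Entry carries a WALEdit rather than a single KeyValue, and all KeyValues of one row transaction share one HLogKey and therefore one sequence number. A minimal consumer sketch (process() is a placeholder, not an API introduced by this patch, and the fs/filename/conf fixtures are assumed):

    HLog.Reader reader = HLog.getReader(fs, filename, conf);
    try {
      HLog.Entry entry;
      while ((entry = reader.next(null)) != null) {
        HLogKey key = entry.getKey();      // one key and sequence number...
        WALEdit edit = entry.getEdit();    // ...for the whole row transaction
        for (KeyValue kv : edit.getKeyValues()) {
          process(key, kv);                // placeholder for real handling
        }
      }
    } finally {
      reader.close();
    }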