From b3ec7ced77ca583863bd8ff5945a6b344e0b015d Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Thu, 31 May 2012 03:46:27 +0000
Subject: [PATCH] HBASE-6059 Replaying recovered edits would make deleted data exist again

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1344554 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/regionserver/Compactor.java  |   5 +-
 .../hadoop/hbase/regionserver/HRegion.java    |  73 ++++++------
 .../hadoop/hbase/regionserver/Store.java      |   4 +
 .../hbase/regionserver/TestHRegion.java       |  25 ++++-
 .../hadoop/hbase/regionserver/TestStore.java  |  17 ++-
 .../hbase/regionserver/wal/TestWALReplay.java | 105 +++++++++++++++++-
 6 files changed, 179 insertions(+), 50 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
index 12cc99f523d..9ed051f8be3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Compactor.java
@@ -151,7 +151,10 @@ class Compactor extends Configured {
       boolean hasMore;
       do {
         hasMore = scanner.next(kvs, compactionKVMax);
-        if (writer == null && !kvs.isEmpty()) {
+        // Create the writer even if there are no kvs (an empty store file is also ok),
+        // because we need to record the max seq id for the store file; see
+        // HBASE-6059
+        if (writer == null) {
           writer = store.createWriterInTmp(maxKeyCount, compactionCompression, true);
         }
         if (writer != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 86f8e77bdd8..7eaa62b8a7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -526,22 +526,13 @@ public class HRegion implements HeapSize { // , Writable{
     cleanupTmpDir();

     // Load in all the HStores.
-    // Get minimum of the maxSeqId across all the store.
     //
     // Context: During replay we want to ensure that we do not lose any data. So, we
     // have to be conservative in how we replay logs. For each store, we calculate
-    // the maxSeqId up to which the store was flushed. But, since different stores
-    // could have a different maxSeqId, we choose the
-    // minimum across all the stores.
-    // This could potentially result in duplication of data for stores that are ahead
-    // of others. ColumnTrackers in the ScanQueryMatchers do the de-duplication, so we
-    // do not have to worry.
-    // TODO: If there is a store that was never flushed in a long time, we could replay
-    // a lot of data. Currently, this is not a problem because we flush all the stores at
-    // the same time. If we move to per-cf flushing, we might want to revisit this and send
-    // in a vector of maxSeqIds instead of sending in a single number, which has to be the
-    // min across all the max.
-    long minSeqId = -1;
+    // the maxSeqId up to which the store was flushed, and we skip any edit whose
+    // sequence id is equal to or lower than that store's maxSeqId.
+    Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+        Bytes.BYTES_COMPARATOR);
     long maxSeqId = -1;
     // initialized to -1 so that we pick up MemstoreTS from column families
     long maxMemstoreTS = -1;
@@ -571,9 +562,8 @@ public class HRegion implements HeapSize { // , Writable{
           this.stores.put(store.getColumnFamilyName().getBytes(), store);

           long storeSeqId = store.getMaxSequenceId();
-          if (minSeqId == -1 || storeSeqId < minSeqId) {
-            minSeqId = storeSeqId;
-          }
+          maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+              storeSeqId);
           if (maxSeqId == -1 || storeSeqId > maxSeqId) {
             maxSeqId = storeSeqId;
           }
@@ -593,7 +583,7 @@ public class HRegion implements HeapSize { // , Writable{
     mvcc.initialize(maxMemstoreTS + 1);
     // Recover any edits if available.
     maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
-        this.regiondir, minSeqId, reporter, status));
+        this.regiondir, maxSeqIdInStores, reporter, status));

     status.setStatus("Cleaning up detritus from prior splits");
     // Get rid of any splits or merges that were lost in-progress. Clean out
@@ -2755,8 +2745,8 @@ public class HRegion implements HeapSize { // , Writable{
    * make sense in a this single region context only -- until we online.
    *
    * @param regiondir
-   * @param minSeqId Any edit found in split editlogs needs to be in excess of
-   * this minSeqId to be applied, else its skipped.
+   * @param maxSeqIdInStores Any edit found in split editlogs needs to be in excess of
+   * the maxSeqId for the store to be applied, else it is skipped.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
    * recovered edits log or minSeqId if nothing added from editlogs.
@@ -2764,12 +2754,19 @@ public class HRegion implements HeapSize { // , Writable{
    * @throws IOException
    */
   protected long replayRecoveredEditsIfAny(final Path regiondir,
-      final long minSeqId, final CancelableProgressable reporter,
-      final MonitoredTask status)
+      Map<byte[], Long> maxSeqIdInStores,
+      final CancelableProgressable reporter, final MonitoredTask status)
       throws UnsupportedEncodingException, IOException {
-    long seqid = minSeqId;
+    long minSeqIdForTheRegion = -1;
+    for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
+      if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
+        minSeqIdForTheRegion = maxSeqIdInStore;
+      }
+    }
+    long seqid = minSeqIdForTheRegion;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
+
     for (Path edits: files) {
       if (edits == null || !this.fs.exists(edits)) {
         LOG.warn("Null or non-existent edits file: " + edits);
@@ -2780,16 +2777,16 @@ public class HRegion implements HeapSize { // , Writable{
       long maxSeqId = Long.MAX_VALUE;
       String fileName = edits.getName();
       maxSeqId = Math.abs(Long.parseLong(fileName));
-      if (maxSeqId <= minSeqId) {
+      if (maxSeqId <= minSeqIdForTheRegion) {
         String msg = "Maximum sequenceid for this log is " + maxSeqId
-            + " and minimum sequenceid for the region is " + minSeqId
+            + " and minimum sequenceid for the region is " + minSeqIdForTheRegion
             + ", skipped the whole file, path=" + edits;
         LOG.debug(msg);
         continue;
       }

       try {
-        seqid = replayRecoveredEdits(edits, seqid, reporter);
+        seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean("hbase.skip.errors", false);
         if (skipErrors) {
@@ -2806,7 +2803,7 @@ public class HRegion implements HeapSize { // , Writable{
         this.rsAccounting.clearRegionReplayEditsSize(this.regionInfo.getRegionName());
       }
     }
-    if (seqid > minSeqId) {
+    if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
       internalFlushcache(null, seqid, status);
     }
@@ -2823,18 +2820,17 @@ public class HRegion implements HeapSize { // , Writable{

   /*
    * @param edits File of recovered edits.
-   * @param minSeqId Minimum sequenceid found in a store file. Edits in log
-   * must be larger than this to be replayed.
+   * @param maxSeqIdInStores Maximum sequenceid found in each store. Edits in log
+   * must be larger than this to be replayed for each store.
    * @param reporter
    * @return the sequence id of the last edit added to this region out of the
    * recovered edits log or minSeqId if nothing added from editlogs.
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
-      final long minSeqId, final CancelableProgressable reporter)
+      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
     throws IOException {
-    String msg = "Replaying edits from " + edits + "; minSequenceid=" +
-      minSeqId + "; path=" + edits;
+    String msg = "Replaying edits from " + edits;
     LOG.info(msg);
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
@@ -2842,7 +2838,7 @@ public class HRegion implements HeapSize { // , Writable{
     HLog.Reader reader = null;
     try {
       reader = HLog.getReader(this.fs, edits, conf);
-      long currentEditSeqId = minSeqId;
+      long currentEditSeqId = -1;
       long firstSeqIdInLog = -1;
       long skippedEdits = 0;
       long editsCount = 0;
@@ -2901,12 +2897,6 @@ public class HRegion implements HeapSize { // , Writable{
         if (firstSeqIdInLog == -1) {
           firstSeqIdInLog = key.getLogSeqNum();
         }
-        // Now, figure if we should skip this edit.
-        if (key.getLogSeqNum() <= currentEditSeqId) {
-          skippedEdits++;
-          continue;
-        }
-        currentEditSeqId = key.getLogSeqNum();
         boolean flush = false;
         for (KeyValue kv: val.getKeyValues()) {
           // Check this edit is for me. Also, guard against writing the special
@@ -2927,6 +2917,13 @@ public class HRegion implements HeapSize { // , Writable{
             skippedEdits++;
             continue;
           }
+          // Now, figure if we should skip this edit.
+          if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily()
+              .getName())) {
+            skippedEdits++;
+            continue;
+          }
+          currentEditSeqId = key.getLogSeqNum();
           // Once we are over the limit, restoreEdit will keep returning true to
           // flush -- but don't flush until we've played all the kvs that make up
           // the WALEdit.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 7157c04037b..632164e3216 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -1728,6 +1728,10 @@ public class Store extends SchemaConfigured implements HeapSize {
       LOG.warn("StoreFile " + f + " has a null Reader");
       return;
     }
+    if (r.getEntries() == 0) {
+      LOG.warn("StoreFile " + f + " is an empty store file");
+      return;
+    }
     // TODO: Cache these keys rather than make each time?
     byte [] fk = r.getFirstKey();
     KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index f925ccb8e92..2e6f02263bd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -180,7 +180,13 @@ public class TestHRegion extends HBaseTestCase {
         writer.close();
       }
       MonitoredTask status = TaskMonitor.get().createStatus(method);
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId-1, null, status);
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+          Bytes.BYTES_COMPARATOR);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+            minSeqId - 1);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
       Get get = new Get(row);
       Result result = region.get(get, null);
@@ -226,7 +232,13 @@ public class TestHRegion extends HBaseTestCase {
       }
       long recoverSeqId = 1030;
       MonitoredTask status = TaskMonitor.get().createStatus(method);
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, recoverSeqId-1, null, status);
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+          Bytes.BYTES_COMPARATOR);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
+            recoverSeqId - 1);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
       assertEquals(maxSeqId, seqId);
       Get get = new Get(row);
       Result result = region.get(get, null);
@@ -267,7 +279,14 @@ public class TestHRegion extends HBaseTestCase {
           recoveredEditsDir, String.format("%019d", minSeqId-1));
       FSDataOutputStream dos= fs.create(recoveredEdits);
       dos.close();
-      long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId, null, null);
+
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(
+          Bytes.BYTES_COMPARATOR);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir,
+          maxSeqIdInStores, null, null);
       assertEquals(minSeqId, seqId);
     } finally {
       HRegion.closeHRegion(this.region);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 121d2778930..c0718bcc136 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -183,12 +183,17 @@ public class TestStore extends TestCase {
     for (int i = 1; i <= storeFileNum; i++) {
       // verify the expired store file.
       CompactionRequest cr = this.store.requestCompaction();
-      assertEquals(1, cr.getFiles().size());
-      assertTrue(cr.getFiles().get(0).getReader().getMaxTimestamp() <
-          (System.currentTimeMillis() - this.store.scanInfo.getTtl()));
-      // Verify that the expired the store has been deleted.
-      this.store.compact(cr);
-      assertEquals(storeFileNum - i, this.store.getStorefiles().size());
+      // the first is expired normally.
+      // If this is not the first compaction, there is also an empty store file from the previous compaction.
+      assertEquals(Math.min(i, 2), cr.getFiles().size());
+      for (int j = 0; j < cr.getFiles().size(); j++) {
+        assertTrue(cr.getFiles().get(j).getReader().getMaxTimestamp() < (System
+            .currentTimeMillis() - this.store.scanInfo.getTtl()));
+      }
+      // Verify that the expired store file is compacted to an empty store file.
+      StoreFile compactedFile = this.store.compact(cr);
+      // It is an empty store file.
+      assertEquals(0, compactedFile.getReader().getEntries());

       // Let the next store file expired.
       Thread.sleep(sleepTime);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 4002842b04b..3f515d0180c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -35,13 +35,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -77,7 +84,7 @@ public class TestWALReplay {
     conf.setBoolean("dfs.support.append", true);
     // The below config supported by 0.20-append and CDH3b2
     conf.setInt("dfs.client.block.recovery.retries", 2);
-    TEST_UTIL.startMiniDFSCluster(3);
+    TEST_UTIL.startMiniCluster(3);
     Path hbaseRootDir =
       TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase"));
     LOG.info("hbase.rootdir=" + hbaseRootDir);
@@ -86,7 +93,7 @@ public class TestWALReplay {

   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniDFSCluster();
+    TEST_UTIL.shutdownMiniCluster();
   }

   @Before
@@ -117,6 +124,100 @@ public class TestWALReplay {
     }
   }

+  /**
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testReplayEditsAfterRegionMovedWithMultiCF() throws Exception {
+    final byte[] tableName = Bytes
+        .toBytes("testReplayEditsAfterRegionMovedWithMultiCF");
+    byte[] family1 = Bytes.toBytes("cf1");
+    byte[] family2 = Bytes.toBytes("cf2");
+    byte[] qualifier = Bytes.toBytes("q");
+    byte[] value = Bytes.toBytes("testV");
+    byte[][] familys = { family1, family2 };
+    TEST_UTIL.createTable(tableName, familys);
+    HTable htable = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    Put put = new Put(Bytes.toBytes("r1"));
+    put.add(family1, qualifier, value);
+    htable.put(put);
+    ResultScanner resultScanner = htable.getScanner(new Scan());
+    int count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(1, count);
+
+    MiniHBaseCluster hbaseCluster =
+        TEST_UTIL.getMiniHBaseCluster();
+    List<HRegion> regions = hbaseCluster.getRegions(tableName);
+    assertEquals(1, regions.size());
+
+    // move region to another regionserver
+    HRegion destRegion = regions.get(0);
+    int originServerNum = hbaseCluster
+        .getServerWith(destRegion.getRegionName());
+    assertTrue("Please start more than 1 regionserver", hbaseCluster
+        .getRegionServerThreads().size() > 1);
+    int destServerNum = 0;
+    while (destServerNum == originServerNum) {
+      destServerNum++;
+    }
+    HRegionServer originServer = hbaseCluster.getRegionServer(originServerNum);
+    HRegionServer destServer = hbaseCluster.getRegionServer(destServerNum);
+    // move region to destination regionserver
+    moveRegionAndWait(destRegion, destServer);
+
+    // delete the row
+    Delete del = new Delete(Bytes.toBytes("r1"));
+    htable.delete(del);
+    resultScanner = htable.getScanner(new Scan());
+    count = 0;
+    while (resultScanner.next() != null) {
+      count++;
+    }
+    resultScanner.close();
+    assertEquals(0, count);
+
+    // flush region and make major compaction
+    destServer.getOnlineRegion(destRegion.getRegionName()).flushcache();
+    // wait to complete major compaction
+    for (Store store : destServer.getOnlineRegion(destRegion.getRegionName())
+        .getStores().values()) {
+      store.triggerMajorCompaction();
+    }
+    destServer.getOnlineRegion(destRegion.getRegionName()).compactStores();
+
+    // move region to origin regionserver
+    moveRegionAndWait(destRegion, originServer);
+    // abort the origin regionserver
+    originServer.abort("testing");
+
+    // see what we get
+    Result result = htable.get(new Get(Bytes.toBytes("r1")));
+    if (result != null) {
+      assertTrue("Row is deleted, but we get " + result.toString(),
+          (result == null) || result.isEmpty());
+    }
+    resultScanner.close();
+  }
+
+  private void moveRegionAndWait(HRegion destRegion, HRegionServer destServer)
+      throws InterruptedException, MasterNotRunningException,
+      ZooKeeperConnectionException, IOException {
+    HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
+    TEST_UTIL.getHBaseAdmin().move(
+        destRegion.getRegionInfo().getEncodedNameAsBytes(),
+        Bytes.toBytes(destServer.getServerName().getServerName()));
+    while (true) {
+      ServerName serverName = master.getAssignmentManager()
+          .getRegionServerOfRegion(destRegion.getRegionInfo());
+      if (serverName != null && serverName.equals(destServer.getServerName())) break;
+      Thread.sleep(10);
+    }
+  }
+
   /**
    * Tests for hbase-2727.
    * @throws Exception
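
Editor's note: the following sketch is appended for readability and is not part of the patch above. It restates the replay rule HBASE-6059 introduces: each store remembers the highest sequence id it has flushed, whole recovered-edits files whose maximum sequence id is at or below the region-wide minimum of those values are skipped, and an individual edit is replayed only when its sequence id exceeds the flushed maximum of the store (column family) it targets. The class name, method names, and the String-keyed map below are simplifications assumed for illustration only; they are not HBase APIs.

    import java.util.HashMap;
    import java.util.Map;

    // Minimal, self-contained illustration of per-store sequence-id gating.
    final class ReplayFilterSketch {

      // Lowest flushed sequence id across all stores of the region; recovered-edits
      // files at or below this value can be skipped wholesale, mirroring
      // replayRecoveredEditsIfAny in the patch.
      static long minSeqIdForRegion(Map<String, Long> maxSeqIdInStores) {
        long min = -1;
        for (long storeMax : maxSeqIdInStores.values()) {
          if (min == -1 || storeMax < min) {
            min = storeMax;
          }
        }
        return min;
      }

      // Per-edit check mirroring replayRecoveredEdits: replay only if the edit's
      // sequence id is greater than the flushed maximum of the family it targets,
      // so a store that already flushed (and possibly compacted away a delete)
      // never re-applies an older edit.
      static boolean shouldReplay(long editSeqId, String family,
          Map<String, Long> maxSeqIdInStores) {
        Long storeMax = maxSeqIdInStores.get(family);
        return storeMax == null || editSeqId > storeMax;
      }

      public static void main(String[] args) {
        Map<String, Long> flushed = new HashMap<String, Long>();
        flushed.put("cf1", 120L); // cf1 flushed through seq id 120
        flushed.put("cf2", 80L);  // cf2 flushed through seq id 80
        System.out.println(minSeqIdForRegion(flushed));        // 80
        System.out.println(shouldReplay(100L, "cf1", flushed)); // false: already flushed
        System.out.println(shouldReplay(100L, "cf2", flushed)); // true: still needed
      }
    }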