From 7a0c1daf6382e484c9c1b2fba6e735a8306f8133 Mon Sep 17 00:00:00 2001
From: Zhihong Yu
Date: Sat, 7 Apr 2012 15:30:59 +0000
Subject: [PATCH] HBASE-5689 Skipping RecoveredEdits may cause data loss (Chunhui)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1310788 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/regionserver/HRegion.java    | 26 ++----
 .../hbase/regionserver/wal/HLogSplitter.java  | 48 +++++++---
 .../hbase/regionserver/TestHRegion.java       | 92 +++++++++++++++++++
 3 files changed, 137 insertions(+), 29 deletions(-)

diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f1a68e0c52a..c3df3193ed2 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2718,7 +2718,6 @@ public class HRegion implements HeapSize { // , Writable{
     long seqid = minSeqId;
     NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
     if (files == null || files.isEmpty()) return seqid;
-    boolean checkSafeToSkip = true;
     for (Path edits: files) {
       if (edits == null || !this.fs.exists(edits)) {
         LOG.warn("Null or non-existent edits file: " + edits);
@@ -2726,22 +2725,15 @@
       }
       if (isZeroLengthThenDelete(this.fs, edits)) continue;
 
-      if (checkSafeToSkip) {
-        Path higher = files.higher(edits);
-        long maxSeqId = Long.MAX_VALUE;
-        if (higher != null) {
-          // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+"
-          String fileName = higher.getName();
-          maxSeqId = Math.abs(Long.parseLong(fileName));
-        }
-        if (maxSeqId <= minSeqId) {
-          String msg = "Maximum possible sequenceid for this log is " + maxSeqId
-              + ", skipped the whole file, path=" + edits;
-          LOG.debug(msg);
-          continue;
-        } else {
-          checkSafeToSkip = false;
-        }
+      long maxSeqId = Long.MAX_VALUE;
+      String fileName = edits.getName();
+      maxSeqId = Math.abs(Long.parseLong(fileName));
+      if (maxSeqId <= minSeqId) {
+        String msg = "Maximum sequenceid for this log is " + maxSeqId
+            + " and minimum sequenceid for the region is " + minSeqId
+            + ", skipped the whole file, path=" + edits;
+        LOG.debug(msg);
+        continue;
       }
 
       try {
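With this change a region decides whether to replay a recovered-edits file from the file's own name, which (after the HLogSplitter change below) is the sequence id of the last edit written to that file. A minimal standalone sketch of that check, using a hypothetical canSkipRecoveredEditsFile helper; the patch itself performs the comparison inline in replayRecoveredEditsIfAny:

    import org.apache.hadoop.fs.Path;

    public class RecoveredEditsSkipExample {
      /**
       * A recovered-edits file can be skipped only when the sequence id encoded
       * in its name (the last edit it contains) is no greater than the smallest
       * sequence id already flushed for the region.
       */
      static boolean canSkipRecoveredEditsFile(Path edits, long minSeqId) {
        // Recovered-edits file names match HLog.EDITFILES_NAME_PATTERN ("-?[0-9]+"),
        // so the name itself parses as a sequence id.
        long maxSeqId = Math.abs(Long.parseLong(edits.getName()));
        return maxSeqId <= minSeqId;
      }

      public static void main(String[] args) {
        // Hypothetical path under a region's recovered.edits directory.
        Path edits = new Path("recovered.edits/0000000000000001020");
        System.out.println(canSkipRecoveredEditsFile(edits, 2000L)); // true: already flushed
        System.out.println(canSkipRecoveredEditsFile(edits, 500L));  // false: must be replayed
      }
    }

Previously files were named after their first edit, so the skip check had to infer an upper bound from the next file's name; when a region had bounced between servers, a single file could also hold edits newer than that bound and be skipped wrongly (the data loss in the subject). Naming each file by its last edit makes the comparison exact.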
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 5c8fc5e933b..f54d6fd8083 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -426,6 +426,7 @@ public class HLogSplitter {
           }
         }
         wap.w.append(entry);
+        outputSink.updateRegionMaximumEditLogSeqNum(entry);
         editsCount++;
         // If sufficient edits have passed OR we've opened a few files, check if
         // we should report progress.
@@ -455,7 +456,8 @@ public class HLogSplitter {
      throw e;
    } finally {
      int n = 0;
-      for (Object o : logWriters.values()) {
+      for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+        Object o = logWritersEntry.getValue();
        long t1 = EnvironmentEdgeManager.currentTimeMillis();
        if ((t1 - last_report_at) > period) {
          last_report_at = t;
@@ -471,7 +473,8 @@
        WriterAndPath wap = (WriterAndPath)o;
        wap.w.close();
        LOG.debug("Closed " + wap.p);
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+        Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
+            .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
        if (!dst.equals(wap.p) && fs.exists(dst)) {
          LOG.warn("Found existing old edits file. It could be the "
              + "result of a previous failed split attempt. Deleting " + dst
@@ -488,6 +491,7 @@
          if (!fs.rename(wap.p, dst)) {
            throw new IOException("Failed renaming " + wap.p + " to " + dst);
          }
+          LOG.debug("Rename " + wap.p + " to " + dst);
        }
      }
      String msg = "Processed " + editsCount + " edits across " + n + " regions" +
@@ -681,16 +685,16 @@
   }
 
   /**
-   * Convert path to a file under RECOVERED_EDITS_DIR directory without
-   * RECOVERED_LOG_TMPFILE_SUFFIX
+   * Get the completed recovered edits file path, renaming it so that the name is
+   * the sequence id of the last edit in the file rather than the first. The name
+   * lets {@link HRegion#replayRecoveredEditsIfAny} skip whole files that are already flushed.
    * @param srcPath
-   * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+   * @param maximumEditLogSeqNum
+   * @return dstPath named by the file's last edit log sequence number
    */
-  static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
-    String fileName = srcPath.getName();
-    if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
-      fileName = fileName.split(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)[0];
-    }
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+      Long maximumEditLogSeqNum) {
+    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
     return new Path(srcPath.getParent(), fileName);
   }
 
@@ -1027,6 +1031,7 @@
        }
      }
 
+
    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
@@ -1050,6 +1055,7 @@
        }
      }
      wap.w.append(logEntry);
+      outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
      editsCount++;
    }
    // Pass along summary statistics
@@ -1138,6 +1144,8 @@
  class OutputSink {
    private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
          new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+    private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
    private final List<WriterThread> writerThreads = Lists.newArrayList();
 
    /* Set of regions which we've decided should not output edits */
@@ -1204,8 +1212,11 @@
      List<Path> paths = new ArrayList<Path>();
      List<IOException> thrown = Lists.newArrayList();
      closeLogWriters(thrown);
-      for (WriterAndPath wap : logWriters.values()) {
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+      for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
+          .entrySet()) {
+        WriterAndPath wap = logWritersEntry.getValue();
+        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+            regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
        try {
          if (!dst.equals(wap.p) && fs.exists(dst)) {
            LOG.warn("Found existing old edits file. It could be the "
@@ -1223,6 +1234,7 @@
          if (!fs.rename(wap.p, dst)) {
            throw new IOException("Failed renaming " + wap.p + " to " + dst);
          }
+          LOG.debug("Rename " + wap.p + " to " + dst);
        }
      } catch (IOException ioe) {
        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
@@ -1289,6 +1301,18 @@
      return ret;
    }
 
+    /**
+     * Update region's maximum edit log SeqNum.
+     */
+    void updateRegionMaximumEditLogSeqNum(Entry entry) {
+      regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
+          entry.getKey().getLogSeqNum());
+    }
+
+    Long getRegionMaximumEditLogSeqNum(byte[] region) {
+      return regionMaximumEditLogSeqNum.get(region);
+    }
+
    /**
     * @return a map from encoded region ID to the number of edits written out
     *         for that region.
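The OutputSink above keeps one running maximum log sequence number per region and uses it to name the completed recovered-edits file when the temporary file is renamed. A rough sketch of that bookkeeping, with illustrative class and method names, assuming the zero-padded sequence-id file-name format used for recovered-edits files:

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.util.Bytes;

    public class RegionSeqNumTrackerSketch {
      // Keyed by encoded region name, mirroring OutputSink's regionMaximumEditLogSeqNum.
      private final Map<byte[], Long> regionMaximumEditLogSeqNum =
          new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

      /** Called for every appended entry, as updateRegionMaximumEditLogSeqNum is. */
      void update(byte[] encodedRegionName, long logSeqNum) {
        // Entries for a region are appended in sequence order, so the last write wins.
        regionMaximumEditLogSeqNum.put(encodedRegionName, logSeqNum);
      }

      /** Name for the completed file: the last sequence id written to it. */
      String completedFileName(byte[] encodedRegionName) {
        // Assumes the 19-digit zero-padded name format of recovered-edits files.
        return String.format("%019d", regionMaximumEditLogSeqNum.get(encodedRegionName));
      }
    }

In the patch the map is wrapped in Collections.synchronizedMap because several writer threads append edits concurrently; the sketch leaves synchronization out for brevity.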
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index e2db6886c88..c0ac12ca5e3 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -54,9 +54,11 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -67,6 +69,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@@ -136,6 +139,95 @@ public class TestHRegion extends HBaseTestCase {
     SchemaMetrics.validateMetricChanges(startingMetrics);
   }
 
+  public void testDataCorrectnessReplayingRecoveredEdits() throws Exception {
+    final int NUM_MASTERS = 1;
+    final int NUM_RS = 3;
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+
+    try {
+      final byte[] TABLENAME = Bytes
+          .toBytes("testDataCorrectnessReplayingRecoveredEdits");
+      final byte[] FAMILY = Bytes.toBytes("family");
+      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+      HMaster master = cluster.getMaster();
+
+      // Create table
+      HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      HBaseAdmin hbaseAdmin = TEST_UTIL.getHBaseAdmin();
+      hbaseAdmin.createTable(desc);
+
+      assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+      // Put data: r1->v1
+      HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+      putDataAndVerify(table, "r1", FAMILY, "v1", 1);
+
+      // Move region to target server
+      HRegionInfo regionInfo = table.getRegionLocation("r1").getRegionInfo();
+      int originServerNum = cluster.getServerWith(regionInfo.getRegionName());
+      HRegionServer originServer = cluster.getRegionServer(originServerNum);
+      int targetServerNum = NUM_RS - 1 - originServerNum;
+      HRegionServer targetServer = cluster.getRegionServer(targetServerNum);
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(targetServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == originServerNum);
+
+      // Put data: r2->v2
+      putDataAndVerify(table, "r2", FAMILY, "v2", 2);
+
+      // Move region to origin server
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(originServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == targetServerNum);
+
+      // Put data: r3->v3
+      putDataAndVerify(table, "r3", FAMILY, "v3", 3);
+
+      // Kill target server
+      targetServer.kill();
+      cluster.getRegionServerThreads().get(targetServerNum).join();
+      // Wait until the shutdown of the dead server has been fully processed
+      while (master.getServerManager().areDeadServersInProgress()) {
+        Thread.sleep(5);
+      }
+      // Kill origin server
+      originServer.kill();
+      cluster.getRegionServerThreads().get(originServerNum).join();
+
+      // Put data: r4->v4
+      putDataAndVerify(table, "r4", FAMILY, "v4", 4);
+
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private void putDataAndVerify(HTable table, String row, byte[] family,
+      String value, int verifyNum) throws IOException {
+    System.out.println("=========Putting data :" + row);
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(family, Bytes.toBytes("q1"), Bytes.toBytes(value));
+    table.put(put);
+    ResultScanner resultScanner = table.getScanner(new Scan());
+    List<Result> results = new ArrayList<Result>();
+    while (true) {
+      Result r = resultScanner.next();
+      if (r == null)
+        break;
+      results.add(r);
+    }
+    resultScanner.close();
+    if (results.size() != verifyNum) {
+      System.out.println(results);
+    }
+    assertEquals(verifyNum, results.size());
+  }
+
 //////////////////////////////////////////////////////////////////////////////
 // New tests that doesn't spin up a mini cluster but rather just test the
 // individual code pieces in the HRegion. Putting files locally in