diff --git a/CHANGES.txt b/CHANGES.txt
index aef39414613..efa51e80c96 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -447,6 +447,8 @@ Release 0.92.0 - Unreleased
HBASE-4739 Master dying while going to close a region can leave it in transition
forever (Gao Jinchao)
HBASE-4855 SplitLogManager hangs on cluster restart due to batch.installed doubly counted
+ HBASE-4862 Splitting hlog and opening region concurrently may cause data loss
+ (Chunhui Shen)
TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 1823947fe13..1592f0fc815 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -31,8 +31,8 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedMap;
@@ -126,6 +126,7 @@ public class HLog implements Syncable {
private static final String RECOVERED_EDITS_DIR = "recovered.edits";
private static final Pattern EDITFILES_NAME_PATTERN =
Pattern.compile("-?[0-9]+");
+ static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
private final FileSystem fs;
private final Path dir;
@@ -1703,7 +1704,8 @@ public class HLog implements Syncable {
}
/**
- * Returns sorted set of edit files made by wal-log splitter.
+ * Returns sorted set of edit files made by wal-log splitter, excluding files
+ * with '.temp' suffix.
* @param fs
* @param regiondir
   * @return Files in passed regiondir as a sorted set.
@@ -1726,6 +1728,11 @@ public class HLog implements Syncable {
// it a timestamp suffix. See moveAsideBadEditsFile.
Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
result = fs.isFile(p) && m.matches();
+ // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
+      // because it means the hlog-splitting thread is still writing this file.
+ if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
+ result = false;
+ }
} catch (IOException e) {
LOG.warn("Failed isFile check on " + p);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index 21747b105a8..e9c0623747c 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -458,6 +458,24 @@ public class HLogSplitter {
WriterAndPath wap = (WriterAndPath)o;
wap.w.close();
LOG.debug("Closed " + wap.p);
+ Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+ if (!dst.equals(wap.p) && fs.exists(dst)) {
+ LOG.warn("Found existing old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting " + dst
+ + ", length=" + fs.getFileStatus(dst).getLen());
+ if (!fs.delete(dst, false)) {
+ LOG.warn("Failed deleting of old " + dst);
+ throw new IOException("Failed deleting of old " + dst);
+ }
+ }
+ // Skip the unit tests which create a splitter that reads and writes the
+ // data without touching disk. TestHLogSplit#testThreading is an
+ // example.
+ if (fs.exists(wap.p)) {
+ if (!fs.rename(wap.p, dst)) {
+ throw new IOException("Failed renaming " + wap.p + " to " + dst);
+ }
+ }
}
String msg = ("processed " + editsCount + " edits across " + n + " regions" +
" threw away edits for " + (logWriters.size() - n) + " regions" +
@@ -624,8 +642,30 @@ public class HLogSplitter {
if (isCreate && !fs.exists(dir)) {
if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
}
- return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
- .getLogSeqNum()));
+    // Append RECOVERED_LOG_TMPFILE_SUFFIX to the file name so that the
+    // region's replayRecoveredEdits will not delete it
+ String fileName = formatRecoveredEditsFileName(logEntry.getKey()
+ .getLogSeqNum());
+ fileName = getTmpRecoveredEditsFileName(fileName);
+ return new Path(dir, fileName);
+ }
+
+ static String getTmpRecoveredEditsFileName(String fileName) {
+ return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
+ }
+
+ /**
+   * Converts a path to a file under the RECOVERED_EDITS_DIR directory,
+   * stripping the RECOVERED_LOG_TMPFILE_SUFFIX if present
+ * @param srcPath
+ * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+ */
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
+    String fileName = srcPath.getName();
+    if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
+      fileName = fileName.substring(0, fileName.length() - HLog.RECOVERED_LOG_TMPFILE_SUFFIX.length()); // split() takes a regex; '.' would match any char
+    }
+    return new Path(srcPath.getParent(), fileName);
+  }
static String formatRecoveredEditsFileName(final long seqid) {
@@ -1136,9 +1176,33 @@ public class HLogSplitter {
thrown.add(ioe);
continue;
}
- paths.add(wap.p);
LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
+ (wap.nanosSpent / 1000/ 1000) + "ms)");
+ Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+ try {
+ if (!dst.equals(wap.p) && fs.exists(dst)) {
+ LOG.warn("Found existing old edits file. It could be the "
+ + "result of a previous failed split attempt. Deleting " + dst
+ + ", length=" + fs.getFileStatus(dst).getLen());
+ if (!fs.delete(dst, false)) {
+ LOG.warn("Failed deleting of old " + dst);
+ throw new IOException("Failed deleting of old " + dst);
+ }
+ }
+ // Skip the unit tests which create a splitter that reads and writes
+ // the data without touching disk. TestHLogSplit#testThreading is an
+ // example.
+ if (fs.exists(wap.p)) {
+ if (!fs.rename(wap.p, dst)) {
+ throw new IOException("Failed renaming " + wap.p + " to " + dst);
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+ thrown.add(ioe);
+ continue;
+ }
+ paths.add(dst);
}
if (!thrown.isEmpty()) {
throw MultipleIOException.createIOException(thrown);
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index 1041472cb60..eb67a01b590 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -19,7 +19,11 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -27,6 +31,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,10 +44,16 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Threads;
@@ -1103,6 +1114,58 @@ public class TestHLogSplit {
assertEquals(1, fs.listStatus(corruptDir).length);
}
+ /**
+ * @throws IOException
+ * @see https://issues.apache.org/jira/browse/HBASE-4862
+ */
+  @Test
+  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
+    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
+    // Generate hlogs for our destination region
+    String regionName = "r0";
+    final Path regiondir = new Path(tabledir, regionName);
+    regions = new ArrayList<String>();
+    regions.add(regionName);
+    generateHLogs(-1);
+
+    HLogSplitter logSplitter = new HLogSplitter(
+        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+          throws IOException {
+        HLog.Writer writer = HLog.createWriter(fs, logfile, conf);
+        // After creating writer, simulate region's
+        // replayRecoveredEditsIfAny() which gets SplitEditFiles of this
+        // region and deletes them, excluding files with '.temp' suffix.
+        NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs,
+            regiondir);
+        if (files != null && !files.isEmpty()) {
+          for (Path file : files) {
+            if (!this.fs.delete(file, false)) {
+              LOG.error("Failed delete of " + file);
+            } else {
+              LOG.debug("Deleted recovered.edits file=" + file);
+            }
+          }
+        }
+        return writer;
+      }
+    };
+    try {
+      logSplitter.splitLog();
+    } catch (IOException e) {
+      LOG.info(e);
+      fail("Throws IOException when splitting "
+          + "log, it is most likely because writing file does not "
+          + "exist which is caused by concurrent replayRecoveredEditsIfAny()");
+    }
+    if (fs.exists(corruptDir)) {
+      if (fs.listStatus(corruptDir).length > 0) {
+        fail("There are some corrupt logs, "
+            + "it is most likely caused by concurrent replayRecoveredEditsIfAny()");
+      }
+    }
+  }
+
private void flushToConsole(String s) {
System.out.println(s);
System.out.flush();