HBASE-4862 Splitting hlog and opening region concurrently may cause data loss
(Chunhui Shen) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1206965 13f79535-47bb-0310-9956-ffa450edef68
commit b1159115a4 (parent 2d2e75a7fb)
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -447,6 +447,8 @@ Release 0.92.0 - Unreleased
    HBASE-4739  Master dying while going to close a region can leave it in transition
                forever (Gao Jinchao)
    HBASE-4855  SplitLogManager hangs on cluster restart due to batch.installed doubly counted
+   HBASE-4862  Splitting hlog and opening region concurrently may cause data loss
+               (Chunhui Shen)
 
 TESTS
    HBASE-4450  test for number of blocks read: to serve as baseline for expected
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -31,8 +31,8 @@ import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.SortedMap;
@@ -126,6 +126,7 @@ public class HLog implements Syncable {
   private static final String RECOVERED_EDITS_DIR = "recovered.edits";
   private static final Pattern EDITFILES_NAME_PATTERN =
     Pattern.compile("-?[0-9]+");
+  static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
 
   private final FileSystem fs;
   private final Path dir;
@@ -1703,7 +1704,8 @@ public class HLog implements Syncable {
   }
 
   /**
-   * Returns sorted set of edit files made by wal-log splitter.
+   * Returns sorted set of edit files made by wal-log splitter, excluding files
+   * with '.temp' suffix.
    * @param fs
    * @param regiondir
    * @return Files in passed <code>regiondir</code> as a sorted set.
@@ -1726,6 +1728,11 @@ public class HLog implements Syncable {
         // it a timestamp suffix. See moveAsideBadEditsFile.
         Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
         result = fs.isFile(p) && m.matches();
+        // Skip any file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
+        // because it means a split-hlog thread is still writing it.
+        if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
+          result = false;
+        }
       } catch (IOException e) {
         LOG.warn("Failed isFile check on " + p);
       }
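The fix rests on a write-then-rename convention: the split-log thread writes recovered edits to "<sequenceid>.temp" and renames the file to its final "<sequenceid>" name only once it is complete, so getSplitEditFilesSorted() (called by a region opening concurrently) never returns a half-written file. A minimal standalone sketch of the filter above, using plain java.util.regex instead of Hadoop's FileSystem (class and method names here are illustrative, not part of the patch):

import java.util.regex.Pattern;

public class RecoveredEditsFilter {
  // Mirrors HLog.EDITFILES_NAME_PATTERN: completed files are named by log sequence id.
  private static final Pattern EDIT_FILE_NAME = Pattern.compile("-?[0-9]+");
  static final String TMP_SUFFIX = ".temp";

  /** True only for completed edits files; in-progress ".temp" files are skipped. */
  static boolean isCompletedEditsFile(String name) {
    if (name.endsWith(TMP_SUFFIX)) {
      return false; // the split-log thread is still writing this file
    }
    return EDIT_FILE_NAME.matcher(name).matches();
  }

  public static void main(String[] args) {
    System.out.println(isCompletedEditsFile("0000000000000001234"));      // true
    System.out.println(isCompletedEditsFile("0000000000000001234.temp")); // false
  }
}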
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -458,6 +458,24 @@ public class HLogSplitter {
           WriterAndPath wap = (WriterAndPath)o;
           wap.w.close();
           LOG.debug("Closed " + wap.p);
+          Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+          if (!dst.equals(wap.p) && fs.exists(dst)) {
+            LOG.warn("Found existing old edits file. It could be the "
+                + "result of a previous failed split attempt. Deleting " + dst
+                + ", length=" + fs.getFileStatus(dst).getLen());
+            if (!fs.delete(dst, false)) {
+              LOG.warn("Failed deleting of old " + dst);
+              throw new IOException("Failed deleting of old " + dst);
+            }
+          }
+          // Skip the unit tests which create a splitter that reads and writes the
+          // data without touching disk. TestHLogSplit#testThreading is an
+          // example.
+          if (fs.exists(wap.p)) {
+            if (!fs.rename(wap.p, dst)) {
+              throw new IOException("Failed renaming " + wap.p + " to " + dst);
+            }
+          }
         }
       String msg = ("processed " + editsCount + " edits across " + n + " regions" +
           " threw away edits for " + (logWriters.size() - n) + " regions" +
@@ -624,8 +642,30 @@ public class HLogSplitter {
     if (isCreate && !fs.exists(dir)) {
       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
     }
-    return new Path(dir, formatRecoveredEditsFileName(logEntry.getKey()
-        .getLogSeqNum()));
+    // Append RECOVERED_LOG_TMPFILE_SUFFIX to the file name so the region's
+    // replayRecoveredEdits will not delete it while it is being written.
+    String fileName = formatRecoveredEditsFileName(logEntry.getKey()
+        .getLogSeqNum());
+    fileName = getTmpRecoveredEditsFileName(fileName);
+    return new Path(dir, fileName);
+  }
+
+  static String getTmpRecoveredEditsFileName(String fileName) {
+    return fileName + HLog.RECOVERED_LOG_TMPFILE_SUFFIX;
+  }
+
+  /**
+   * Convert a path to a file under the RECOVERED_EDITS_DIR directory, without
+   * RECOVERED_LOG_TMPFILE_SUFFIX.
+   * @param srcPath
+   * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+   */
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
+    String fileName = srcPath.getName();
+    if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
+      fileName = fileName.split(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)[0];
+    }
+    return new Path(srcPath.getParent(), fileName);
   }
 
   static String formatRecoveredEditsFileName(final long seqid) {
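The two helpers are inverses: getTmpRecoveredEditsFileName() appends the suffix, getCompletedRecoveredEditsFilePath() strips it. One subtlety: the patch strips the suffix with String.split(), which treats ".temp" as a regular expression (the leading '.' matches any character); that is harmless here because completed file names are purely numeric, but a substring-based strip avoids regex semantics entirely. A sketch of the round trip on plain strings (names are illustrative, not part of the patch):

public class RecoveredEditsNaming {
  static final String TMP_SUFFIX = ".temp";

  // Analogue of HLogSplitter.getTmpRecoveredEditsFileName
  static String toTmpName(String fileName) {
    return fileName + TMP_SUFFIX;
  }

  // Analogue of HLogSplitter.getCompletedRecoveredEditsFilePath, on strings;
  // substring() is used instead of split() to avoid regex interpretation.
  static String toCompletedName(String fileName) {
    if (fileName.endsWith(TMP_SUFFIX)) {
      return fileName.substring(0, fileName.length() - TMP_SUFFIX.length());
    }
    return fileName;
  }

  public static void main(String[] args) {
    String seqId = "0000000000000001234";
    String tmp = toTmpName(seqId);
    System.out.println(tmp + " -> " + toCompletedName(tmp)); // round-trips to seqId
  }
}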
@@ -1136,9 +1176,33 @@ public class HLogSplitter {
         thrown.add(ioe);
         continue;
       }
-      paths.add(wap.p);
       LOG.info("Closed path " + wap.p +" (wrote " + wap.editsWritten + " edits in "
           + (wap.nanosSpent / 1000/ 1000) + "ms)");
+      Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+      try {
+        if (!dst.equals(wap.p) && fs.exists(dst)) {
+          LOG.warn("Found existing old edits file. It could be the "
+              + "result of a previous failed split attempt. Deleting " + dst
+              + ", length=" + fs.getFileStatus(dst).getLen());
+          if (!fs.delete(dst, false)) {
+            LOG.warn("Failed deleting of old " + dst);
+            throw new IOException("Failed deleting of old " + dst);
+          }
+        }
+        // Skip the unit tests which create a splitter that reads and writes
+        // the data without touching disk. TestHLogSplit#testThreading is an
+        // example.
+        if (fs.exists(wap.p)) {
+          if (!fs.rename(wap.p, dst)) {
+            throw new IOException("Failed renaming " + wap.p + " to " + dst);
+          }
+        }
+      } catch (IOException ioe) {
+        LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+        thrown.add(ioe);
+        continue;
+      }
+      paths.add(dst);
     }
     if (!thrown.isEmpty()) {
       throw MultipleIOException.createIOException(thrown);
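Both close paths now finalize a writer the same way: delete any stale destination left behind by a previous failed split attempt, then rename the ".temp" file to its completed name, and only then report the completed path. A local-filesystem sketch of that publish step using java.nio.file (an analogue of the HDFS code path, not the patch itself; ATOMIC_MOVE assumes the underlying filesystem supports atomic rename):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FinalizeRecoveredEdits {
  /** Publish a completed ".temp" edits file under its final name. */
  static Path publish(Path tmp) throws IOException {
    String name = tmp.getFileName().toString();
    if (!name.endsWith(".temp")) {
      return tmp; // already a completed file; nothing to do
    }
    Path dst = tmp.resolveSibling(name.substring(0, name.length() - ".temp".length()));
    // A destination left over from a previous failed split attempt is stale:
    // this split pass rewrote all of its edits into tmp, so drop it first.
    Files.deleteIfExists(dst);
    return Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("recovered.edits");
    Path tmp = Files.writeString(dir.resolve("0000000000000001234.temp"), "edits");
    System.out.println(publish(tmp)); // .../0000000000000001234
  }
}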
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -19,7 +19,11 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -27,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,10 +44,16 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
-import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.Threads;
@@ -1103,6 +1114,58 @@ public class TestHLogSplit {
     assertEquals(1, fs.listStatus(corruptDir).length);
   }
 
+  /**
+   * @throws IOException
+   * @see https://issues.apache.org/jira/browse/HBASE-4862
+   */
+  @Test
+  public void testConcurrentSplitLogAndReplayRecoverEdit() throws IOException {
+    LOG.info("testConcurrentSplitLogAndReplayRecoverEdit");
+    // Generate hlogs for our destination region
+    String regionName = "r0";
+    final Path regiondir = new Path(tabledir, regionName);
+    regions = new ArrayList<String>();
+    regions.add(regionName);
+    generateHLogs(-1);
+
+    HLogSplitter logSplitter = new HLogSplitter(
+        conf, hbaseDir, hlogDir, oldLogDir, fs) {
+      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
+          throws IOException {
+        HLog.Writer writer = HLog.createWriter(fs, logfile, conf);
+        // After creating the writer, simulate the region's
+        // replayRecoveredEditsIfAny(), which gets this region's split edit
+        // files and deletes them, excluding files with the '.temp' suffix.
+        NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs,
+            regiondir);
+        if (files != null && !files.isEmpty()) {
+          for (Path file : files) {
+            if (!this.fs.delete(file, false)) {
+              LOG.error("Failed delete of " + file);
+            } else {
+              LOG.debug("Deleted recovered.edits file=" + file);
+            }
+          }
+        }
+        return writer;
+      }
+    };
+    try {
+      logSplitter.splitLog();
+    } catch (IOException e) {
+      LOG.info(e);
+      Assert.fail("IOException when splitting log; most likely the file "
+          + "being written was deleted by a concurrent "
+          + "replayRecoveredEditsIfAny()");
+    }
+    if (fs.exists(corruptDir)) {
+      if (fs.listStatus(corruptDir).length > 0) {
+        Assert.fail("There are some corrupt logs, most likely caused by a "
+            + "concurrent replayRecoveredEditsIfAny()");
+      }
+    }
+  }
+
   private void flushToConsole(String s) {
     System.out.println(s);
     System.out.flush();
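The test injects the race directly into the splitter: the overridden createWriter() deletes every completed edits file for the region right after each writer is created, simulating a replayRecoveredEditsIfAny() running in a concurrently opening region. Because the splitter now writes under ".temp" names, those deletes can no longer remove the file being written. A compressed, single-threaded re-enactment of the interleaving on a local directory (illustrative only; java.nio.file, Java 11+):

import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SplitVsReplayRace {
  public static void main(String[] args) throws Exception {
    Path regionDir = Files.createTempDirectory("recovered.edits");

    // Split-log "thread": start writing edits under a ".temp" name.
    Path tmp = regionDir.resolve("0000000000000001234.temp");
    Files.writeString(tmp, "edit-1\n");

    // Region-open "thread" runs here: it deletes only *completed* edits
    // files, so the in-progress ".temp" file is invisible to it.
    try (DirectoryStream<Path> files = Files.newDirectoryStream(regionDir, "[0-9]*")) {
      for (Path p : files) {
        if (!p.getFileName().toString().endsWith(".temp")) {
          Files.delete(p); // replayRecoveredEditsIfAny() analogue
        }
      }
    }

    // Split-log "thread" finishes and publishes the completed file.
    Files.move(tmp, regionDir.resolve("0000000000000001234"),
        StandardCopyOption.ATOMIC_MOVE);
    try (var remaining = Files.list(regionDir)) {
      System.out.println(remaining.count()); // 1: the edits survived the race
    }
  }
}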