HBASE-14949 Resolve name conflict when splitting if there are duplicated WAL entries

zhangduo 2016-02-18 10:31:01 +08:00
parent 6f8c7dca13
commit d2ba87509b
2 changed files with 154 additions and 39 deletions
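In short: WAL splitting writes each region's recovered edits into a temp file named after an edit's sequence id. When two WALs contain duplicated entries (same sequence id), both split tasks used to target the same temp file name and collide. This patch appends the name of the WAL being split to the temp file name, and, when a completed edits file with the same final name already exists, keeps the file that covers more entries. A minimal, self-contained sketch of the naming scheme follows (illustration only, not the patched WALSplitter code; the ".temp" suffix value is assumed, the "%019d" format comes from formatRecoveredEditsFileName in the diff below, and the class and method names here are hypothetical):

// Sketch of the old vs. new recovered-edits temp file naming (illustration only).
public class RecoveredEditsNamingSketch {
  // Suffix that marks an in-progress recovered-edits file; assumed to be ".temp",
  // mirroring the RECOVERED_LOG_TMPFILE_SUFFIX constant used in the diff below.
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";

  // Old scheme: every WAL containing an entry with the same sequence id maps to the same name.
  static String oldTmpName(long seqId) {
    return String.format("%019d", seqId) + RECOVERED_LOG_TMPFILE_SUFFIX;
  }

  // New scheme: the source WAL file name is appended, so duplicated entries coming
  // from different WALs get distinct temp file names.
  static String newTmpName(long seqId, String walFileName) {
    return String.format("%019d", seqId) + "-" + walFileName + RECOVERED_LOG_TMPFILE_SUFFIX;
  }

  public static void main(String[] args) {
    System.out.println(oldTmpName(2L));          // 0000000000000000002.temp (collides)
    System.out.println(newTmpName(2L, "wal-1")); // 0000000000000000002-wal-1.temp
    System.out.println(newTmpName(2L, "wal-2")); // 0000000000000000002-wal-2.temp
  }
}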

WALSplitter.java

@@ -176,6 +176,10 @@ public class WALSplitter {
   // Min batch size when replay WAL edits
   private final int minBatchSize;
 
+  // the file being split currently
+  private FileStatus fileBeingSplit;
+
+  @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker,
       CoordinatedStateManager csm, RecoveryMode mode) {
@@ -267,6 +271,7 @@ public class WALSplitter {
    * log splitting implementation, splits one log file.
    * @param logfile should be an actual log file.
    */
+  @VisibleForTesting
   boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
     Preconditions.checkState(status == null);
     Preconditions.checkArgument(logfile.isFile(),
@@ -285,6 +290,7 @@ public class WALSplitter {
     TaskMonitor.get().createStatus(
           "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
     Reader in = null;
+    this.fileBeingSplit = logfile;
     try {
       long logLength = logfile.getLen();
       LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
@@ -349,7 +355,7 @@ public class WALSplitter {
         }
         lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
       }
-      if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
+      if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) {
         editsSkipped++;
         continue;
       }
@@ -435,7 +441,7 @@ public class WALSplitter {
     finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
   }
 
-  static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+  private static void finishSplitLogFile(Path rootdir, Path oldLogDir,
       Path logPath, Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
@@ -509,12 +515,13 @@ public class WALSplitter {
    * @param fs
    * @param logEntry
    * @param rootDir HBase root dir.
+   * @param fileBeingSplit the file being split currently. Used to generate tmp file name.
    * @return Path to file into which to dump split log edits.
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  static Path getRegionSplitEditsPath(final FileSystem fs,
-      final Entry logEntry, final Path rootDir, boolean isCreate)
+  private static Path getRegionSplitEditsPath(final FileSystem fs,
+      final Entry logEntry, final Path rootDir, FileStatus fileBeingSplit)
   throws IOException {
     Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
     String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
@@ -542,17 +549,18 @@ public class WALSplitter {
       }
     }
-    if (isCreate && !fs.exists(dir)) {
-      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
+    if (!fs.exists(dir) && !fs.mkdirs(dir)) {
+      LOG.warn("mkdir failed on " + dir);
     }
+    // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
-    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
-    fileName = getTmpRecoveredEditsFileName(fileName);
+    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId());
+    fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileBeingSplit.getPath().getName());
     return new Path(dir, fileName);
   }
 
-  static String getTmpRecoveredEditsFileName(String fileName) {
+  private static String getTmpRecoveredEditsFileName(String fileName) {
     return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
   }
@@ -564,12 +572,13 @@ public class WALSplitter {
    * @param maximumEditLogSeqNum
    * @return dstPath take file's last edit log seq num as the name
    */
-  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
-      Long maximumEditLogSeqNum) {
+  private static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+      long maximumEditLogSeqNum) {
     String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
     return new Path(srcPath.getParent(), fileName);
   }
 
+  @VisibleForTesting
   static String formatRecoveredEditsFileName(final long seqid) {
     return String.format("%019d", seqid);
   }
@@ -1175,9 +1184,9 @@ public class WALSplitter {
       synchronized (regionMaximumEditLogSeqNum) {
         Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
             .getEncodedRegionName());
-        if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
+        if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
           regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
-              .getLogSeqNum());
+              .getSequenceId());
         }
       }
     }
@@ -1296,6 +1305,39 @@ public class WALSplitter {
       return splits;
     }
 
+    // delete the one with fewer wal entries
+    private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
+      long dstMinLogSeqNum = -1L;
+      try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
+        WAL.Entry entry = reader.next();
+        if (entry != null) {
+          dstMinLogSeqNum = entry.getKey().getSequenceId();
+        }
+      } catch (EOFException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?",
+            e);
+        }
+      }
+      if (wap.minLogSeqNum < dstMinLogSeqNum) {
+        LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+            + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+            + fs.getFileStatus(dst).getLen());
+        if (!fs.delete(dst, false)) {
+          LOG.warn("Failed deleting of old " + dst);
+          throw new IOException("Failed deleting of old " + dst);
+        }
+      } else {
+        LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
+            + ", length=" + fs.getFileStatus(wap.p).getLen());
+        if (!fs.delete(wap.p, false)) {
+          LOG.warn("Failed deleting of " + wap.p);
+          throw new IOException("Failed deleting of " + wap.p);
+        }
+      }
+    }
+
     /**
      * Close all of the output streams.
      * @return the list of paths written.
@@ -1351,13 +1393,7 @@ public class WALSplitter {
             regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
         try {
           if (!dst.equals(wap.p) && fs.exists(dst)) {
-            LOG.warn("Found existing old edits file. It could be the "
-                + "result of a previous failed split attempt. Deleting " + dst + ", length="
-                + fs.getFileStatus(dst).getLen());
-            if (!fs.delete(dst, false)) {
-              LOG.warn("Failed deleting of old " + dst);
-              throw new IOException("Failed deleting of old " + dst);
-            }
+            deleteOneWithFewerEntries(wap, dst);
           }
           // Skip the unit tests which create a splitter that reads and
           // writes the data without touching disk.
@@ -1482,7 +1518,7 @@ public class WALSplitter {
      * @return a path with a write for that path. caller should close.
      */
     private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
-      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
+      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit);
       if (regionedits == null) {
         return null;
       }
@@ -1496,7 +1532,7 @@ public class WALSplitter {
       }
       Writer w = createWriter(regionedits);
       LOG.debug("Creating writer path=" + regionedits);
-      return new WriterAndPath(regionedits, w);
+      return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
     }
 
     private void filterCellByStore(Entry logEntry) {
@@ -1516,7 +1552,7 @@ public class WALSplitter {
           Long maxSeqId = maxSeqIdInStores.get(family);
           // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
           // or the master was crashed before and we can not get the information.
-          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) {
+          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
             keptCells.add(cell);
           }
         }
@@ -1623,10 +1659,12 @@ public class WALSplitter {
   private final static class WriterAndPath extends SinkWriter {
     final Path p;
     final Writer w;
+    final long minLogSeqNum;
 
-    WriterAndPath(final Path p, final Writer w) {
+    WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) {
       this.p = p;
       this.w = w;
+      this.minLogSeqNum = minLogSeqNum;
     }
   }
@@ -1819,7 +1857,7 @@ public class WALSplitter {
           }
           if (maxStoreSequenceIds != null) {
             Long maxStoreSeqId = maxStoreSequenceIds.get(family);
-            if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
+            if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getSequenceId()) {
               // skip current kv if column family doesn't exist anymore or already flushed
               skippedCells.add(cell);
               continue;
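When the completed destination edits file already exists (a previous failed split attempt, or duplicated WAL entries split from another file), the new deleteOneWithFewerEntries above keeps whichever of the two files starts at the lower sequence id, since that one covers more of the region's edits. A tiny standalone sketch of just that decision rule, with hypothetical class and method names (the real method reads the first entry of the existing file with a WAL.Reader and deletes the loser through the FileSystem):

// Sketch of the tie-break used by deleteOneWithFewerEntries (illustration only).
final class EditsFileTieBreakSketch {
  // Returns true when the freshly written file should win: its first (minimum)
  // sequence id is lower than that of the existing destination file, so it holds more edits.
  static boolean newFileReplacesExisting(long newFileMinSeqId, long existingFileMinSeqId) {
    return newFileMinSeqId < existingFileMinSeqId;
  }

  public static void main(String[] args) {
    // As in the test below: wal-1 holds entries 1 and 2, wal-2 only the duplicated entry 2.
    System.out.println(newFileReplacesExisting(1L, 2L)); // true: the existing file is deleted
    System.out.println(newFileReplacesExisting(2L, 1L)); // false: the new file is deleted, existing kept
  }
}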

TestWALReplay.java

@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
@@ -1034,6 +1035,56 @@ public class TestWALReplay {
     assertEquals(result.size(), region2.get(g).size());
   }
 
+  /**
+   * testcase for https://issues.apache.org/jira/browse/HBASE-14949.
+   */
+  private void testNameConflictWhenSplit(boolean largeFirst) throws IOException {
+    final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL");
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+    final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName);
+    deleteDir(basedir);
+    final HTableDescriptor htd = createBasic1FamilyHTD(tableName);
+    HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd);
+    HBaseTestingUtility.closeRegionAndWAL(region);
+    final byte[] family = htd.getColumnFamilies()[0].getName();
+    final byte[] rowName = tableName.getName();
+    FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1);
+    FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2);
+    Path largeFile = new Path(logDir, "wal-1");
+    Path smallFile = new Path(logDir, "wal-2");
+    writerWALFile(largeFile, Arrays.asList(entry1, entry2));
+    writerWALFile(smallFile, Arrays.asList(entry2));
+    FileStatus first, second;
+    if (largeFirst) {
+      first = fs.getFileStatus(largeFile);
+      second = fs.getFileStatus(smallFile);
+    } else {
+      first = fs.getFileStatus(smallFile);
+      second = fs.getFileStatus(largeFile);
+    }
+    WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null,
+      RecoveryMode.LOG_SPLITTING, wals);
+    WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null,
+      RecoveryMode.LOG_SPLITTING, wals);
+    WAL wal = createWAL(this.conf);
+    region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal);
+    assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint());
+    assertEquals(2, region.get(new Get(rowName)).size());
+  }
+
+  @Test
+  public void testNameConflictWhenSplit0() throws IOException {
+    testNameConflictWhenSplit(true);
+  }
+
+  @Test
+  public void testNameConflictWhenSplit1() throws IOException {
+    testNameConflictWhenSplit(false);
+  }
+
   static class MockWAL extends FSHLog {
     boolean doCompleteCacheFlush = false;
@@ -1102,27 +1153,42 @@ public class TestWALReplay {
     }
   }
 
+  private WALKey createWALKey(final TableName tableName, final HRegionInfo hri,
+      final MultiVersionConcurrencyControl mvcc) {
+    return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc);
+  }
+
+  private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee,
+      int index) {
+    byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index));
+    byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index));
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes));
+    return edit;
+  }
+
+  private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence,
+      byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc,
+      int index) throws IOException {
+    FSWALEntry entry =
+        new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit(
+          rowName, family, ee, index), htd, hri, true);
+    entry.stampRegionSequenceId();
+    return entry;
+  }
+
   private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
       final byte[] family, final int count, EnvironmentEdge ee, final WAL wal,
-      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc)
-  throws IOException {
-    String familyStr = Bytes.toString(family);
+      final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException {
     for (int j = 0; j < count; j++) {
-      byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
-      byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j));
-      WALEdit edit = new WALEdit();
-      edit.add(new KeyValue(rowName, family, qualifierBytes,
-          ee.currentTime(), columnBytes));
-      wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,999, mvcc),
-          edit, true);
+      wal.append(htd, hri, createWALKey(tableName, hri, mvcc),
+        createWALEdit(rowName, family, ee, j), true);
     }
     wal.sync();
   }
 
-  static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
-      final int count, EnvironmentEdge ee, final Region r,
-      final String qualifierPrefix)
-  throws IOException {
+  static List<Put> addRegionEdits(final byte[] rowName, final byte[] family, final int count,
+      EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException {
     List<Put> puts = new ArrayList<Put>();
     for (int j = 0; j < count; j++) {
       byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
@@ -1183,4 +1249,15 @@ public class TestWALReplay {
     htd.addFamily(c);
     return htd;
   }
+
+  private void writerWALFile(Path file, List<FSWALEntry> entries) throws IOException {
+    fs.mkdirs(file.getParent());
+    ProtobufLogWriter writer = new ProtobufLogWriter();
+    writer.init(fs, file, conf, true);
+    for (FSWALEntry entry : entries) {
+      writer.append(entry);
+    }
+    writer.sync();
+    writer.close();
+  }
 }