diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 8abd950662a..54b82b2598d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -176,6 +176,10 @@ public class WALSplitter { // Min batch size when replay WAL edits private final int minBatchSize; + // the file being split currently + private FileStatus fileBeingSplit; + + @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, CoordinatedStateManager csm, RecoveryMode mode) { @@ -267,6 +271,7 @@ public class WALSplitter { * log splitting implementation, splits one log file. * @param logfile should be an actual log file. */ + @VisibleForTesting boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException { Preconditions.checkState(status == null); Preconditions.checkArgument(logfile.isFile(), @@ -285,6 +290,7 @@ public class WALSplitter { TaskMonitor.get().createStatus( "Splitting log file " + logfile.getPath() + "into a temporary staging area."); Reader in = null; + this.fileBeingSplit = logfile; try { long logLength = logfile.getLen(); LOG.info("Splitting wal: " + logPath + ", length=" + logLength); @@ -349,7 +355,7 @@ public class WALSplitter { } lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); } - if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { + if (lastFlushedSequenceId >= entry.getKey().getSequenceId()) { editsSkipped++; continue; } @@ -435,7 +441,7 @@ public class WALSplitter { finishSplitLogFile(rootdir, oldLogDir, logPath, conf); } - static void finishSplitLogFile(Path rootdir, Path oldLogDir, + private static void finishSplitLogFile(Path rootdir, Path oldLogDir, Path logPath, Configuration conf) throws IOException { List processedLogs = new ArrayList(); List corruptedLogs = new ArrayList(); @@ -509,12 +515,13 @@ public class WALSplitter { * @param fs * @param logEntry * @param rootDir HBase root dir. + * @param fileBeingSplit the file being split currently. Used to generate tmp file name. * @return Path to file into which to dump split log edits. * @throws IOException */ @SuppressWarnings("deprecation") - static Path getRegionSplitEditsPath(final FileSystem fs, - final Entry logEntry, final Path rootDir, boolean isCreate) + private static Path getRegionSplitEditsPath(final FileSystem fs, + final Entry logEntry, final Path rootDir, FileStatus fileBeingSplit) throws IOException { Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename()); String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); @@ -542,17 +549,18 @@ public class WALSplitter { } } - if (isCreate && !fs.exists(dir)) { - if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir); + if (!fs.exists(dir) && !fs.mkdirs(dir)) { + LOG.warn("mkdir failed on " + dir); } + // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure // region's replayRecoveredEdits will not delete it - String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum()); - fileName = getTmpRecoveredEditsFileName(fileName); + String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId()); + fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileBeingSplit.getPath().getName()); return new Path(dir, fileName); } - static String getTmpRecoveredEditsFileName(String fileName) { + private static String getTmpRecoveredEditsFileName(String fileName) { return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; } @@ -564,12 +572,13 @@ public class WALSplitter { * @param maximumEditLogSeqNum * @return dstPath take file's last edit log seq num as the name */ - static Path getCompletedRecoveredEditsFilePath(Path srcPath, - Long maximumEditLogSeqNum) { + private static Path getCompletedRecoveredEditsFilePath(Path srcPath, + long maximumEditLogSeqNum) { String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum); return new Path(srcPath.getParent(), fileName); } + @VisibleForTesting static String formatRecoveredEditsFileName(final long seqid) { return String.format("%019d", seqid); } @@ -1175,9 +1184,9 @@ public class WALSplitter { synchronized (regionMaximumEditLogSeqNum) { Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey() .getEncodedRegionName()); - if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) { + if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey() - .getLogSeqNum()); + .getSequenceId()); } } } @@ -1296,6 +1305,39 @@ public class WALSplitter { return splits; } + // delete the one with fewer wal entries + private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { + long dstMinLogSeqNum = -1L; + try (WAL.Reader reader = walFactory.createReader(fs, dst)) { + WAL.Entry entry = reader.next(); + if (entry != null) { + dstMinLogSeqNum = entry.getKey().getSequenceId(); + } + } catch (EOFException e) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?", + e); + } + } + if (wap.minLogSeqNum < dstMinLogSeqNum) { + LOG.warn("Found existing old edits file. It could be the result of a previous failed" + + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" + + fs.getFileStatus(dst).getLen()); + if (!fs.delete(dst, false)) { + LOG.warn("Failed deleting of old " + dst); + throw new IOException("Failed deleting of old " + dst); + } + } else { + LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p + + ", length=" + fs.getFileStatus(wap.p).getLen()); + if (!fs.delete(wap.p, false)) { + LOG.warn("Failed deleting of " + wap.p); + throw new IOException("Failed deleting of " + wap.p); + } + } + } + /** * Close all of the output streams. * @return the list of paths written. @@ -1351,13 +1393,7 @@ public class WALSplitter { regionMaximumEditLogSeqNum.get(writersEntry.getKey())); try { if (!dst.equals(wap.p) && fs.exists(dst)) { - LOG.warn("Found existing old edits file. It could be the " - + "result of a previous failed split attempt. Deleting " + dst + ", length=" - + fs.getFileStatus(dst).getLen()); - if (!fs.delete(dst, false)) { - LOG.warn("Failed deleting of old " + dst); - throw new IOException("Failed deleting of old " + dst); - } + deleteOneWithFewerEntries(wap, dst); } // Skip the unit tests which create a splitter that reads and // writes the data without touching disk. @@ -1482,7 +1518,7 @@ public class WALSplitter { * @return a path with a write for that path. caller should close. */ private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { - Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true); + Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit); if (regionedits == null) { return null; } @@ -1496,7 +1532,7 @@ public class WALSplitter { } Writer w = createWriter(regionedits); LOG.debug("Creating writer path=" + regionedits); - return new WriterAndPath(regionedits, w); + return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId()); } private void filterCellByStore(Entry logEntry) { @@ -1516,7 +1552,7 @@ public class WALSplitter { Long maxSeqId = maxSeqIdInStores.get(family); // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, // or the master was crashed before and we can not get the information. - if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) { + if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { keptCells.add(cell); } } @@ -1623,10 +1659,12 @@ public class WALSplitter { private final static class WriterAndPath extends SinkWriter { final Path p; final Writer w; + final long minLogSeqNum; - WriterAndPath(final Path p, final Writer w) { + WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) { this.p = p; this.w = w; + this.minLogSeqNum = minLogSeqNum; } } @@ -1819,7 +1857,7 @@ public class WALSplitter { } if (maxStoreSequenceIds != null) { Long maxStoreSeqId = maxStoreSequenceIds.get(family); - if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) { + if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getSequenceId()) { // skip current kv if column family doesn't exist anymore or already flushed skippedCells.add(cell); continue; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 40e5baa22eb..dbc06ff2ff4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -1034,6 +1035,56 @@ public class TestWALReplay { assertEquals(result.size(), region2.get(g).size()); } + /** + * testcase for https://issues.apache.org/jira/browse/HBASE-14949. + */ + private void testNameConflictWhenSplit(boolean largeFirst) throws IOException { + final TableName tableName = TableName.valueOf("testReplayEditsWrittenIntoWAL"); + final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName); + final Path basedir = FSUtils.getTableDir(hbaseRootDir, tableName); + deleteDir(basedir); + + final HTableDescriptor htd = createBasic1FamilyHTD(tableName); + HRegion region = HBaseTestingUtility.createRegionAndWAL(hri, hbaseRootDir, this.conf, htd); + HBaseTestingUtility.closeRegionAndWAL(region); + final byte[] family = htd.getColumnFamilies()[0].getName(); + final byte[] rowName = tableName.getName(); + FSWALEntry entry1 = createFSWALEntry(htd, hri, 1L, rowName, family, ee, mvcc, 1); + FSWALEntry entry2 = createFSWALEntry(htd, hri, 2L, rowName, family, ee, mvcc, 2); + + Path largeFile = new Path(logDir, "wal-1"); + Path smallFile = new Path(logDir, "wal-2"); + writerWALFile(largeFile, Arrays.asList(entry1, entry2)); + writerWALFile(smallFile, Arrays.asList(entry2)); + FileStatus first, second; + if (largeFirst) { + first = fs.getFileStatus(largeFile); + second = fs.getFileStatus(smallFile); + } else { + first = fs.getFileStatus(smallFile); + second = fs.getFileStatus(largeFile); + } + WALSplitter.splitLogFile(hbaseRootDir, first, fs, conf, null, null, null, + RecoveryMode.LOG_SPLITTING, wals); + WALSplitter.splitLogFile(hbaseRootDir, second, fs, conf, null, null, null, + RecoveryMode.LOG_SPLITTING, wals); + WAL wal = createWAL(this.conf); + region = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal); + assertTrue(region.getOpenSeqNum() > mvcc.getWritePoint()); + assertEquals(2, region.get(new Get(rowName)).size()); + } + + @Test + public void testNameConflictWhenSplit0() throws IOException { + testNameConflictWhenSplit(true); + } + + @Test + public void testNameConflictWhenSplit1() throws IOException { + testNameConflictWhenSplit(false); + } + static class MockWAL extends FSHLog { boolean doCompleteCacheFlush = false; @@ -1102,27 +1153,42 @@ public class TestWALReplay { } } + private WALKey createWALKey(final TableName tableName, final HRegionInfo hri, + final MultiVersionConcurrencyControl mvcc) { + return new WALKey(hri.getEncodedNameAsBytes(), tableName, 999, mvcc); + } + + private WALEdit createWALEdit(final byte[] rowName, final byte[] family, EnvironmentEdge ee, + int index) { + byte[] qualifierBytes = Bytes.toBytes(Integer.toString(index)); + byte[] columnBytes = Bytes.toBytes(Bytes.toString(family) + ":" + Integer.toString(index)); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); + return edit; + } + + private FSWALEntry createFSWALEntry(HTableDescriptor htd, HRegionInfo hri, long sequence, + byte[] rowName, byte[] family, EnvironmentEdge ee, MultiVersionConcurrencyControl mvcc, + int index) throws IOException { + FSWALEntry entry = + new FSWALEntry(sequence, createWALKey(htd.getTableName(), hri, mvcc), createWALEdit( + rowName, family, ee, index), htd, hri, true); + entry.stampRegionSequenceId(); + return entry; + } + private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, - final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) - throws IOException { - String familyStr = Bytes.toString(family); + final HTableDescriptor htd, final MultiVersionConcurrencyControl mvcc) throws IOException { for (int j = 0; j < count; j++) { - byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); - byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, qualifierBytes, - ee.currentTime(), columnBytes)); - wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,999, mvcc), - edit, true); + wal.append(htd, hri, createWALKey(tableName, hri, mvcc), + createWALEdit(rowName, family, ee, j), true); } wal.sync(); } - static List addRegionEdits (final byte [] rowName, final byte [] family, - final int count, EnvironmentEdge ee, final Region r, - final String qualifierPrefix) - throws IOException { + static List addRegionEdits(final byte[] rowName, final byte[] family, final int count, + EnvironmentEdge ee, final Region r, final String qualifierPrefix) throws IOException { List puts = new ArrayList(); for (int j = 0; j < count; j++) { byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j)); @@ -1183,4 +1249,15 @@ public class TestWALReplay { htd.addFamily(c); return htd; } + + private void writerWALFile(Path file, List entries) throws IOException { + fs.mkdirs(file.getParent()); + ProtobufLogWriter writer = new ProtobufLogWriter(); + writer.init(fs, file, conf, true); + for (FSWALEntry entry : entries) { + writer.append(entry); + } + writer.sync(); + writer.close(); + } }