diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index c49007d6b27..3bb0a91624c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5436,7 +5436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } private long loadRecoveredHFilesIfAny(Collection stores) throws IOException { - Path regionDir = getWALRegionDir(); + Path regionDir = fs.getRegionDir(); long maxSeqId = -1; for (HStore store : stores) { String familyName = store.getColumnFamilyName(); @@ -5449,17 +5449,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (isZeroLengthThenDelete(fs.getFileSystem(), file, filePath)) { continue; } - try { - store.assertBulkLoadHFileOk(filePath); + HStoreFile storefile = store.tryCommitRecoveredHFile(file.getPath()); + maxSeqId = Math.max(maxSeqId, storefile.getReader().getSequenceID()); } catch (IOException e) { handleException(fs.getFileSystem(), filePath, e); continue; } - Pair pair = store.preBulkLoadHFile(filePath.toString(), -1); - store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(), - pair.getSecond()); - maxSeqId = Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(filePath.getName())); } if (this.rsServices != null && store.needsCompaction()) { this.rsServices.getCompactionRequestor() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 1c376f96409..ca74fc22fa6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1086,6 +1086,42 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, throw lastException; } + public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException { + LOG.info("Validating recovered hfile at {} for inclusion in store {} region {}", path, this, + getRegionInfo().getRegionNameAsString()); + FileSystem srcFs = path.getFileSystem(conf); + srcFs.access(path, FsAction.READ_WRITE); + try (HFile.Reader reader = + HFile.createReader(srcFs, path, cacheConf, isPrimaryReplicaStore(), conf)) { + Optional firstKey = reader.getFirstRowKey(); + Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); + Optional lk = reader.getLastKey(); + Preconditions.checkState(lk.isPresent(), "Last key can not be null"); + byte[] lastKey = CellUtil.cloneRow(lk.get()); + if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { + throw new WrongRegionException("Recovered hfile " + path.toString() + + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); + } + } + + Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path); + HStoreFile sf = createStoreFileAndReader(dstPath); + StoreFileReader r = sf.getReader(); + this.storeSize.addAndGet(r.length()); + this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes()); + + this.lock.writeLock().lock(); + try { + this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf)); + } finally { + this.lock.writeLock().unlock(); + } + + LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf, + r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1)); + return sf; + } + /** * @param path The pathname of the tmp file into which the store was flushed * @return store file created. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java index accfe56bfad..ce97d64c728 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredHFilesOutputSink.java @@ -32,15 +32,17 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.regionserver.CellSet; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer; @@ -85,11 +87,14 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { continue; } + PrivateCellUtil.setSequenceId(cell, seqId); String familyName = Bytes.toString(CellUtil.cloneFamily(cell)); // comparator need to be specified for meta - familyCells.computeIfAbsent(familyName, key -> new CellSet( - isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance())) - .add(cell); + familyCells + .computeIfAbsent(familyName, + key -> new CellSet( + isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR)) + .add(cell); familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId)); } } @@ -105,6 +110,8 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { for (Cell cell : cellsEntry.getValue()) { writer.append(cell); } + // Append the max seqid to hfile, used when recovery. + writer.appendMetadata(familySeqIds.get(familyName), false); regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName), (k, v) -> v == null ? buffer.entryBuffer.size() : v + buffer.entryBuffer.size()); splits.add(writer.getPath()); @@ -181,44 +188,32 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink { private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName, long seqId, String familyName, boolean isMetaTable) throws IOException { - Path outputFile = WALSplitUtil - .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId, - walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, walSplitter.rootFS); - checkPathValid(outputFile); + Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf, + tableName, regionName, familyName); StoreFileWriter.Builder writerBuilder = new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS) - .withFilePath(outputFile); - HFileContextBuilder hFileContextBuilder = new HFileContextBuilder(); - if (isMetaTable) { - hFileContextBuilder.withCellComparator(CellComparatorImpl.META_COMPARATOR); - } else { - configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder); - } - return writerBuilder.withFileContext(hFileContextBuilder.build()).build(); - } + .withOutputDir(outputDir); - private void configContextForNonMetaWriter(TableName tableName, String familyName, - HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder) - throws IOException { TableDescriptor tableDesc = tableDescCache.computeIfAbsent(tableName, t -> getTableDescriptor(t)); if (tableDesc == null) { throw new IOException("Failed to get table descriptor for table " + tableName); } ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName)); - hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize()) - .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding()) - .withCellComparator(CellComparatorImpl.COMPARATOR); - writerBuilder.withBloomType(cfd.getBloomFilterType()); + HFileContext hFileContext = createFileContext(cfd, isMetaTable); + return writerBuilder.withFileContext(hFileContext).withBloomType(cfd.getBloomFilterType()) + .build(); } - private void checkPathValid(Path outputFile) throws IOException { - if (walSplitter.rootFS.exists(outputFile)) { - LOG.warn("this file {} may be left after last failed split ", outputFile); - if (!walSplitter.rootFS.delete(outputFile, false)) { - LOG.warn("delete old generated HFile {} failed", outputFile); - } - } + private HFileContext createFileContext(ColumnFamilyDescriptor cfd, boolean isMetaTable) + throws IOException { + return new HFileContextBuilder().withCompression(cfd.getCompressionType()) + .withChecksumType(HStore.getChecksumType(walSplitter.conf)) + .withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf)) + .withBlockSize(cfd.getBlocksize()).withCompressTags(cfd.isCompressTags()) + .withDataBlockEncoding(cfd.getDataBlockEncoding()).withCellComparator( + isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR) + .build(); } private TableDescriptor getTableDescriptor(TableName tableName) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java index 3ff1e70ad71..88bae5657ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java @@ -566,42 +566,29 @@ public final class WALSplitUtil { } /** - * Path to a file under recovered.hfiles directory of the region's column family: e.g. - * /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence - * of recovered.hfiles directory under the region's column family, creating it if necessary. - * - * @param tableName the table name - * @param encodedRegionName the encoded region name - * @param familyName the column family name - * @param seqId the sequence id which used to generate file name + * Return path to recovered.hfiles directory of the region's column family: e.g. + * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of + * recovered.hfiles directory under the region's column family, creating it if necessary. + * @param rootFS the root file system + * @param conf configuration + * @param tableName the table name + * @param encodedRegionName the encoded region name + * @param familyName the column family name + * @param seqId the sequence id which used to generate file name * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name - * @param conf configuration - * @param rootFS the root file system - * @return Path to file into which to dump split log edits. + * @return Path to recovered.hfiles directory of the region's column family. */ - static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName, - String familyName, long seqId, String fileNameBeingSplit, Configuration conf, FileSystem rootFS) - throws IOException { + static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf, + TableName tableName, String encodedRegionName, String familyName) throws IOException { Path rootDir = FSUtils.getRootDir(conf); - Path regionDir = - FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName); - Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName); - + Path regionDir = FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), + encodedRegionName); + Path dir = getRecoveredHFilesDir(regionDir, familyName); if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) { LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName, familyName); } - - String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit); - return new Path(dir, fileName); - } - - private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) { - return String.format("%019d", seqId) + "-" + fileNameBeingSplit; - } - - public static long getSeqIdForRecoveredHFile(String fileName) { - return Long.parseLong(fileName.split("-")[0]); + return dir; } /** @@ -609,13 +596,13 @@ public final class WALSplitUtil { * @param familyName The column family name * @return The directory that holds recovered hfiles for the region's column family */ - private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) { + private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) { return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR); } public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS, final Path regionDir, String familyName) throws IOException { - Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName); + Path dir = getRecoveredHFilesDir(regionDir, familyName); return FSUtils.listStatus(rootFS, dir); } -} +} \ No newline at end of file diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java index a7fb7317cc0..5d762dce161 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitToHFile.java @@ -30,7 +30,9 @@ import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -252,6 +254,92 @@ public class TestWALSplitToHFile { } } + @Test + public void testPutWithSameTimestamp() throws Exception { + Pair pair = setupTableAndRegion(); + TableDescriptor td = pair.getFirst(); + RegionInfo ri = pair.getSecond(); + + WAL wal = createWAL(this.conf, rootDir, logName); + HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal); + final long timestamp = this.ee.currentTime(); + // Write data and flush + for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) { + region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1)); + } + region.flush(true); + + // Now assert edits made it in. + Result result1 = region.get(new Get(ROW)); + assertEquals(td.getColumnFamilies().length, result1.size()); + for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) { + assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x")))); + } + + // Write data with same timestamp and do not flush + for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) { + region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE2)); + } + // Now close the region (without flush) + region.close(true); + wal.shutdown(); + // split the log + WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals); + + // reopen the region + WAL wal2 = createWAL(this.conf, rootDir, logName); + HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2); + Result result2 = region2.get(new Get(ROW)); + assertEquals(td.getColumnFamilies().length, result2.size()); + for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) { + assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), Bytes.toBytes("x")))); + } + } + + @Test + public void testRecoverSequenceId() throws Exception { + Pair pair = setupTableAndRegion(); + TableDescriptor td = pair.getFirst(); + RegionInfo ri = pair.getSecond(); + + WAL wal = createWAL(this.conf, rootDir, logName); + HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal); + Map> seqIdMap = new HashMap<>(); + // Write data and do not flush + for (int i = 0; i < countPerFamily; i++) { + for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) { + region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), Bytes.toBytes("x"), VALUE1)); + Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName())); + assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x")))); + List cells = result.listCells(); + assertEquals(1, cells.size()); + seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(), + cells.get(0).getSequenceId()); + } + } + + // Now close the region (without flush) + region.close(true); + wal.shutdown(); + // split the log + WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals); + + // reopen the region + WAL wal2 = createWAL(this.conf, rootDir, logName); + HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2); + // assert the seqid was recovered + for (int i = 0; i < countPerFamily; i++) { + for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) { + Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName())); + assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x")))); + List cells = result.listCells(); + assertEquals(1, cells.size()); + assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()), + cells.get(0).getSequenceId()); + } + } + } + /** * Test writing edits into an HRegion, closing it, splitting logs, opening * Region again. Verify seqids.