HBASE-23741 Data loss when WAL split to HFile enabled (#1254)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
parent eface74407
commit 8130858199
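The failure this patch fixes, in short: the WAL-split-to-HFile path rewrote edits into HFiles and bulk loaded them, losing the cells' original WAL sequence ids; an unflushed edit that shares a timestamp with an already flushed cell can then lose the tie after recovery, and the newer value disappears. A minimal, self-contained sketch of that tie-break rule follows; it is plain Java with made-up names, not HBase code.

// Illustration only: when two versions of the same cell carry the same timestamp,
// the tie is broken by sequence id, newest (highest) first. Losing the sequence id
// during WAL split lets the older, already-flushed value win after reopen.
import java.util.Comparator;
import java.util.List;

public class SameTimestampSketch {
  static final class Version {
    final long timestamp;
    final long seqId;
    final String value;
    Version(long timestamp, long seqId, String value) {
      this.timestamp = timestamp; this.seqId = seqId; this.value = value;
    }
  }

  public static void main(String[] args) {
    Comparator<Version> newestFirst =
        Comparator.comparingLong((Version v) -> v.timestamp).reversed()
            .thenComparing(Comparator.comparingLong((Version v) -> v.seqId).reversed());

    Version flushed = new Version(100L, 5L, "value1"); // flushed to an HFile before the crash
    Version inWal = new Version(100L, 9L, "value2");   // same timestamp, only in the WAL

    // With sequence ids preserved, the WAL edit wins the tie and "value2" is read back.
    Version winner = List.of(flushed, inWal).stream().min(newestFirst).get();
    System.out.println(winner.value);
  }
}

The new testPutWithSameTimestamp test at the bottom of this patch reproduces exactly this scenario against a real region.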
@@ -5436,7 +5436,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }

   private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOException {
-    Path regionDir = getWALRegionDir();
+    Path regionDir = fs.getRegionDir();
     long maxSeqId = -1;
     for (HStore store : stores) {
       String familyName = store.getColumnFamilyName();
@@ -5449,17 +5449,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (isZeroLengthThenDelete(fs.getFileSystem(), file, filePath)) {
           continue;
         }
-
         try {
-          store.assertBulkLoadHFileOk(filePath);
+          HStoreFile storefile = store.tryCommitRecoveredHFile(file.getPath());
+          maxSeqId = Math.max(maxSeqId, storefile.getReader().getSequenceID());
         } catch (IOException e) {
           handleException(fs.getFileSystem(), filePath, e);
           continue;
         }
-        Pair<Path, Path> pair = store.preBulkLoadHFile(filePath.toString(), -1);
-        store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
-          pair.getSecond());
-        maxSeqId = Math.max(maxSeqId, WALSplitUtil.getSeqIdForRecoveredHFile(filePath.getName()));
       }
       if (this.rsServices != null && store.needsCompaction()) {
         this.rsServices.getCompactionRequestor()
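The call site above also changes where the recovered file's sequence id comes from: the old code parsed it back out of the generated file name, while the new code reads it from the committed store file itself. A schematic contrast with placeholder types standing in for the HBase ones (fromFileName mirrors the helper removed from WALSplitUtil further down):

// Illustration only; names are placeholders.
class SeqIdSourceSketch {
  // Old scheme: a single id was baked into the "%019d-<walName>" file name at split time.
  static long fromFileName(String fileName) {
    return Long.parseLong(fileName.split("-")[0]);
  }

  // New scheme: the committed file reports its own sequence id
  // (a simple getter stands in for StoreFileReader#getSequenceID()).
  interface CommittedFile { long sequenceId(); }
  static long fromMetadata(CommittedFile file) {
    return file.sequenceId();
  }
}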
@@ -1086,6 +1086,42 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
     throw lastException;
   }

+  public HStoreFile tryCommitRecoveredHFile(Path path) throws IOException {
+    LOG.info("Validating recovered hfile at {} for inclusion in store {} region {}", path, this,
+      getRegionInfo().getRegionNameAsString());
+    FileSystem srcFs = path.getFileSystem(conf);
+    srcFs.access(path, FsAction.READ_WRITE);
+    try (HFile.Reader reader =
+        HFile.createReader(srcFs, path, cacheConf, isPrimaryReplicaStore(), conf)) {
+      Optional<byte[]> firstKey = reader.getFirstRowKey();
+      Preconditions.checkState(firstKey.isPresent(), "First key can not be null");
+      Optional<Cell> lk = reader.getLastKey();
+      Preconditions.checkState(lk.isPresent(), "Last key can not be null");
+      byte[] lastKey = CellUtil.cloneRow(lk.get());
+      if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) {
+        throw new WrongRegionException("Recovered hfile " + path.toString() +
+          " does not fit inside region " + this.getRegionInfo().getRegionNameAsString());
+      }
+    }
+
+    Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
+    HStoreFile sf = createStoreFileAndReader(dstPath);
+    StoreFileReader r = sf.getReader();
+    this.storeSize.addAndGet(r.length());
+    this.totalUncompressedBytes.addAndGet(r.getTotalUncompressedBytes());
+
+    this.lock.writeLock().lock();
+    try {
+      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
+    } finally {
+      this.lock.writeLock().unlock();
+    }
+
+    LOG.info("Loaded recovered hfile to {}, entries={}, sequenceid={}, filesize={}", sf,
+      r.getEntries(), r.getSequenceID(), TraditionalBinaryPrefix.long2String(r.length(), "B", 1));
+    return sf;
+  }
+
   /**
    * @param path The pathname of the tmp file into which the store was flushed
    * @return store file created.
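tryCommitRecoveredHFile follows a validate, move, then publish pattern: the HFile is checked against the region boundaries, moved into the store directory, and only then registered with the store's file set under the write lock, so concurrent readers never see a half-committed file. A minimal sketch of that pattern using java.nio placeholders rather than the HBase FileSystem types:

// Illustration only; liveFiles stands in for the StoreFileManager.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class CommitRecoveredFileSketch {
  private final List<Path> liveFiles = new ArrayList<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  Path tryCommit(Path recoveredFile, Path storeDir) throws IOException {
    // 1. Move the validated file into the store directory (commitStoreFile in the patch).
    Path committed = Files.move(recoveredFile, storeDir.resolve(recoveredFile.getFileName()));
    // 2. Publish it to readers under the write lock (insertNewFiles in the patch).
    lock.writeLock().lock();
    try {
      liveFiles.add(committed);
    } finally {
      lock.writeLock().unlock();
    }
    return committed;
  }
}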
@@ -32,15 +32,17 @@ import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.regionserver.CellSet;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.EntryBuffers.RegionEntryBuffer;
@@ -85,11 +87,14 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
         if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
           continue;
         }
+        PrivateCellUtil.setSequenceId(cell, seqId);
         String familyName = Bytes.toString(CellUtil.cloneFamily(cell));
         // comparator need to be specified for meta
-        familyCells.computeIfAbsent(familyName, key -> new CellSet(
-          isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparator.getInstance()))
-          .add(cell);
+        familyCells
+          .computeIfAbsent(familyName,
+            key -> new CellSet(
+              isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR))
+          .add(cell);
         familySeqIds.compute(familyName, (k, v) -> v == null ? seqId : Math.max(v, seqId));
       }
     }
@@ -105,6 +110,8 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
       for (Cell cell : cellsEntry.getValue()) {
         writer.append(cell);
       }
+      // Append the max seqid to hfile, used when recovery.
+      writer.appendMetadata(familySeqIds.get(familyName), false);
       regionEditsWrittenMap.compute(Bytes.toString(buffer.encodedRegionName),
         (k, v) -> v == null ? buffer.entryBuffer.size() : v + buffer.entryBuffer.size());
       splits.add(writer.getPath());
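Two details in the sink matter for correctness: each recovered cell keeps its WAL sequence id (PrivateCellUtil.setSequenceId), and the largest sequence id seen per column family is appended to the resulting HFile's metadata (writer.appendMetadata) so the region can read it back at open time. A simplified, self-contained sketch of that bookkeeping, with stand-in types:

// Illustration only; CellStub and the plain collections stand in for Cell and CellSet.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RecoveredSinkSketch {
  static final class CellStub {
    final String family;
    final long seqId;
    CellStub(String family, long seqId) { this.family = family; this.seqId = seqId; }
  }

  final Map<String, List<CellStub>> familyCells = new HashMap<>();
  final Map<String, Long> familySeqIds = new HashMap<>();

  void append(CellStub cell) {
    // Bucket cells per column family and remember the largest sequence id seen for each.
    familyCells.computeIfAbsent(cell.family, key -> new ArrayList<>()).add(cell);
    familySeqIds.compute(cell.family, (k, v) -> v == null ? cell.seqId : Math.max(v, cell.seqId));
  }

  // At flush time the per-family max goes into the HFile's metadata, analogous to
  // writer.appendMetadata(familySeqIds.get(familyName), false) above.
  long metadataFor(String family) {
    return familySeqIds.getOrDefault(family, -1L);
  }
}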
@@ -181,44 +188,32 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {

   private StoreFileWriter createRecoveredHFileWriter(TableName tableName, String regionName,
       long seqId, String familyName, boolean isMetaTable) throws IOException {
-    Path outputFile = WALSplitUtil
-      .getRegionRecoveredHFilePath(tableName, regionName, familyName, seqId,
-        walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.conf, walSplitter.rootFS);
-    checkPathValid(outputFile);
+    Path outputDir = WALSplitUtil.tryCreateRecoveredHFilesDir(walSplitter.rootFS, walSplitter.conf,
+      tableName, regionName, familyName);
     StoreFileWriter.Builder writerBuilder =
       new StoreFileWriter.Builder(walSplitter.conf, CacheConfig.DISABLED, walSplitter.rootFS)
-        .withFilePath(outputFile);
-    HFileContextBuilder hFileContextBuilder = new HFileContextBuilder();
-    if (isMetaTable) {
-      hFileContextBuilder.withCellComparator(CellComparatorImpl.META_COMPARATOR);
-    } else {
-      configContextForNonMetaWriter(tableName, familyName, hFileContextBuilder, writerBuilder);
-    }
-    return writerBuilder.withFileContext(hFileContextBuilder.build()).build();
-  }
-
-  private void configContextForNonMetaWriter(TableName tableName, String familyName,
-      HFileContextBuilder hFileContextBuilder, StoreFileWriter.Builder writerBuilder)
-      throws IOException {
+        .withOutputDir(outputDir);
     TableDescriptor tableDesc =
       tableDescCache.computeIfAbsent(tableName, t -> getTableDescriptor(t));
     if (tableDesc == null) {
       throw new IOException("Failed to get table descriptor for table " + tableName);
     }
     ColumnFamilyDescriptor cfd = tableDesc.getColumnFamily(Bytes.toBytesBinary(familyName));
-    hFileContextBuilder.withCompression(cfd.getCompressionType()).withBlockSize(cfd.getBlocksize())
-      .withCompressTags(cfd.isCompressTags()).withDataBlockEncoding(cfd.getDataBlockEncoding())
-      .withCellComparator(CellComparatorImpl.COMPARATOR);
-    writerBuilder.withBloomType(cfd.getBloomFilterType());
+    HFileContext hFileContext = createFileContext(cfd, isMetaTable);
+    return writerBuilder.withFileContext(hFileContext).withBloomType(cfd.getBloomFilterType())
+      .build();
   }

-  private void checkPathValid(Path outputFile) throws IOException {
-    if (walSplitter.rootFS.exists(outputFile)) {
-      LOG.warn("this file {} may be left after last failed split ", outputFile);
-      if (!walSplitter.rootFS.delete(outputFile, false)) {
-        LOG.warn("delete old generated HFile {} failed", outputFile);
-      }
-    }
+  private HFileContext createFileContext(ColumnFamilyDescriptor cfd, boolean isMetaTable)
+      throws IOException {
+    return new HFileContextBuilder().withCompression(cfd.getCompressionType())
+      .withChecksumType(HStore.getChecksumType(walSplitter.conf))
+      .withBytesPerCheckSum(HStore.getBytesPerChecksum(walSplitter.conf))
+      .withBlockSize(cfd.getBlocksize()).withCompressTags(cfd.isCompressTags())
+      .withDataBlockEncoding(cfd.getDataBlockEncoding()).withCellComparator(
+        isMetaTable ? CellComparatorImpl.META_COMPARATOR : CellComparatorImpl.COMPARATOR)
+      .build();
   }

   private TableDescriptor getTableDescriptor(TableName tableName) {
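The writer is now handed the recovered.hfiles directory (withOutputDir) instead of a precomputed file path, so the builder picks the concrete file name and the old checkPathValid cleanup of leftovers from a failed split is no longer needed. A rough java.nio illustration of the idea; the UUID naming below is an assumption for the sketch, not the builder's actual scheme:

// Illustration only: create the target directory, then choose a fresh, unique name
// inside it so a retried split cannot collide with an earlier failed attempt.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

class UniqueOutputSketch {
  static Path newRecoveredHFile(Path recoveredHFilesDir) throws IOException {
    Files.createDirectories(recoveredHFilesDir);
    return Files.createFile(recoveredHFilesDir.resolve(UUID.randomUUID().toString()));
  }
}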
@@ -566,42 +566,29 @@ public final class WALSplitUtil {
   }

   /**
-   * Path to a file under recovered.hfiles directory of the region's column family: e.g.
-   * /hbase/some_table/2323432434/cf/recovered.hfiles/2332-wal. This method also ensures existence
-   * of recovered.hfiles directory under the region's column family, creating it if necessary.
-   *
-   * @param tableName the table name
-   * @param encodedRegionName the encoded region name
-   * @param familyName the column family name
-   * @param seqId the sequence id which used to generate file name
+   * Return path to recovered.hfiles directory of the region's column family: e.g.
+   * /hbase/some_table/2323432434/cf/recovered.hfiles/. This method also ensures existence of
+   * recovered.hfiles directory under the region's column family, creating it if necessary.
+   * @param rootFS the root file system
+   * @param conf configuration
+   * @param tableName the table name
+   * @param encodedRegionName the encoded region name
+   * @param familyName the column family name
+   * @param seqId the sequence id which used to generate file name
    * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name
-   * @param conf configuration
-   * @param rootFS the root file system
-   * @return Path to file into which to dump split log edits.
+   * @return Path to recovered.hfiles directory of the region's column family.
    */
-  static Path getRegionRecoveredHFilePath(TableName tableName, String encodedRegionName,
-    String familyName, long seqId, String fileNameBeingSplit, Configuration conf, FileSystem rootFS)
-    throws IOException {
+  static Path tryCreateRecoveredHFilesDir(FileSystem rootFS, Configuration conf,
+    TableName tableName, String encodedRegionName, String familyName) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
-    Path regionDir =
-      FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName), encodedRegionName);
-    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+    Path regionDir = FSUtils.getRegionDirFromTableDir(FSUtils.getTableDir(rootDir, tableName),
+      encodedRegionName);
+    Path dir = getRecoveredHFilesDir(regionDir, familyName);

     if (!rootFS.exists(dir) && !rootFS.mkdirs(dir)) {
       LOG.warn("mkdir failed on {}, region {}, column family {}", dir, encodedRegionName,
         familyName);
     }
-    String fileName = formatRecoveredHFileName(seqId, fileNameBeingSplit);
-    return new Path(dir, fileName);
-  }
-
-  private static String formatRecoveredHFileName(long seqId, String fileNameBeingSplit) {
-    return String.format("%019d", seqId) + "-" + fileNameBeingSplit;
-  }
-
-  public static long getSeqIdForRecoveredHFile(String fileName) {
-    return Long.parseLong(fileName.split("-")[0]);
+    return dir;
   }

   /**
@@ -609,13 +596,13 @@ public final class WALSplitUtil {
    * @param familyName The column family name
    * @return The directory that holds recovered hfiles for the region's column family
    */
-  private static Path getStoreDirRecoveredHFilesDir(final Path regionDir, String familyName) {
+  private static Path getRecoveredHFilesDir(final Path regionDir, String familyName) {
     return new Path(new Path(regionDir, familyName), HConstants.RECOVERED_HFILES_DIR);
   }

   public static FileStatus[] getRecoveredHFiles(final FileSystem rootFS,
       final Path regionDir, String familyName) throws IOException {
-    Path dir = getStoreDirRecoveredHFilesDir(regionDir, familyName);
+    Path dir = getRecoveredHFilesDir(regionDir, familyName);
     return FSUtils.listStatus(rootFS, dir);
   }
 }
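Both WALSplitUtil changes concern the per-family recovered.hfiles directory, e.g. /hbase/some_table/2323432434/cf/recovered.hfiles/ per the Javadoc above: the helper now returns (and creates if missing) the directory itself rather than a file inside it. A plain java.nio sketch of the equivalent layout logic, with simplified path handling:

// Illustration only; the real code goes through FSUtils and the Hadoop FileSystem API.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class RecoveredHFilesDirSketch {
  static final String RECOVERED_HFILES_DIR = "recovered.hfiles";

  static Path tryCreate(Path regionDir, String familyName) throws IOException {
    Path dir = regionDir.resolve(familyName).resolve(RECOVERED_HFILES_DIR);
    if (!Files.exists(dir)) {
      Files.createDirectories(dir); // mirrors rootFS.mkdirs(dir) in the patch
    }
    return dir;
  }
}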
@@ -30,7 +30,9 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -252,6 +254,92 @@ public class TestWALSplitToHFile {
     }
   }

+  @Test
+  public void testPutWithSameTimestamp() throws Exception {
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    final long timestamp = this.ee.currentTime();
+    // Write data and flush
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
+    }
+    region.flush(true);
+
+    // Now assert edits made it in.
+    Result result1 = region.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result1.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
+    }
+
+    // Write data with same timestamp and do not flush
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE2));
+    }
+    // Now close the region (without flush)
+    region.close(true);
+    wal.shutdown();
+    // split the log
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+
+    // reopen the region
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
+    Result result2 = region2.get(new Get(ROW));
+    assertEquals(td.getColumnFamilies().length, result2.size());
+    for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+      assertTrue(Bytes.equals(VALUE2, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
+    }
+  }
+
+  @Test
+  public void testRecoverSequenceId() throws Exception {
+    Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
+    TableDescriptor td = pair.getFirst();
+    RegionInfo ri = pair.getSecond();
+
+    WAL wal = createWAL(this.conf, rootDir, logName);
+    HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
+    Map<Integer, Map<String, Long>> seqIdMap = new HashMap<>();
+    // Write data and do not flush
+    for (int i = 0; i < countPerFamily; i++) {
+      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+        region.put(new Put(Bytes.toBytes(i)).addColumn(cfd.getName(), Bytes.toBytes("x"), VALUE1));
+        Result result = region.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
+        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x"))));
+        List<Cell> cells = result.listCells();
+        assertEquals(1, cells.size());
+        seqIdMap.computeIfAbsent(i, r -> new HashMap<>()).put(cfd.getNameAsString(),
+          cells.get(0).getSequenceId());
+      }
+    }
+
+    // Now close the region (without flush)
+    region.close(true);
+    wal.shutdown();
+    // split the log
+    WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
+
+    // reopen the region
+    WAL wal2 = createWAL(this.conf, rootDir, logName);
+    HRegion region2 = HRegion.openHRegion(conf, this.fs, rootDir, ri, td, wal2);
+    // assert the seqid was recovered
+    for (int i = 0; i < countPerFamily; i++) {
+      for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
+        Result result = region2.get(new Get(Bytes.toBytes(i)).addFamily(cfd.getName()));
+        assertTrue(Bytes.equals(VALUE1, result.getValue(cfd.getName(), Bytes.toBytes("x"))));
+        List<Cell> cells = result.listCells();
+        assertEquals(1, cells.size());
+        assertEquals((long) seqIdMap.get(i).get(cfd.getNameAsString()),
+          cells.get(0).getSequenceId());
+      }
+    }
+  }
+
   /**
    * Test writing edits into an HRegion, closing it, splitting logs, opening
    * Region again. Verify seqids.