HBASE-24033 Add ut for loading the corrupt recovered hfiles (#1322)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
d4ab645531
commit
2a60a61a73
|
@ -5449,6 +5449,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
store.assertBulkLoadHFileOk(filePath);
|
store.assertBulkLoadHFileOk(filePath);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
handleException(fs.getFileSystem(), filePath, e);
|
handleException(fs.getFileSystem(), filePath, e);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
Pair<Path, Path> pair = store.preBulkLoadHFile(filePath.toString(), -1);
|
Pair<Path, Path> pair = store.preBulkLoadHFile(filePath.toString(), -1);
|
||||||
store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
|
store.bulkLoadHFile(Bytes.toBytes(familyName), pair.getFirst().toString(),
|
||||||
|
|
|
@ -345,16 +345,16 @@ public final class WALSplitUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move aside a bad edits file.
|
* Move aside a bad edits file.
|
||||||
* @param walFS WAL FileSystem used to rename bad edits file.
|
* @param fs the file system used to rename bad edits file.
|
||||||
* @param edits Edits file to move aside.
|
* @param edits Edits file to move aside.
|
||||||
* @return The name of the moved aside file.
|
* @return The name of the moved aside file.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits)
|
public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Path moveAsideName =
|
Path moveAsideName =
|
||||||
new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
|
new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis());
|
||||||
if (!walFS.rename(edits, moveAsideName)) {
|
if (!fs.rename(edits, moveAsideName)) {
|
||||||
LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
|
LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
|
||||||
}
|
}
|
||||||
return moveAsideName;
|
return moveAsideName;
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.wal;
|
||||||
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
|
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
|
||||||
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
|
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -32,6 +33,9 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
@ -51,6 +55,7 @@ import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
|
||||||
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||||
|
@ -61,10 +66,12 @@ import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
import org.apache.hadoop.hbase.util.FSTableDescriptors;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -95,6 +102,11 @@ public class TestWALSplitToHFile {
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private WALFactory wals;
|
private WALFactory wals;
|
||||||
|
|
||||||
|
private static final byte[] ROW = Bytes.toBytes("row");
|
||||||
|
private static final byte[] VALUE1 = Bytes.toBytes("value1");
|
||||||
|
private static final byte[] VALUE2 = Bytes.toBytes("value2");
|
||||||
|
private static final int countPerFamily = 10;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final TestName TEST_NAME = new TestName();
|
public final TestName TEST_NAME = new TestName();
|
||||||
|
|
||||||
|
@ -116,6 +128,7 @@ public class TestWALSplitToHFile {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
|
this.conf = HBaseConfiguration.create(UTIL.getConfiguration());
|
||||||
|
this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, false);
|
||||||
this.fs = UTIL.getDFSCluster().getFileSystem();
|
this.fs = UTIL.getDFSCluster().getFileSystem();
|
||||||
this.rootDir = FSUtils.getRootDir(this.conf);
|
this.rootDir = FSUtils.getRootDir(this.conf);
|
||||||
this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
this.oldLogDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||||
|
@ -163,24 +176,93 @@ public class TestWALSplitToHFile {
|
||||||
return wal;
|
return wal;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private Pair<TableDescriptor, RegionInfo> setupTableAndRegion() throws IOException {
|
||||||
* Test writing edits into an HRegion, closing it, splitting logs, opening
|
|
||||||
* Region again. Verify seqids.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testReplayEditsWrittenViaHRegion()
|
|
||||||
throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
|
|
||||||
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
||||||
final TableDescriptor td = createBasic3FamilyTD(tableName);
|
final TableDescriptor td = createBasic3FamilyTD(tableName);
|
||||||
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
|
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
|
||||||
final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
|
final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
|
||||||
deleteDir(tableDir);
|
deleteDir(tableDir);
|
||||||
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
|
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
|
||||||
final byte[] rowName = tableName.getName();
|
HRegion region = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
|
||||||
final int countPerFamily = 10;
|
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||||
|
return new Pair<>(td, ri);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCorruptRecoveredHFile() throws Exception {
|
||||||
|
Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
|
||||||
|
TableDescriptor td = pair.getFirst();
|
||||||
|
RegionInfo ri = pair.getSecond();
|
||||||
|
|
||||||
|
WAL wal = createWAL(this.conf, rootDir, logName);
|
||||||
|
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
|
||||||
|
final long timestamp = this.ee.currentTime();
|
||||||
|
// Write data and flush
|
||||||
|
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||||
|
region.put(new Put(ROW).addColumn(cfd.getName(), Bytes.toBytes("x"), timestamp, VALUE1));
|
||||||
|
}
|
||||||
|
region.flush(true);
|
||||||
|
|
||||||
|
// Now assert edits made it in.
|
||||||
|
Result result1 = region.get(new Get(ROW));
|
||||||
|
assertEquals(td.getColumnFamilies().length, result1.size());
|
||||||
|
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||||
|
assertTrue(Bytes.equals(VALUE1, result1.getValue(cfd.getName(), Bytes.toBytes("x"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now close the region
|
||||||
|
region.close(true);
|
||||||
|
wal.shutdown();
|
||||||
|
// split the log
|
||||||
|
WALSplitter.split(rootDir, logDir, oldLogDir, FileSystem.get(this.conf), this.conf, wals);
|
||||||
|
|
||||||
|
// Write a corrupt recovered hfile
|
||||||
|
Path regionDir =
|
||||||
|
new Path(CommonFSUtils.getTableDir(rootDir, td.getTableName()), ri.getEncodedName());
|
||||||
|
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||||
|
FileStatus[] files =
|
||||||
|
WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
|
||||||
|
assertNotNull(files);
|
||||||
|
assertTrue(files.length > 0);
|
||||||
|
writeCorruptRecoveredHFile(files[0].getPath());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Failed to reopen the region
|
||||||
|
WAL wal2 = createWAL(this.conf, rootDir, logName);
|
||||||
|
try {
|
||||||
|
HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
|
||||||
|
fail("Should fail to open region");
|
||||||
|
} catch (CorruptHFileException che) {
|
||||||
|
// Expected
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set skip errors to true and reopen the region
|
||||||
|
this.conf.setBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, true);
|
||||||
|
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal2);
|
||||||
|
Result result2 = region2.get(new Get(ROW));
|
||||||
|
assertEquals(td.getColumnFamilies().length, result2.size());
|
||||||
|
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||||
|
assertTrue(Bytes.equals(VALUE1, result2.getValue(cfd.getName(), Bytes.toBytes("x"))));
|
||||||
|
// Assert the corrupt file was skipped and still exist
|
||||||
|
FileStatus[] files =
|
||||||
|
WALSplitUtil.getRecoveredHFiles(this.fs, regionDir, cfd.getNameAsString());
|
||||||
|
assertNotNull(files);
|
||||||
|
assertEquals(1, files.length);
|
||||||
|
assertTrue(files[0].getPath().getName().contains("corrupt"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test writing edits into an HRegion, closing it, splitting logs, opening
|
||||||
|
* Region again. Verify seqids.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWrittenViaHRegion()
|
||||||
|
throws IOException, SecurityException, IllegalArgumentException, InterruptedException {
|
||||||
|
Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
|
||||||
|
TableDescriptor td = pair.getFirst();
|
||||||
|
RegionInfo ri = pair.getSecond();
|
||||||
|
|
||||||
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(region3);
|
|
||||||
// Write countPerFamily edits into the three families. Do a flush on one
|
// Write countPerFamily edits into the three families. Do a flush on one
|
||||||
// of the families during the load of edits so its seqid is not same as
|
// of the families during the load of edits so its seqid is not same as
|
||||||
// others to test we do right thing when different seqids.
|
// others to test we do right thing when different seqids.
|
||||||
|
@ -189,7 +271,7 @@ public class TestWALSplitToHFile {
|
||||||
long seqid = region.getOpenSeqNum();
|
long seqid = region.getOpenSeqNum();
|
||||||
boolean first = true;
|
boolean first = true;
|
||||||
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||||
addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
|
addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
|
||||||
if (first) {
|
if (first) {
|
||||||
// If first, so we have at least one family w/ different seqid to rest.
|
// If first, so we have at least one family w/ different seqid to rest.
|
||||||
region.flush(true);
|
region.flush(true);
|
||||||
|
@ -197,7 +279,7 @@ public class TestWALSplitToHFile {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Now assert edits made it in.
|
// Now assert edits made it in.
|
||||||
final Get g = new Get(rowName);
|
final Get g = new Get(ROW);
|
||||||
Result result = region.get(g);
|
Result result = region.get(g);
|
||||||
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
|
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
|
||||||
// Now close the region (without flush), split the log, reopen the region and assert that
|
// Now close the region (without flush), split the log, reopen the region and assert that
|
||||||
|
@ -222,14 +304,14 @@ public class TestWALSplitToHFile {
|
||||||
// out from under it and assert that replay of the log adds the edits back
|
// out from under it and assert that replay of the log adds the edits back
|
||||||
// correctly when region is opened again.
|
// correctly when region is opened again.
|
||||||
for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
|
for (ColumnFamilyDescriptor hcd : td.getColumnFamilies()) {
|
||||||
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region2, "y");
|
addRegionEdits(ROW, hcd.getName(), countPerFamily, this.ee, region2, "y");
|
||||||
}
|
}
|
||||||
// Get count of edits.
|
// Get count of edits.
|
||||||
final Result result2 = region2.get(g);
|
final Result result2 = region2.get(g);
|
||||||
assertEquals(2 * result.size(), result2.size());
|
assertEquals(2 * result.size(), result2.size());
|
||||||
wal2.sync();
|
wal2.sync();
|
||||||
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
final Configuration newConf = HBaseConfiguration.create(this.conf);
|
||||||
User user = HBaseTestingUtility.getDifferentUser(newConf, tableName.getNameAsString());
|
User user = HBaseTestingUtility.getDifferentUser(newConf, td.getTableName().getNameAsString());
|
||||||
user.runAs(new PrivilegedExceptionAction<Object>() {
|
user.runAs(new PrivilegedExceptionAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object run() throws Exception {
|
public Object run() throws Exception {
|
||||||
|
@ -237,6 +319,7 @@ public class TestWALSplitToHFile {
|
||||||
FileSystem newFS = FileSystem.get(newConf);
|
FileSystem newFS = FileSystem.get(newConf);
|
||||||
// Make a new wal for new region open.
|
// Make a new wal for new region open.
|
||||||
WAL wal3 = createWAL(newConf, rootDir, logName);
|
WAL wal3 = createWAL(newConf, rootDir, logName);
|
||||||
|
Path tableDir = FSUtils.getTableDir(rootDir, td.getTableName());
|
||||||
HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
|
HRegion region3 = new HRegion(tableDir, wal3, newFS, newConf, ri, td, null);
|
||||||
long seqid3 = region3.initialize();
|
long seqid3 = region3.initialize();
|
||||||
Result result3 = region3.get(g);
|
Result result3 = region3.get(g);
|
||||||
|
@ -262,18 +345,12 @@ public class TestWALSplitToHFile {
|
||||||
* We restart Region again, and verify that the edits were replayed.
|
* We restart Region again, and verify that the edits were replayed.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testReplayEditsAfterPartialFlush()
|
public void testAfterPartialFlush()
|
||||||
throws IOException, SecurityException, IllegalArgumentException {
|
throws IOException, SecurityException, IllegalArgumentException {
|
||||||
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
|
||||||
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
|
TableDescriptor td = pair.getFirst();
|
||||||
final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
|
RegionInfo ri = pair.getSecond();
|
||||||
deleteDir(tableDir);
|
|
||||||
final byte[] rowName = tableName.getName();
|
|
||||||
final int countPerFamily = 10;
|
|
||||||
final TableDescriptor td = createBasic3FamilyTD(tableName);
|
|
||||||
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
|
|
||||||
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(region3);
|
|
||||||
// Write countPerFamily edits into the three families. Do a flush on one
|
// Write countPerFamily edits into the three families. Do a flush on one
|
||||||
// of the families during the load of edits so its seqid is not same as
|
// of the families during the load of edits so its seqid is not same as
|
||||||
// others to test we do right thing when different seqids.
|
// others to test we do right thing when different seqids.
|
||||||
|
@ -281,11 +358,11 @@ public class TestWALSplitToHFile {
|
||||||
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
|
HRegion region = HRegion.openHRegion(this.conf, this.fs, rootDir, ri, td, wal);
|
||||||
long seqid = region.getOpenSeqNum();
|
long seqid = region.getOpenSeqNum();
|
||||||
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
for (ColumnFamilyDescriptor cfd : td.getColumnFamilies()) {
|
||||||
addRegionEdits(rowName, cfd.getName(), countPerFamily, this.ee, region, "x");
|
addRegionEdits(ROW, cfd.getName(), countPerFamily, this.ee, region, "x");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now assert edits made it in.
|
// Now assert edits made it in.
|
||||||
final Get g = new Get(rowName);
|
final Get g = new Get(ROW);
|
||||||
Result result = region.get(g);
|
Result result = region.get(g);
|
||||||
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
|
assertEquals(countPerFamily * td.getColumnFamilies().length, result.size());
|
||||||
|
|
||||||
|
@ -323,15 +400,11 @@ public class TestWALSplitToHFile {
|
||||||
* and flush again, at last verify the data.
|
* and flush again, at last verify the data.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testReplayEditsAfterAbortingFlush() throws IOException {
|
public void testAfterAbortingFlush() throws IOException {
|
||||||
final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
Pair<TableDescriptor, RegionInfo> pair = setupTableAndRegion();
|
||||||
final RegionInfo ri = RegionInfoBuilder.newBuilder(tableName).build();
|
TableDescriptor td = pair.getFirst();
|
||||||
final Path tableDir = FSUtils.getTableDir(this.rootDir, tableName);
|
RegionInfo ri = pair.getSecond();
|
||||||
deleteDir(tableDir);
|
|
||||||
final TableDescriptor td = createBasic3FamilyTD(tableName);
|
|
||||||
FSTableDescriptors.createTableDescriptorForTableDirectory(fs, tableDir, td, false);
|
|
||||||
HRegion region3 = HBaseTestingUtility.createRegionAndWAL(ri, rootDir, this.conf, td);
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(region3);
|
|
||||||
// Write countPerFamily edits into the three families. Do a flush on one
|
// Write countPerFamily edits into the three families. Do a flush on one
|
||||||
// of the families during the load of edits so its seqid is not same as
|
// of the families during the load of edits so its seqid is not same as
|
||||||
// others to test we do right thing when different seqids.
|
// others to test we do right thing when different seqids.
|
||||||
|
@ -347,7 +420,7 @@ public class TestWALSplitToHFile {
|
||||||
int writtenRowCount = 10;
|
int writtenRowCount = 10;
|
||||||
List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
|
List<ColumnFamilyDescriptor> families = Arrays.asList(td.getColumnFamilies());
|
||||||
for (int i = 0; i < writtenRowCount; i++) {
|
for (int i = 0; i < writtenRowCount; i++) {
|
||||||
Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
|
Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
|
||||||
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
|
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
|
||||||
Bytes.toBytes("val"));
|
Bytes.toBytes("val"));
|
||||||
region.put(put);
|
region.put(put);
|
||||||
|
@ -372,7 +445,7 @@ public class TestWALSplitToHFile {
|
||||||
// writing more data
|
// writing more data
|
||||||
int moreRow = 10;
|
int moreRow = 10;
|
||||||
for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
|
for (int i = writtenRowCount; i < writtenRowCount + moreRow; i++) {
|
||||||
Put put = new Put(Bytes.toBytes(tableName + Integer.toString(i)));
|
Put put = new Put(Bytes.toBytes(td.getTableName() + Integer.toString(i)));
|
||||||
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
|
put.addColumn(families.get(i % families.size()).getName(), Bytes.toBytes("q"),
|
||||||
Bytes.toBytes("val"));
|
Bytes.toBytes("val"));
|
||||||
region.put(put);
|
region.put(put);
|
||||||
|
@ -414,4 +487,21 @@ public class TestWALSplitToHFile {
|
||||||
}
|
}
|
||||||
return scannedCount;
|
return scannedCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void writeCorruptRecoveredHFile(Path recoveredHFile) throws Exception {
|
||||||
|
// Read the recovered hfile
|
||||||
|
int fileSize = (int) fs.listStatus(recoveredHFile)[0].getLen();
|
||||||
|
FSDataInputStream in = fs.open(recoveredHFile);
|
||||||
|
byte[] fileContent = new byte[fileSize];
|
||||||
|
in.readFully(0, fileContent, 0, fileSize);
|
||||||
|
in.close();
|
||||||
|
|
||||||
|
// Write a corrupt hfile by append garbage
|
||||||
|
Path path = new Path(recoveredHFile.getParent(), recoveredHFile.getName() + ".corrupt");
|
||||||
|
FSDataOutputStream out;
|
||||||
|
out = fs.create(path);
|
||||||
|
out.write(fileContent);
|
||||||
|
out.write(Bytes.toBytes("-----"));
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue