HBASE-16841 Data loss in MOB files after cloning a snapshot and deleting that snapshot

Author: Jingcheng Du
Date: 2016-12-05 16:22:10 +08:00
parent e10baacd3e
commit 26c2d93f77
9 changed files with 147 additions and 50 deletions
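For orientation, the scenario behind this fix can be driven entirely through the client Admin API. The sketch below mirrors the new testCloneLinksAfterDelete test added later in this diff; the table and snapshot names are hypothetical, and the comments describe the pre-fix behaviour this commit addresses.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MobCloneScenario {
  public static void main(String[] args) throws Exception {
    TableName table = TableName.valueOf("mobTable");       // hypothetical MOB-enabled table
    TableName clone = TableName.valueOf("clonedMobTable"); // hypothetical clone name
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      admin.snapshot("mobSnapshot", table);      // the snapshot references mob files as well as normal HFiles
      admin.cloneSnapshot("mobSnapshot", clone); // the clone is backed by links to those files
      admin.deleteSnapshot("mobSnapshot");       // before this fix, deleting the snapshot could leave
                                                 // the clone's mob data unreadable (the reported data loss)
    }
  }
}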

HFileArchiver.java

@@ -169,6 +169,21 @@ public class HFileArchiver {
   public static void archiveFamily(FileSystem fs, Configuration conf,
       HRegionInfo parent, Path tableDir, byte[] family) throws IOException {
     Path familyDir = new Path(tableDir, new Path(parent.getEncodedName(), Bytes.toString(family)));
+    archiveFamilyByFamilyDir(fs, conf, parent, familyDir, family);
+  }
+
+  /**
+   * Removes from the specified region the store files of the specified column family,
+   * either by archiving them or outright deletion
+   * @param fs the filesystem where the store files live
+   * @param conf {@link Configuration} to examine to determine the archive directory
+   * @param parent Parent region hosting the store files
+   * @param familyDir {@link Path} to where the family is being stored
+   * @param family the family hosting the store files
+   * @throws IOException if the files could not be correctly disposed.
+   */
+  public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf,
+      HRegionInfo parent, Path familyDir, byte[] family) throws IOException {
     FileStatus[] storeFiles = FSUtils.listStatus(fs, familyDir);
     if (storeFiles == null) {
       LOG.debug("No store files to dispose for region=" + parent.getRegionNameAsString() +
@@ -178,7 +193,7 @@ public class HFileArchiver {
     FileStatusConverter getAsFile = new FileStatusConverter(fs);
     Collection<File> toArchive = Lists.transform(Arrays.asList(storeFiles), getAsFile);
-    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, tableDir, family);
+    Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family);
 
     // do the actual archive
     List<File> failedArchive = resolveAndArchive(fs, storeArchiveDir, toArchive,
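
The point of the new entry point is that the caller now supplies the family directory directly, which matters for MOB data whose files do not live under the table directory. Below is a minimal sketch, assuming a FileSystem, Configuration and HRegionInfo already exist; the class and method names are illustrative, not part of the patch, while the MobUtils calls are the ones used elsewhere in this diff.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;

public class MobFamilyArchiveSketch {
  // Archive every store file of a MOB column family: resolve the family directory
  // under the mob area ourselves and hand it to archiveFamilyByFamilyDir, instead of
  // letting archiveFamily derive the path from the table directory.
  static void archiveMobFamily(FileSystem fs, Configuration conf, HRegionInfo mobRegionInfo,
      byte[] family) throws IOException {
    Path mobRegionDir = MobUtils.getMobRegionPath(conf, mobRegionInfo.getTable());
    Path mobFamilyDir = MobUtils.getMobFamilyPath(mobRegionDir, Bytes.toString(family));
    HFileArchiver.archiveFamilyByFamilyDir(fs, conf, mobRegionInfo, mobFamilyDir, family);
  }
}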

DisabledTableSnapshotHandler.java

@@ -25,7 +25,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -37,7 +36,6 @@ import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -83,11 +81,13 @@ public class DisabledTableSnapshotHandler extends TakeSnapshotHandler {
         if (RegionReplicaUtil.isDefaultReplica(hri)) {
           regions.add(hri);
         }
-        // if it's the first region, add the mob region
-        if (Bytes.equals(hri.getStartKey(), HConstants.EMPTY_START_ROW)) {
-          HRegionInfo mobRegion = MobUtils.getMobRegionInfo(hri.getTable());
-          regions.add(mobRegion);
-        }
       }
+      // handle the mob files if any.
+      boolean mobEnabled = MobUtils.hasMobColumns(htd);
+      if (mobEnabled) {
+        // snapshot the mob files as a offline region.
+        HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
+        regions.add(mobRegionInfo);
+      }
 
       // 2. for each region, write all the info to disk
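
Both snapshot handlers now key the mob snapshot off the table descriptor rather than off whichever region happens to hold the first row key. The mob data is represented by a synthetic "mob region"; the sketch below (the class and method names are illustrative) simply strings together the MobUtils calls that appear in this diff to obtain and recognize it.

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.mob.MobUtils;

public class MobRegionSketch {
  // Returns the synthetic mob region for a table when at least one of its
  // column families is MOB-enabled, otherwise null.
  static HRegionInfo mobRegionFor(HTableDescriptor htd) {
    if (!MobUtils.hasMobColumns(htd)) {
      return null;
    }
    HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
    assert MobUtils.isMobRegionInfo(mobRegionInfo);
    return mobRegionInfo;
  }
}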

EnabledTableSnapshotHandler.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.procedure.Procedure;
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.SnapshotDescription;
@@ -103,6 +104,14 @@ public class EnabledTableSnapshotHandler extends TakeSnapshotHandler {
           snapshotDisabledRegion(regionInfo);
         }
       }
+      // handle the mob files if any.
+      boolean mobEnabled = MobUtils.hasMobColumns(htd);
+      if (mobEnabled) {
+        LOG.info("Taking snapshot for mob files in table " + htd.getTableName());
+        // snapshot the mob files as a offline region.
+        HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
+        snapshotMobRegion(mobRegionInfo);
+      }
     } catch (InterruptedException e) {
       ForeignException ee =
           new ForeignException("Interrupted while waiting for snapshot to finish", e);
@@ -112,4 +121,14 @@ public class EnabledTableSnapshotHandler extends TakeSnapshotHandler {
       monitor.receive(e);
     }
   }
+
+  /**
+   * Takes a snapshot of the mob region
+   */
+  private void snapshotMobRegion(final HRegionInfo regionInfo)
+      throws IOException {
+    snapshotManifest.addMobRegion(regionInfo);
+    monitor.rethrowException();
+    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
+  }
 }

HRegion.java

@@ -136,7 +136,6 @@ import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
@@ -3749,24 +3748,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     SnapshotManifest manifest = SnapshotManifest.create(conf, getFilesystem(),
             snapshotDir, desc, exnSnare);
     manifest.addRegion(this);
-
-    // The regionserver holding the first region of the table is responsible for taking the
-    // manifest of the mob dir.
-    if (!Bytes.equals(getRegionInfo().getStartKey(), HConstants.EMPTY_START_ROW))
-      return;
-
-    // if any cf's have is mob enabled, add the "mob region" to the manifest.
-    List<Store> stores = getStores();
-    for (Store store : stores) {
-      boolean hasMobStore = store.getFamily().isMobEnabled();
-      if (hasMobStore) {
-        // use the .mob as the start key and 0 as the regionid
-        HRegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(this.getTableDesc().getTableName());
-        mobRegionInfo.setOffline(true);
-        manifest.addMobRegion(mobRegionInfo, this.getTableDesc().getColumnFamilies());
-        return;
-      }
-    }
   }
 
   private void updateSequenceId(final Iterable<List<Cell>> cellItr, final long sequenceId)

RestoreSnapshotHelper.java

@@ -519,7 +519,7 @@ public class RestoreSnapshotHelper {
           // Family doesn't exists in the snapshot
           LOG.trace("Removing family=" + Bytes.toString(family) +
             " from region=" + regionInfo.getEncodedName() + " table=" + tableName);
-          HFileArchiver.archiveFamily(fs, conf, regionInfo, tableDir, family);
+          HFileArchiver.archiveFamilyByFamilyDir(fs, conf, regionInfo, familyDir, family);
           fs.delete(familyDir, true);
         }
       }

SnapshotManifest.java

@@ -162,7 +162,7 @@ public final class SnapshotManifest {
     }
   }
 
-  public void addMobRegion(HRegionInfo regionInfo, HColumnDescriptor[] hcds) throws IOException {
+  public void addMobRegion(HRegionInfo regionInfo) throws IOException {
     // 0. Get the ManifestBuilder/RegionVisitor
     RegionVisitor visitor = createRegionVisitor(desc);
@@ -175,7 +175,7 @@ public final class SnapshotManifest {
     LOG.debug("Creating references for mob files");
 
     Path mobRegionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
-    for (HColumnDescriptor hcd : hcds) {
+    for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
       // 2.1. build the snapshot reference for the store if it's a mob store
       if (!hcd.isMobEnabled()) {
         continue;
@@ -249,9 +249,13 @@ public final class SnapshotManifest {
     boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
     try {
+      Path baseDir = tableDir;
       // Open the RegionFS
+      if (isMobRegion) {
+        baseDir = FSUtils.getTableDir(MobUtils.getMobHome(conf), regionInfo.getTable());
+      }
       HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(conf, fs,
-        tableDir, regionInfo, true);
+        baseDir, regionInfo, true);
       monitor.rethrowException();
 
       // 1. dump region meta info into the snapshot directory
@@ -273,15 +277,7 @@ public final class SnapshotManifest {
         Object familyData = visitor.familyOpen(regionData, Bytes.toBytes(familyName));
         monitor.rethrowException();
 
-        Collection<StoreFileInfo> storeFiles = null;
-        if (isMobRegion) {
-          Path regionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
-          Path storePath = MobUtils.getMobFamilyPath(regionPath, familyName);
-          storeFiles = getStoreFiles(storePath);
-        } else {
-          storeFiles = regionFs.getStoreFiles(familyName);
-        }
-
+        Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(familyName);
         if (storeFiles == null) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("No files under family: " + familyName);

HFileArchiveUtil.java

@@ -64,6 +64,20 @@ public class HFileArchiveUtil {
                                              HRegionInfo region,
                                              Path tabledir,
                                              byte[] family) throws IOException {
+    return getStoreArchivePath(conf, region, family);
+  }
+
+  /**
+   * Gets the directory to archive a store directory.
+   * @param conf {@link Configuration} to read for the archive directory name.
+   * @param region parent region information under which the store currently lives
+   * @param family name of the family in the store
+   * @return {@link Path} to the directory to archive the given store or <tt>null</tt> if it should
+   *         not be archived
+   */
+  public static Path getStoreArchivePath(Configuration conf,
+                                         HRegionInfo region,
+                                         byte[] family) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
     Path tableArchiveDir = getTableArchivePath(rootDir, region.getTable());
     return HStore.getStoreHomedir(tableArchiveDir, region, family);
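
Because the new overload resolves the archive directory from hbase.rootdir rather than from a caller-supplied table directory, the mob region's store files land in the same archive tree as everything else. A small usage sketch follows; the table name and family are hypothetical, and the layout noted in the comment is an assumption about the usual defaults rather than something stated in this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;

public class StoreArchivePathSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical MOB-enabled table and family.
    HRegionInfo mobRegion = MobUtils.getMobRegionInfo(TableName.valueOf("mobTable"));
    // Derived from hbase.rootdir only; typically ends up under something like
    // <hbase.rootdir>/archive/data/<namespace>/<table>/<encoded mob region>/<family>.
    Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, mobRegion, Bytes.toBytes("f"));
    System.out.println(archiveDir);
  }
}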

TestMobCloneSnapshotFromClient.java

@@ -17,18 +17,28 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
@@ -36,10 +46,12 @@ import org.junit.experimental.categories.Category;
  */
 @Category({LargeTests.class, ClientTests.class})
 public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient {
-  private static final Log LOG = LogFactory.getLog(TestMobCloneSnapshotFromClient.class);
+  private static boolean delayFlush = false;
 
   protected static void setupConfiguration() {
     TestCloneSnapshotFromClient.setupConfiguration();
     TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0);
+    TEST_UTIL.getConfiguration().setInt(MobConstants.MOB_FILE_CACHE_SIZE_KEY, 0);
   }
@@ -52,7 +64,9 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient
   @Override
   protected void createTableAndSnapshots() throws Exception {
     // create Table and disable it
-    MobSnapshotTestingUtils.createMobTable(TEST_UTIL, tableName, getNumReplicas(), FAMILY);
+    createMobTable(TEST_UTIL, tableName, SnapshotTestingUtils.getSplitKeys(), getNumReplicas(),
+      FAMILY);
+    delayFlush = false;
     admin.disableTable(tableName);
 
     // take an empty snapshot
@@ -63,7 +77,7 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient
     try {
       // enable table and insert data
       admin.enableTable(tableName);
-      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 20, FAMILY);
       snapshot0Rows = MobSnapshotTestingUtils.countMobRows(table);
       admin.disableTable(tableName);
@@ -72,7 +86,7 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient
       // enable table and insert more data
       admin.enableTable(tableName);
-      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY);
+      SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 20, FAMILY);
       snapshot1Rows = MobSnapshotTestingUtils.countMobRows(table);
       admin.disableTable(tableName);
@@ -86,9 +100,70 @@
     }
   }
 
+  @Test
+  @Override
+  public void testCloneLinksAfterDelete() throws IOException, InterruptedException {
+    // delay the flush to make sure
+    delayFlush = true;
+    SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 20, FAMILY);
+    long tid = System.currentTimeMillis();
+    byte[] snapshotName3 = Bytes.toBytes("snaptb3-" + tid);
+    TableName clonedTableName3 = TableName.valueOf("clonedtb3-" + System.currentTimeMillis());
+    admin.snapshot(snapshotName3, tableName);
+    delayFlush = false;
+    int snapshot3Rows = -1;
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      snapshot3Rows = TEST_UTIL.countRows(table);
+    }
+    admin.cloneSnapshot(snapshotName3, clonedTableName3);
+    admin.deleteSnapshot(snapshotName3);
+    super.testCloneLinksAfterDelete();
+    verifyRowCount(TEST_UTIL, clonedTableName3, snapshot3Rows);
+    admin.disableTable(clonedTableName3);
+    admin.deleteTable(clonedTableName3);
+  }
+
   @Override
   protected void verifyRowCount(final HBaseTestingUtility util, final TableName tableName,
       long expectedRows) throws IOException {
     MobSnapshotTestingUtils.verifyMobRowCount(util, tableName, expectedRows);
   }
+
+  /**
+   * This coprocessor is used to delay the flush.
+   */
+  public static class DelayFlushCoprocessor extends BaseRegionObserver {
+    @Override
+    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
+      if (delayFlush) {
+        try {
+          if (Bytes.compareTo(e.getEnvironment().getRegionInfo().getStartKey(),
+            HConstants.EMPTY_START_ROW) != 0) {
+            Thread.sleep(100);
+          }
+        } catch (InterruptedException e1) {
+          throw new InterruptedIOException(e1.getMessage());
+        }
+      }
+      super.preFlush(e);
+    }
+  }
+
+  private void createMobTable(final HBaseTestingUtility util, final TableName tableName,
+    final byte[][] splitKeys, int regionReplication, final byte[]... families) throws IOException,
+    InterruptedException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.setRegionReplication(regionReplication);
+    htd.addCoprocessor(DelayFlushCoprocessor.class.getName());
+    for (byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family);
+      hcd.setMobEnabled(true);
+      hcd.setMobThreshold(0L);
+      htd.addFamily(hcd);
+    }
+    util.getHBaseAdmin().createTable(htd, splitKeys);
+    SnapshotTestingUtils.waitForTableToBeOnline(util, tableName);
+    assertEquals((splitKeys.length + 1) * regionReplication,
+      util.getHBaseAdmin().getTableRegions(tableName).size());
+  }
 }

TestMobRestoreSnapshotFromClient.java

@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -37,7 +35,6 @@ import org.junit.experimental.categories.Category;
  */
 @Category({ClientTests.class, LargeTests.class})
 public class TestMobRestoreSnapshotFromClient extends TestRestoreSnapshotFromClient {
-  private static final Log LOG = LogFactory.getLog(TestMobRestoreSnapshotFromClient.class);
 
   @BeforeClass
   public static void setupCluster() throws Exception {