HBASE-18806 VerifyRep by snapshot need not restore snapshot for each mapper

This commit is contained in:
huzheng 2017-09-14 17:08:16 +08:00
parent 9a98bb4ce9
commit 6e136f26bf
6 changed files with 185 additions and 55 deletions

View File

@ -345,22 +345,20 @@ public class TableMapReduceUtil {
}
/**
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
* and read directly from snapshot files.
*
* Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly
* from snapshot files.
* @param snapshotName The name of the snapshot (of a table) to read from.
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is
* carrying all necessary HBase configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured
* job classes via the distributed cache (tmpjars).
*
* @param scan The scan instance with the columns, time range etc.
* @param mapper The mapper class to use.
* @param outputKeyClass The class of the output key.
* @param outputValueClass The class of the output value.
* @param job The current job to adjust. Make sure the passed job is carrying all necessary HBase
* configuration.
* @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
* the distributed cache (tmpjars).
* @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restore directory can be deleted.
* have write permissions to this directory, and this should not be a subdirectory of
* rootdir. After the job is finished, restore directory can be deleted.
* @throws IOException When setting up the details fails.
* @see TableSnapshotInputFormat
*/
@ -369,10 +367,10 @@ public class TableMapReduceUtil {
Class<?> outputKeyClass,
Class<?> outputValueClass, Job job,
boolean addDependencyJars, Path tmpRestoreDir)
throws IOException {
throws IOException {
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
addDependencyJars, false, TableSnapshotInputFormat.class);
resetCacheConfig(job.getConfiguration());
}

View File

@ -491,9 +491,9 @@ public class TableSnapshotInputFormatImpl {
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param conf the job to configuration
* @param snapshotName the name of the snapshot to read from
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @param restoreDir a temporary directory to restore the snapshot into. Current user should have
* write permissions to this directory, and this should not be a subdirectory of rootdir.
* After the job is finished, restoreDir can be deleted.
* @throws IOException if an error occurs
*/
public static void setInput(Configuration conf, String snapshotName, Path restoreDir)

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce.replication;
import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
@ -211,8 +213,8 @@ public class VerifyReplication extends Configured implements Tool {
+ peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
+ " peerFSAddress:" + peerFSAddress);
replicatedScanner = new TableSnapshotScanner(peerConf,
new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
} else {
replicatedScanner = replicatedTable.getScanner(scan);
}
@ -360,6 +362,17 @@ public class VerifyReplication extends Configured implements Tool {
}
}
/**
 * Restores the peer cluster's snapshot into the shared temp directory once, up front, so that
 * the individual mappers do not each have to restore it again.
 * @param conf base job configuration to derive the peer cluster configuration from
 * @param peerQuorumAddress ZK quorum address identifying the peer cluster
 * @throws IOException if the peer filesystem cannot be reached or the snapshot copy fails
 */
private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
    throws IOException {
  Configuration peerClusterConf =
      HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
  // Point the derived configuration at the peer cluster's filesystem and HBase root.
  FileSystem.setDefaultUri(peerClusterConf, peerFSAddress);
  FSUtils.setRootDir(peerClusterConf, new Path(peerFSAddress, peerHBaseRootAddress));
  FileSystem peerFs = FileSystem.get(peerClusterConf);
  Path peerRootDir = FSUtils.getRootDir(peerClusterConf);
  Path peerRestoreDir = new Path(peerFSAddress, peerSnapshotTmpDir);
  RestoreSnapshotHelper.copySnapshotForScanner(peerClusterConf, peerFs, peerRootDir,
    peerRestoreDir, peerSnapshotName);
}
/**
* Sets up the actual job.
*
@ -404,7 +417,13 @@ public class VerifyReplication extends Configured implements Tool {
//Set Snapshot specific parameters
if (peerSnapshotName != null) {
conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
// for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
// restore snapshot.
Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
peerSnapshotTmpDir = restoreDir.toString();
conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
conf.set(NAME + ".peerFSAddress", peerFSAddress);
conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
@ -441,6 +460,7 @@ public class VerifyReplication extends Configured implements Tool {
"Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
null, job, true, snapshotTempPath);
restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
} else {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -31,6 +32,7 @@ import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -907,6 +909,17 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
}
/**
 * Asserts that the snapshot restore temp directory contains exactly the expected number of
 * restored sub-directories (one per VerifyReplication run), and nothing else.
 * @param conf configuration used to resolve the filesystem holding the temp directory
 * @param restoreTmpDir path of the restore temp directory to inspect
 * @param expectedCount expected number of sub-directories
 * @throws IOException if the directory listing fails
 */
private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
    throws IOException {
  FileSystem fs = FileSystem.get(conf);
  FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
  assertNotNull(subDirectories);
  // JUnit's assertEquals is (expected, actual); the original call had the arguments reversed,
  // which yields a misleading "expected X but was Y" message on failure.
  assertEquals(expectedCount, subDirectories.length);
  for (FileStatus status : subDirectories) {
    // Each restored snapshot must be materialized as a directory, never a plain file.
    assertTrue(status.isDirectory());
  }
}
@Test(timeout = 300000)
public void testVerifyReplicationWithSnapshotSupport() throws Exception {
// Populate the tables, at the same time it guarantees that the tables are
@ -948,6 +961,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
assertEquals(0,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
checkRestoreTmpDir(conf1, temPath1, 1);
checkRestoreTmpDir(conf2, temPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
Put put = null;
@ -985,6 +1001,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
assertEquals(NB_ROWS_IN_BATCH,
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
checkRestoreTmpDir(conf1, temPath1, 2);
checkRestoreTmpDir(conf2, temPath2, 2);
}
@Test

View File

@ -20,21 +20,25 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
/**
* A Scanner which performs a scan over snapshot files. Using this class requires copying the
* snapshot to a temporary empty directory, which will copy the snapshot reference files into that
@ -75,6 +79,7 @@ public class TableSnapshotScanner extends AbstractClientScanner {
private Scan scan;
private ArrayList<RegionInfo> regions;
private TableDescriptor htd;
private final boolean snapshotAlreadyRestored;
private ClientSideRegionScanner currentRegionScanner = null;
private int currentRegion = -1;
@ -83,61 +88,89 @@ public class TableSnapshotScanner extends AbstractClientScanner {
* Creates a TableSnapshotScanner.
* @param conf the configuration
* @param restoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* The scanner deletes the contents of the directory once the scanner is closed.
* have write permissions to this directory, and this should not be a subdirectory of
* rootDir. The scanner deletes the contents of the directory once the scanner is closed.
* @param snapshotName the name of the snapshot to read from
* @param scan a Scan representing scan parameters
* @throws IOException in case of error
*/
public TableSnapshotScanner(Configuration conf, Path restoreDir,
String snapshotName, Scan scan) throws IOException {
public TableSnapshotScanner(Configuration conf, Path restoreDir, String snapshotName, Scan scan)
throws IOException {
this(conf, FSUtils.getRootDir(conf), restoreDir, snapshotName, scan);
}
public TableSnapshotScanner(Configuration conf, Path rootDir, Path restoreDir,
String snapshotName, Scan scan) throws IOException {
this(conf, rootDir, restoreDir, snapshotName, scan, false);
}
/**
* Creates a TableSnapshotScanner.
* @param conf the configuration
* @param rootDir root directory for HBase.
* @param restoreDir a temporary directory to copy the snapshot files into. Current user should
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
* The scanner deletes the contents of the directory once the scanner is closed.
* have write permissions to this directory, and this should not be a subdirectory of
* rootdir. The scanner deletes the contents of the directory once the scanner is closed.
* @param snapshotName the name of the snapshot to read from
* @param scan a Scan representing scan parameters
* @param snapshotAlreadyRestored true to indicate that snapshot has been restored.
* @throws IOException in case of error
*/
public TableSnapshotScanner(Configuration conf, Path rootDir,
Path restoreDir, String snapshotName, Scan scan) throws IOException {
public TableSnapshotScanner(Configuration conf, Path rootDir, Path restoreDir,
String snapshotName, Scan scan, boolean snapshotAlreadyRestored) throws IOException {
this.conf = conf;
this.snapshotName = snapshotName;
this.rootDir = rootDir;
// restoreDir will be deleted in close(), use a unique sub directory
this.restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
this.scan = scan;
this.snapshotAlreadyRestored = snapshotAlreadyRestored;
this.fs = rootDir.getFileSystem(conf);
init();
if (snapshotAlreadyRestored) {
this.restoreDir = restoreDir;
openWithoutRestoringSnapshot();
} else {
// restoreDir will be deleted in close(), use a unique sub directory
this.restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
openWithRestoringSnapshot();
}
initScanMetrics(scan);
}
private void init() throws IOException {
private void openWithoutRestoringSnapshot() throws IOException {
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
SnapshotProtos.SnapshotDescription snapshotDesc =
SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
if (regionManifests == null) {
throw new IllegalArgumentException("Snapshot seems empty, snapshotName: " + snapshotName);
}
regions = new ArrayList<>(regionManifests.size());
regionManifests.stream().map(r -> HRegionInfo.convert(r.getRegionInfo()))
.filter(this::isValidRegion).sorted().forEach(r -> regions.add(r));
htd = manifest.getTableDescriptor();
}
/**
 * Decides whether a snapshot region should be scanned: offline split parents are excluded, and
 * the remaining regions must overlap the scan's row range.
 * @param hri region to check
 * @return true if the region should be included in the scan
 */
private boolean isValidRegion(RegionInfo hri) {
  // An offline region that has already been split carries no live data of its own.
  boolean offlineSplitParent = hri.isOffline() && (hri.isSplit() || hri.isSplitParent());
  return !offlineSplitParent
      && PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
        hri.getStartKey(), hri.getEndKey());
}
private void openWithRestoringSnapshot() throws IOException {
final RestoreSnapshotHelper.RestoreMetaChanges meta =
RestoreSnapshotHelper.copySnapshotForScanner(
conf, fs, rootDir, restoreDir, snapshotName);
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
final List<RegionInfo> restoredRegions = meta.getRegionsToAdd();
htd = meta.getTableDescriptor();
regions = new ArrayList<>(restoredRegions.size());
for (RegionInfo hri : restoredRegions) {
if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
continue;
}
if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
hri.getEndKey())) {
regions.add(hri);
}
}
// sort for regions according to startKey.
Collections.sort(regions, RegionInfo.COMPARATOR);
initScanMetrics(scan);
restoredRegions.stream().filter(this::isValidRegion).sorted().forEach(r -> regions.add(r));
}
@Override
@ -172,15 +205,28 @@ public class TableSnapshotScanner extends AbstractClientScanner {
}
}
/**
 * Best-effort removal of the restore directory; failures are logged rather than propagated so
 * that cleanup never masks the outcome of the scan itself.
 */
private void cleanup() {
  try {
    // Only attempt the delete when the directory is still present, and log if it fails.
    if (fs.exists(this.restoreDir) && !fs.delete(this.restoreDir, true)) {
      LOG.warn(
        "Delete restore directory for the snapshot failed. restoreDir: " + this.restoreDir);
    }
  } catch (IOException ex) {
    LOG.warn(
      "Could not delete restore directory for the snapshot. restoreDir: " + this.restoreDir, ex);
  }
}
@Override
public void close() {
if (currentRegionScanner != null) {
currentRegionScanner.close();
}
try {
fs.delete(this.restoreDir, true);
} catch (IOException ex) {
LOG.warn("Could not delete restore directory for the snapshot:" + ex);
// if snapshotAlreadyRestored is true, then we should invoke cleanup() method by hand.
if (!this.snapshotAlreadyRestored) {
cleanup();
}
}

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@ -187,6 +188,52 @@ public class TestTableSnapshotScanner {
testScanner(UTIL, "testWithMultiRegion", 20, true);
}
/**
 * Verifies that a snapshot restored ahead of time can be scanned repeatedly with
 * snapshotAlreadyRestored = true, and that closing such a scanner does not delete the restored
 * files (so a second scanner over the same restore directory still succeeds).
 */
@Test
public void testScannerWithRestoreScanner() throws Exception {
  setupCluster();
  TableName tableName = TableName.valueOf("testScanner");
  String snapshotName = "testScannerWithRestoreScanner";
  try {
    createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
    Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
    Scan scan = new Scan(bbb, yyy); // limit the scan
    Configuration conf = UTIL.getConfiguration();
    Path rootDir = FSUtils.getRootDir(conf);

    // Baseline: the self-restoring constructor performs its own snapshot restore.
    TableSnapshotScanner restoringScanner =
        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
    verifyScanner(restoringScanner, bbb, yyy);
    restoringScanner.close();

    // Restore the snapshot manually, then scan it with a scanner told not to restore again.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    TableSnapshotScanner preRestoredScanner =
        new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
    verifyScanner(preRestoredScanner, bbb, yyy);
    preRestoredScanner.close();

    // close() above must not have deleted the restored files: a fresh scanner still works.
    preRestoredScanner =
        new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
    verifyScanner(preRestoredScanner, bbb, yyy);
    preRestoredScanner.close();

    // Restore once more and confirm the same behavior a second time.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    preRestoredScanner =
        new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
    verifyScanner(preRestoredScanner, bbb, yyy);
    preRestoredScanner.close();
  } finally {
    UTIL.getAdmin().deleteSnapshot(snapshotName);
    UTIL.deleteTable(tableName);
    tearDownCluster();
  }
}
private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
boolean shutdownCluster) throws Exception {
setupCluster();