HBASE-18806 VerifyRep by snapshot need not restore snapshot for each mapper
This commit is contained in:
parent
f0011ebfe0
commit
16e8422855
|
@ -345,22 +345,20 @@ public class TableMapReduceUtil {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sets up the job for reading from a table snapshot. It bypasses hbase servers
|
||||
* and read directly from snapshot files.
|
||||
*
|
||||
* Sets up the job for reading from a table snapshot. It bypasses hbase servers and read directly
|
||||
* from snapshot files.
|
||||
* @param snapshotName The name of the snapshot (of a table) to read from.
|
||||
* @param scan The scan instance with the columns, time range etc.
|
||||
* @param mapper The mapper class to use.
|
||||
* @param outputKeyClass The class of the output key.
|
||||
* @param outputValueClass The class of the output value.
|
||||
* @param job The current job to adjust. Make sure the passed job is
|
||||
* carrying all necessary HBase configuration.
|
||||
* @param addDependencyJars upload HBase jars and jars for any of the configured
|
||||
* job classes via the distributed cache (tmpjars).
|
||||
*
|
||||
* @param job The current job to adjust. Make sure the passed job is carrying all necessary HBase
|
||||
* configuration.
|
||||
* @param addDependencyJars upload HBase jars and jars for any of the configured job classes via
|
||||
* the distributed cache (tmpjars).
|
||||
* @param tmpRestoreDir a temporary directory to copy the snapshot files into. Current user should
|
||||
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* After the job is finished, restore directory can be deleted.
|
||||
* have write permissions to this directory, and this should not be a subdirectory of
|
||||
* rootdir. After the job is finished, restore directory can be deleted.
|
||||
* @throws IOException When setting up the details fails.
|
||||
* @see TableSnapshotInputFormat
|
||||
*/
|
||||
|
@ -371,8 +369,8 @@ public class TableMapReduceUtil {
|
|||
boolean addDependencyJars, Path tmpRestoreDir)
|
||||
throws IOException {
|
||||
TableSnapshotInputFormat.setInput(job, snapshotName, tmpRestoreDir);
|
||||
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass,
|
||||
outputValueClass, job, addDependencyJars, false, TableSnapshotInputFormat.class);
|
||||
initTableMapperJob(snapshotName, scan, mapper, outputKeyClass, outputValueClass, job,
|
||||
addDependencyJars, false, TableSnapshotInputFormat.class);
|
||||
resetCacheConfig(job.getConfiguration());
|
||||
}
|
||||
|
||||
|
|
|
@ -491,8 +491,8 @@ public class TableSnapshotInputFormatImpl {
|
|||
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
|
||||
* @param conf the job to configure
|
||||
* @param snapshotName the name of the snapshot to read from
|
||||
* @param restoreDir a temporary directory to restore the snapshot into. Current user should
|
||||
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* @param restoreDir a temporary directory to restore the snapshot into. Current user should have
|
||||
* write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* After the job is finished, restoreDir can be deleted.
|
||||
* @throws IOException if an error occurs
|
||||
*/
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.mapreduce.replication;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
|
@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -211,8 +213,8 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
+ peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
|
||||
+ " peerFSAddress:" + peerFSAddress);
|
||||
|
||||
replicatedScanner = new TableSnapshotScanner(peerConf,
|
||||
new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
|
||||
replicatedScanner = new TableSnapshotScanner(peerConf, FSUtils.getRootDir(peerConf),
|
||||
new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
|
||||
} else {
|
||||
replicatedScanner = replicatedTable.getScanner(scan);
|
||||
}
|
||||
|
@ -360,6 +362,17 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorumAddress)
|
||||
throws IOException {
|
||||
Configuration peerConf =
|
||||
HBaseConfiguration.createClusterConf(conf, peerQuorumAddress, PEER_CONFIG_PREFIX);
|
||||
FileSystem.setDefaultUri(peerConf, peerFSAddress);
|
||||
FSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress));
|
||||
FileSystem fs = FileSystem.get(peerConf);
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, FSUtils.getRootDir(peerConf),
|
||||
new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the actual job.
|
||||
*
|
||||
|
@ -404,7 +417,13 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
//Set Snapshot specific parameters
|
||||
if (peerSnapshotName != null) {
|
||||
conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
|
||||
|
||||
// for verifyRep by snapshot, choose a unique sub-directory under peerSnapshotTmpDir to
|
||||
// restore snapshot.
|
||||
Path restoreDir = new Path(peerSnapshotTmpDir, UUID.randomUUID().toString());
|
||||
peerSnapshotTmpDir = restoreDir.toString();
|
||||
conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
|
||||
|
||||
conf.set(NAME + ".peerFSAddress", peerFSAddress);
|
||||
conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
|
||||
|
||||
|
@ -441,6 +460,7 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
"Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
|
||||
TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
|
||||
null, job, true, snapshotTempPath);
|
||||
restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
|
||||
} else {
|
||||
TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
|
|||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -31,6 +32,7 @@ import java.util.NavigableMap;
|
|||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
|
@ -907,6 +909,17 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
|
||||
}
|
||||
|
||||
private void checkRestoreTmpDir(Configuration conf, String restoreTmpDir, int expectedCount)
|
||||
throws IOException {
|
||||
FileSystem fs = FileSystem.get(conf);
|
||||
FileStatus[] subDirectories = fs.listStatus(new Path(restoreTmpDir));
|
||||
assertNotNull(subDirectories);
|
||||
assertEquals(subDirectories.length, expectedCount);
|
||||
for (int i = 0; i < expectedCount; i++) {
|
||||
assertTrue(subDirectories[i].isDirectory());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testVerifyReplicationWithSnapshotSupport() throws Exception {
|
||||
// Populate the tables, at the same time it guarantees that the tables are
|
||||
|
@ -948,6 +961,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
assertEquals(0,
|
||||
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
|
||||
|
||||
checkRestoreTmpDir(conf1, temPath1, 1);
|
||||
checkRestoreTmpDir(conf2, temPath2, 1);
|
||||
|
||||
Scan scan = new Scan();
|
||||
ResultScanner rs = htable2.getScanner(scan);
|
||||
Put put = null;
|
||||
|
@ -985,6 +1001,9 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
|
||||
assertEquals(NB_ROWS_IN_BATCH,
|
||||
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
|
||||
|
||||
checkRestoreTmpDir(conf1, temPath1, 2);
|
||||
checkRestoreTmpDir(conf2, temPath2, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -20,21 +20,25 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
|
||||
|
||||
/**
|
||||
* A Scanner which performs a scan over snapshot files. Using this class requires copying the
|
||||
* snapshot to a temporary empty directory, which will copy the snapshot reference files into that
|
||||
|
@ -75,6 +79,7 @@ public class TableSnapshotScanner extends AbstractClientScanner {
|
|||
private Scan scan;
|
||||
private ArrayList<RegionInfo> regions;
|
||||
private TableDescriptor htd;
|
||||
private final boolean snapshotAlreadyRestored;
|
||||
|
||||
private ClientSideRegionScanner currentRegionScanner = null;
|
||||
private int currentRegion = -1;
|
||||
|
@ -83,61 +88,89 @@ public class TableSnapshotScanner extends AbstractClientScanner {
|
|||
* Creates a TableSnapshotScanner.
|
||||
* @param conf the configuration
|
||||
* @param restoreDir a temporary directory to copy the snapshot files into. Current user should
|
||||
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* The scanner deletes the contents of the directory once the scanner is closed.
|
||||
* have write permissions to this directory, and this should not be a subdirectory of
|
||||
* rootDir. The scanner deletes the contents of the directory once the scanner is closed.
|
||||
* @param snapshotName the name of the snapshot to read from
|
||||
* @param scan a Scan representing scan parameters
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
public TableSnapshotScanner(Configuration conf, Path restoreDir,
|
||||
String snapshotName, Scan scan) throws IOException {
|
||||
public TableSnapshotScanner(Configuration conf, Path restoreDir, String snapshotName, Scan scan)
|
||||
throws IOException {
|
||||
this(conf, FSUtils.getRootDir(conf), restoreDir, snapshotName, scan);
|
||||
}
|
||||
|
||||
/**
 * Creates a TableSnapshotScanner for a snapshot that has not been restored yet. Delegates to the
 * six-argument constructor with {@code snapshotAlreadyRestored} set to {@code false}.
 * @param conf the configuration
 * @param rootDir root directory for HBase.
 * @param restoreDir a temporary directory to copy the snapshot files into
 * @param snapshotName the name of the snapshot to read from
 * @param scan a Scan representing scan parameters
 * @throws IOException in case of error
 */
public TableSnapshotScanner(Configuration conf, Path rootDir, Path restoreDir,
|
||||
String snapshotName, Scan scan) throws IOException {
|
||||
this(conf, rootDir, restoreDir, snapshotName, scan, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a TableSnapshotScanner.
|
||||
* @param conf the configuration
|
||||
* @param rootDir root directory for HBase.
|
||||
* @param restoreDir a temporary directory to copy the snapshot files into. Current user should
|
||||
* have write permissions to this directory, and this should not be a subdirectory of rootdir.
|
||||
* The scanner deletes the contents of the directory once the scanner is closed.
|
||||
* have write permissions to this directory, and this should not be a subdirectory of
|
||||
* rootdir. The scanner deletes the contents of the directory once the scanner is closed.
|
||||
* @param snapshotName the name of the snapshot to read from
|
||||
* @param scan a Scan representing scan parameters
|
||||
* @param snapshotAlreadyRestored true to indicate that snapshot has been restored.
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
public TableSnapshotScanner(Configuration conf, Path rootDir,
|
||||
Path restoreDir, String snapshotName, Scan scan) throws IOException {
|
||||
public TableSnapshotScanner(Configuration conf, Path rootDir, Path restoreDir,
|
||||
String snapshotName, Scan scan, boolean snapshotAlreadyRestored) throws IOException {
|
||||
this.conf = conf;
|
||||
this.snapshotName = snapshotName;
|
||||
this.rootDir = rootDir;
|
||||
this.scan = scan;
|
||||
this.snapshotAlreadyRestored = snapshotAlreadyRestored;
|
||||
this.fs = rootDir.getFileSystem(conf);
|
||||
|
||||
if (snapshotAlreadyRestored) {
|
||||
this.restoreDir = restoreDir;
|
||||
openWithoutRestoringSnapshot();
|
||||
} else {
|
||||
// restoreDir will be deleted in close(), use a unique sub directory
|
||||
this.restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
|
||||
this.scan = scan;
|
||||
this.fs = rootDir.getFileSystem(conf);
|
||||
init();
|
||||
openWithRestoringSnapshot();
|
||||
}
|
||||
|
||||
private void init() throws IOException {
|
||||
initScanMetrics(scan);
|
||||
}
|
||||
|
||||
private void openWithoutRestoringSnapshot() throws IOException {
|
||||
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
|
||||
SnapshotProtos.SnapshotDescription snapshotDesc =
|
||||
SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
|
||||
|
||||
SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
|
||||
List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
|
||||
if (regionManifests == null) {
|
||||
throw new IllegalArgumentException("Snapshot seems empty, snapshotName: " + snapshotName);
|
||||
}
|
||||
|
||||
regions = new ArrayList<>(regionManifests.size());
|
||||
regionManifests.stream().map(r -> HRegionInfo.convert(r.getRegionInfo()))
|
||||
.filter(this::isValidRegion).sorted().forEach(r -> regions.add(r));
|
||||
htd = manifest.getTableDescriptor();
|
||||
}
|
||||
|
||||
private boolean isValidRegion(RegionInfo hri) {
|
||||
// An offline split parent region should be excluded.
|
||||
if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
|
||||
return false;
|
||||
}
|
||||
return PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
|
||||
hri.getEndKey());
|
||||
}
|
||||
|
||||
private void openWithRestoringSnapshot() throws IOException {
|
||||
final RestoreSnapshotHelper.RestoreMetaChanges meta =
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(
|
||||
conf, fs, rootDir, restoreDir, snapshotName);
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||
final List<RegionInfo> restoredRegions = meta.getRegionsToAdd();
|
||||
|
||||
htd = meta.getTableDescriptor();
|
||||
regions = new ArrayList<>(restoredRegions.size());
|
||||
for (RegionInfo hri : restoredRegions) {
|
||||
if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
|
||||
continue;
|
||||
}
|
||||
if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
|
||||
hri.getEndKey())) {
|
||||
regions.add(hri);
|
||||
}
|
||||
}
|
||||
|
||||
// sort for regions according to startKey.
|
||||
Collections.sort(regions, RegionInfo.COMPARATOR);
|
||||
initScanMetrics(scan);
|
||||
restoredRegions.stream().filter(this::isValidRegion).sorted().forEach(r -> regions.add(r));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -172,15 +205,28 @@ public class TableSnapshotScanner extends AbstractClientScanner {
|
|||
}
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
try {
|
||||
if (fs.exists(this.restoreDir)) {
|
||||
if (!fs.delete(this.restoreDir, true)) {
|
||||
LOG.warn(
|
||||
"Delete restore directory for the snapshot failed. restoreDir: " + this.restoreDir);
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LOG.warn(
|
||||
"Could not delete restore directory for the snapshot. restoreDir: " + this.restoreDir, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (currentRegionScanner != null) {
|
||||
currentRegionScanner.close();
|
||||
}
|
||||
try {
|
||||
fs.delete(this.restoreDir, true);
|
||||
} catch (IOException ex) {
|
||||
LOG.warn("Could not delete restore directory for the snapshot:" + ex);
|
||||
// if snapshotAlreadyRestored is true, then we should invoke cleanup() method by hand.
|
||||
if (!this.snapshotAlreadyRestored) {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
|
||||
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
|
||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
|
@ -187,6 +188,52 @@ public class TestTableSnapshotScanner {
|
|||
testScanner(UTIL, "testWithMultiRegion", 20, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScannerWithRestoreScanner() throws Exception {
|
||||
setupCluster();
|
||||
TableName tableName = TableName.valueOf("testScanner");
|
||||
String snapshotName = "testScannerWithRestoreScanner";
|
||||
try {
|
||||
createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
|
||||
Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
|
||||
Scan scan = new Scan(bbb, yyy); // limit the scan
|
||||
|
||||
Configuration conf = UTIL.getConfiguration();
|
||||
Path rootDir = FSUtils.getRootDir(conf);
|
||||
|
||||
TableSnapshotScanner scanner0 =
|
||||
new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
|
||||
verifyScanner(scanner0, bbb, yyy);
|
||||
scanner0.close();
|
||||
|
||||
// restore snapshot.
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||
|
||||
// scan the snapshot without restoring snapshot
|
||||
TableSnapshotScanner scanner =
|
||||
new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
|
||||
verifyScanner(scanner, bbb, yyy);
|
||||
scanner.close();
|
||||
|
||||
// check whether the snapshot has been deleted by the close of scanner.
|
||||
scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
|
||||
verifyScanner(scanner, bbb, yyy);
|
||||
scanner.close();
|
||||
|
||||
// restore snapshot again.
|
||||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||
|
||||
// check whether the snapshot has been deleted by the close of scanner.
|
||||
scanner = new TableSnapshotScanner(conf, rootDir, restoreDir, snapshotName, scan, true);
|
||||
verifyScanner(scanner, bbb, yyy);
|
||||
scanner.close();
|
||||
} finally {
|
||||
UTIL.getAdmin().deleteSnapshot(snapshotName);
|
||||
UTIL.deleteTable(tableName);
|
||||
tearDownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
private void testScanner(HBaseTestingUtility util, String snapshotName, int numRegions,
|
||||
boolean shutdownCluster) throws Exception {
|
||||
setupCluster();
|
||||
|
|
Loading…
Reference in New Issue