From c2d5991b82e3b807cb11f5735ef5068b73720725 Mon Sep 17 00:00:00 2001
From: huzheng
Date: Wed, 26 Dec 2018 16:17:55 +0800
Subject: [PATCH] HBASE-21642 CopyTable by reading snapshot and bulkloading will save a lot of time

---
 .../hadoop/hbase/mapreduce/CopyTable.java     | 109 ++++++++++++-----
 .../hadoop/hbase/mapreduce/TestCopyTable.java | 110 +++++++++++++++---
 .../hbase/client/ClientSideRegionScanner.java |  14 ++-
 .../hadoop/hbase/regionserver/HRegion.java    |   2 +-
 4 files changed, 187 insertions(+), 48 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 4e57f54296d..b59c9e6739d 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.mapreduce;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
+import java.util.UUID;
 
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
+import org.apache.hadoop.hbase.mapreduce.Import.Importer;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -70,8 +72,34 @@ public class CopyTable extends Configured implements Tool {
   boolean bulkload = false;
   Path bulkloadDir = null;
 
+  boolean readingSnapshot = false;
+  String snapshot = null;
+
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
+  private Path generateUniqTempDir(boolean withDirCreated) throws IOException {
+    FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
+    Path dir = new Path(fs.getWorkingDirectory(), NAME);
+    if (!fs.exists(dir)) {
+      fs.mkdirs(dir);
+    }
+    Path newDir = new Path(dir, UUID.randomUUID().toString());
+    if (withDirCreated) {
+      fs.mkdirs(newDir);
+    }
+    return newDir;
+  }
+
+  private void initCopyTableMapperReducerJob(Job job, Scan scan) throws IOException {
+    Class<? extends TableMapper> mapper = bulkload ? CellImporter.class : Importer.class;
+    if (readingSnapshot) {
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshot, scan, mapper, null, null, job, true,
+        generateUniqTempDir(true));
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, mapper, null, null, job);
+    }
+  }
+
   /**
    * Sets up the actual job.
    *
@@ -79,13 +107,13 @@ public class CopyTable extends Configured implements Tool {
    * @return The newly created job.
    * @throws IOException When setting up the job fails.
    */
-  public Job createSubmittableJob(String[] args)
-  throws IOException {
+  public Job createSubmittableJob(String[] args) throws IOException {
     if (!doCommandLine(args)) {
       return null;
     }
 
-    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
+    String jobName = NAME + "_" + (tableName == null ? snapshot : tableName);
+    Job job = Job.getInstance(getConf(), getConf().get(JOB_NAME_CONF_KEY, jobName));
     job.setJarByClass(CopyTable.class);
     Scan scan = new Scan();
 
@@ -107,15 +135,15 @@ public class CopyTable extends Configured implements Tool {
       job.getConfiguration().set(TableInputFormat.SHUFFLE_MAPS, "true");
     }
     if (versions >= 0) {
-      scan.setMaxVersions(versions);
+      scan.readVersions(versions);
     }
 
     if (startRow != null) {
-      scan.setStartRow(Bytes.toBytesBinary(startRow));
+      scan.withStartRow(Bytes.toBytesBinary(startRow));
    }
 
     if (stopRow != null) {
-      scan.setStopRow(Bytes.toBytesBinary(stopRow));
+      scan.withStopRow(Bytes.toBytesBinary(stopRow));
     }
 
     if(families != null) {
@@ -140,24 +168,13 @@ public class CopyTable extends Configured implements Tool {
     job.setNumReduceTasks(0);
 
     if (bulkload) {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan, Import.CellImporter.class, null, null,
-        job);
+      initCopyTableMapperReducerJob(job, scan);
 
       // We need to split the inputs by destination tables so that output of Map can be bulk-loaded.
       TableInputFormat.configureSplitTable(job, TableName.valueOf(dstTableName));
 
-      FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
-      Random rand = new Random();
-      Path root = new Path(fs.getWorkingDirectory(), "copytable");
-      fs.mkdirs(root);
-      while (true) {
-        bulkloadDir = new Path(root, "" + rand.nextLong());
-        if (!fs.exists(bulkloadDir)) {
-          break;
-        }
-      }
-
-      System.out.println("HFiles will be stored at " + this.bulkloadDir);
+      bulkloadDir = generateUniqTempDir(false);
+      LOG.info("HFiles will be stored at " + this.bulkloadDir);
       HFileOutputFormat2.setOutputPath(job, bulkloadDir);
 
       try (Connection conn = ConnectionFactory.createConnection(getConf());
           Admin admin = conn.getAdmin()) {
@@ -165,9 +182,7 @@ public class CopyTable extends Configured implements Tool {
             admin.getDescriptor((TableName.valueOf(dstTableName))));
       }
     } else {
-      TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        Import.Importer.class, null, null, job);
-
+      initCopyTableMapperReducerJob(job, scan);
       TableMapReduceUtil.initTableReducerJob(dstTableName, null, job, null, peerAddress, null,
         null);
     }
@@ -183,7 +198,7 @@ public class CopyTable extends Configured implements Tool {
       System.err.println("ERROR: " + errorMsg);
     }
     System.err.println("Usage: CopyTable [general options] [--starttime=X] [--endtime=Y] " +
-        "[--new.name=NEW] [--peer.adr=ADR] <tablename>");
+        "[--new.name=NEW] [--peer.adr=ADR] <tablename | snapshotName>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" rs.class hbase.regionserver.class of the peer cluster");
@@ -205,6 +220,7 @@ public class CopyTable extends Configured implements Tool {
     System.err.println(" all.cells also copy delete markers and deleted cells");
     System.err.println(" bulkload Write input into HFiles and bulk load to the destination " +
         "table");
+    System.err.println(" snapshot Copy the data from snapshot to destination table.");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" tablename Name of the table to copy");
@@ -214,6 +230,12 @@ public class CopyTable extends Configured implements Tool {
     System.err.println(" $ hbase " +
         "org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289 " +
         "--peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable ");
+    System.err.println(" To copy data from 'sourceTableSnapshot' to 'destTable': ");
+    System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable " +
+        "--snapshot --new.name=destTable sourceTableSnapshot");
System.err.println(" To copy data from 'sourceTableSnapshot' and bulk load to 'destTable': "); + System.err.println(" $ hbase org.apache.hadoop.hbase.mapreduce.CopyTable " + + "--new.name=destTable --snapshot --bulkload sourceTableSnapshot"); System.err.println("For performance consider the following general option:\n" + " It is recommended that you set the following to >=100. A higher value uses more memory but\n" + " decreases the round trip time to the server and may increase performance.\n" @@ -224,8 +246,6 @@ public class CopyTable extends Configured implements Tool { } private boolean doCommandLine(final String[] args) { - // Process command-line args. TODO: Better cmd-line processing - // (but hopefully something not as painful as cli options). if (args.length < 1) { printUsage(null); return false; @@ -313,16 +333,24 @@ public class CopyTable extends Configured implements Tool { continue; } - if (i == args.length-1) { - tableName = cmd; + if(cmd.startsWith("--snapshot")){ + readingSnapshot = true; + continue; + } + + if (i == args.length - 1) { + if (readingSnapshot) { + snapshot = cmd; + } else { + tableName = cmd; + } } else { printUsage("Invalid argument '" + cmd + "'"); return false; } } if (dstTableName == null && peerAddress == null) { - printUsage("At least a new table name or a " + - "peer address must be specified"); + printUsage("At least a new table name or a peer address must be specified"); return false; } if ((endTime != 0) && (startTime > endTime)) { @@ -335,6 +363,22 @@ public class CopyTable extends Configured implements Tool { return false; } + if (readingSnapshot && peerAddress != null) { + printUsage("Loading data from snapshot to remote peer cluster is not supported."); + return false; + } + + if (readingSnapshot && dstTableName == null) { + printUsage("The --new.name= for destination table should be " + + "provided when copying data from snapshot ."); + return false; + } + + if (readingSnapshot && snapshot == null) { + printUsage("Snapshot shouldn't be null when --snapshot is enabled."); + return false; + } + // set dstTableName if necessary if (dstTableName == null) { dstTableName = tableName; @@ -371,6 +415,9 @@ public class CopyTable extends Configured implements Tool { } int code = 0; if (bulkload) { + LOG.info("Trying to bulk load data to destination table: " + dstTableName); + LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}", + this.bulkloadDir.toString(), this.dstTableName); code = new LoadIncrementalHFiles(this.getConf()) .run(new String[] { this.bulkloadDir.toString(), this.dstTableName }); if (code == 0) { diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java index ed6857dd3b6..5591e5fa4f6 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java @@ -24,12 +24,15 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.PrintStream; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -45,6 +48,7 @@ import org.apache.hadoop.hbase.util.LauncherSecurityManager;
 import org.apache.hadoop.util.ToolRunner;
 
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -93,14 +97,9 @@ public class TestCopyTable {
     try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY);
         Table t2 = TEST_UTIL.createTable(tableName2, FAMILY)) {
       // put rows into the first table
-      for (int i = 0; i < 10; i++) {
-        Put p = new Put(Bytes.toBytes("row" + i));
-        p.addColumn(FAMILY, COLUMN1, COLUMN1);
-        t1.put(p);
-      }
+      loadData(t1, FAMILY, COLUMN1);
 
       CopyTable copy = new CopyTable();
-
       int code;
       if (bulkload) {
         code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()),
@@ -114,12 +113,7 @@ public class TestCopyTable {
       assertEquals("copy job failed", 0, code);
 
       // verify the data was copied into table 2
-      for (int i = 0; i < 10; i++) {
-        Get g = new Get(Bytes.toBytes("row" + i));
-        Result r = t2.get(g);
-        assertEquals(1, r.size());
-        assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1));
-      }
+      verifyRows(t2, FAMILY, COLUMN1);
     } finally {
       TEST_UTIL.deleteTable(tableName1);
       TEST_UTIL.deleteTable(tableName2);
@@ -185,7 +179,6 @@ public class TestCopyTable {
           t2.getDescriptor().getValues().size());
       assertTrue("The mob row count is 0 but should be > 0",
           MobTestUtil.countMobRows(t2) > 0);
-
     } finally {
       TEST_UTIL.deleteTable(tableName1);
       TEST_UTIL.deleteTable(tableName2);
@@ -349,4 +342,95 @@ public class TestCopyTable {
         args);
     return status == 0;
   }
+
+  private void loadData(Table t, byte[] family, byte[] column) throws IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Put p = new Put(row);
+      p.addColumn(family, column, row);
+      t.put(p);
+    }
+  }
+
+  private void verifyRows(Table t, byte[] family, byte[] column) throws IOException {
+    for (int i = 0; i < 10; i++) {
+      byte[] row = Bytes.toBytes("row" + i);
+      Get g = new Get(row).addFamily(family);
+      Result r = t.get(g);
+      Assert.assertNotNull(r);
+      Assert.assertEquals(1, r.size());
+      Cell cell = r.rawCells()[0];
+      Assert.assertTrue(CellUtil.matchingQualifier(cell, column));
+      Assert.assertEquals(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
+        cell.getValueLength(), row, 0, row.length), 0);
+    }
+  }
+
+  private Table createTable(TableName tableName, byte[] family, boolean isMob) throws IOException {
+    if (isMob) {
+      ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(family)
+        .setMobEnabled(true).setMobThreshold(1).build();
+      TableDescriptor desc =
+        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(cfd).build();
+      return TEST_UTIL.createTable(desc, null);
+    } else {
+      return TEST_UTIL.createTable(tableName, family);
+    }
+  }
+
+  private void testCopyTableBySnapshot(String tablePrefix, boolean bulkLoad, boolean isMob)
+      throws Exception {
+    TableName table1 = TableName.valueOf(tablePrefix + 1);
+    TableName table2 = TableName.valueOf(tablePrefix + 2);
+    Table t1 = createTable(table1, FAMILY_A, isMob);
+    Table t2 = createTable(table2, FAMILY_A, isMob);
+    loadData(t1, FAMILY_A, Bytes.toBytes("qualifier"));
+    String snapshot = tablePrefix + "_snapshot";
+    TEST_UTIL.getAdmin().snapshot(snapshot, table1);
+    boolean success;
+    if (bulkLoad) {
+      success =
+        runCopy(new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot });
+    } else {
+      success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, snapshot });
+    }
+    Assert.assertTrue(success);
+    verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier"));
+  }
+
+  @Test
+  public void testLoadingSnapshotToTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotToTable", false, false);
+  }
+
+  @Test
+  public void testLoadingSnapshotToMobTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotToMobTable", false, true);
+  }
+
+  @Test
+  public void testLoadingSnapshotAndBulkLoadToTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToTable", true, false);
+  }
+
+  @Test
+  public void testLoadingSnapshotAndBulkLoadToMobTable() throws Exception {
+    testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToMobTable", true, true);
+  }
+
+  @Test
+  public void testLoadingSnapshotToRemoteCluster() throws Exception {
+    Assert.assertFalse(runCopy(
+      new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", "sourceSnapshotName" }));
+  }
+
+  @Test
+  public void testLoadingSnapshotWithoutSnapshotName() throws Exception {
+    Assert.assertFalse(runCopy(new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase" }));
+  }
+
+  @Test
+  public void testLoadingSnapshotWithoutDestTable() throws Exception {
+    Assert.assertFalse(runCopy(new String[] { "--snapshot", "sourceSnapshotName" }));
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 7a1a57814f2..23a239970d9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -26,11 +26,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,14 +51,21 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
 
   public ClientSideRegionScanner(Configuration conf, FileSystem fs, Path rootDir,
       TableDescriptor htd, RegionInfo hri, Scan scan, ScanMetrics scanMetrics)
-          throws IOException {
+      throws IOException {
     // region is immutable, set isolation level
     scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
 
     htd = TableDescriptorBuilder.newBuilder(htd).setReadOnly(true).build();
 
     // open region from the snapshot directory
-    this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+    region = HRegion.newHRegion(FSUtils.getTableDir(rootDir, htd.getTableName()), null, fs, conf,
+      hri, htd, null);
+    // The MobFileCache is not initialized when we are not running in a RegionServer process, so
+    // provide an initialized cache here. Consider a CF switched from mob to non-mob: if we only
+    // initialized the cache for MOB regions, an NPE from HMobStore could still happen. So
+    // initialize the cache for every region, even without a mob CF; the cache is very lightweight.
+    region.setMobFileCache(new MobFileCache(conf));
+    region.initialize();
 
     // create an internal region scanner
     this.scanner = region.getScanner(scan);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index dc0fa220b6a..9bf93090a16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7055,7 +7055,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param htd the table descriptor
    * @return the new instance
    */
-  static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
+  public static HRegion newHRegion(Path tableDir, WAL wal, FileSystem fs,
       Configuration conf, RegionInfo regionInfo, final TableDescriptor htd,
       RegionServerServices rsServices) {
     try {
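
For reference, the snapshot mode added by this patch is driven through the standard Hadoop Tool entry point, the same way TestCopyTable invokes it via ToolRunner. A minimal, self-contained sketch follows; the wrapper class name and the destTable/sourceTableSnapshot names are illustrative only, and the argument order mirrors the usage example in printUsage() above:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.CopyTable;
    import org.apache.hadoop.util.ToolRunner;

    public class CopyTableSnapshotExample {
      public static void main(String[] args) throws Exception {
        // Copy the contents of the snapshot 'sourceTableSnapshot' into 'destTable',
        // writing HFiles and bulk loading them instead of issuing Puts.
        int exit = ToolRunner.run(HBaseConfiguration.create(), new CopyTable(),
            new String[] { "--new.name=destTable", "--snapshot", "--bulkload",
                "sourceTableSnapshot" });
        System.exit(exit);
      }
    }

With --bulkload the job writes HFiles under the CopyTable working directory and then runs LoadIncrementalHFiles against the destination table, as in the run() changes above.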