From df8fa5ef23653d7cf701137678b16f6623e9b0ce Mon Sep 17 00:00:00 2001
From: huzheng
Date: Thu, 24 Jan 2019 21:29:11 +0800
Subject: [PATCH] HBASE-18484 VerifyRep by snapshot does not work when Yarn/SourceHBase/PeerHBase located in three different HDFS clusters

---
 .../TableSnapshotInputFormatImpl.java         |   2 -
 .../replication/VerifyReplication.java        |   7 +-
 .../TestVerifyReplicationCrossDiffHdfs.java   | 228 ++++++++++++++++++
 .../hadoop/hbase/regionserver/HRegion.java    |   6 +-
 4 files changed, 234 insertions(+), 9 deletions(-)
 create mode 100644 hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationCrossDiffHdfs.java

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index edd8c7fbc83..73b5d0598a0 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -538,9 +538,7 @@ public class TableSnapshotInputFormatImpl {
 
     restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
 
-    // TODO: restore from record readers to parallelize.
     RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
-
     conf.set(RESTORE_DIR_KEY, restoreDir.toString());
   }
 }
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 20f77de5a11..e3e66512ae9 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -124,10 +124,9 @@ public class VerifyReplication extends Configured implements Tool {
   public static class Verifier
       extends TableMapper {
 
-
-
-    public static enum Counters {
-      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
+    public enum Counters {
+      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
+    }
 
     private Connection sourceConnection;
     private Table sourceTable;
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationCrossDiffHdfs.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationCrossDiffHdfs.java
new file mode 100644
index 00000000000..a07e0a82aed
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationCrossDiffHdfs.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * Checks that {@link VerifyReplication} by snapshot works when YARN, the source HBase cluster and
+ * the peer HBase cluster are located in three different HDFS clusters (HBASE-18484).
+ */
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestVerifyReplicationCrossDiffHdfs {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestVerifyReplicationCrossDiffHdfs.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestVerifyReplicationCrossDiffHdfs.class);
+
+  private static HBaseTestingUtility util1;
+  private static HBaseTestingUtility util2;
+  private static HBaseTestingUtility mapReduceUtil = new HBaseTestingUtility();
+
+  private static Configuration conf1 = HBaseConfiguration.create();
+  private static Configuration conf2;
+
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+  private static final String PEER_ID = "1";
+  private static final TableName TABLE_NAME = TableName.valueOf("testVerifyRepCrossDiffHDFS");
+
+  /**
+   * Starts the source and peer mini clusters on one shared ZooKeeper cluster, creates the test
+   * table on both, adds the replication peer, loads data and starts the mini MapReduce cluster.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    util1 = new HBaseTestingUtility(conf1);
+    util1.startMiniZKCluster();
+    MiniZooKeeperCluster miniZK = util1.getZkCluster();
+    conf1 = util1.getConfiguration();
+
+    conf2 = HBaseConfiguration.create(conf1);
+    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+    util2 = new HBaseTestingUtility(conf2);
+    util2.setZkCluster(miniZK);
+
+    util1.startMiniCluster();
+    util2.startMiniCluster();
+
+    createTestingTable(util1.getAdmin());
+    createTestingTable(util2.getAdmin());
+    addTestingPeer();
+
+    LOG.info("Start to load some data to source cluster.");
+    loadSomeData();
+
+    LOG.info("Start mini MapReduce cluster.");
+    mapReduceUtil.setZkCluster(miniZK);
+    mapReduceUtil.startMiniMapReduceCluster();
+  }
+
+  /** Creates the test table with one globally replicated family (max versions 100). */
+  private static void createTestingTable(Admin admin) throws IOException {
+    TableDescriptor table = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(100)
+            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .build();
+    admin.createTable(table);
+  }
+
+  /** Registers cluster 2 as a replication peer of cluster 1, for the test table only. */
+  private static void addTestingPeer() throws IOException {
+    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(util2.getClusterKey()).setReplicateAllUserTables(false)
+        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, ImmutableList.of())).build();
+    util1.getAdmin().addReplicationPeer(PEER_ID, rpc);
+  }
+
+  /**
+   * Writes rows to the source cluster, then polls the peer cluster until all of them are visible
+   * there (at most 100 attempts, 100 ms apart).
+   */
+  private static void loadSomeData() throws IOException, InterruptedException {
+    int numOfRows = 10;
+    try (Table table = util1.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < numOfRows; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
+      }
+    }
+    // Wait some time until the peer received those rows.
+    Result[] results = null;
+    try (Table table = util2.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < 100; i++) {
+        try (ResultScanner rs = table.getScanner(new Scan())) {
+          results = rs.next(numOfRows);
+        }
+        if (results != null && results.length >= numOfRows) {
+          // All rows have been replicated; no need to keep polling.
+          break;
+        }
+        LOG.info("Retrying, wait until the peer received all the rows, currentRows:"
+            + (results == null ? 0 : results.length));
+        Thread.sleep(100);
+      }
+    }
+    Assert.assertNotNull(results);
+    Assert.assertEquals(numOfRows, results.length);
+  }
+
+  /** Shuts down the MapReduce, peer and source testing utilities, in that order. */
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    if (mapReduceUtil != null) {
+      mapReduceUtil.shutdownMiniCluster();
+    }
+    if (util2 != null) {
+      util2.shutdownMiniCluster();
+    }
+    if (util1 != null) {
+      util1.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Snapshots the table on both clusters, runs VerifyReplication against the two snapshots using
+   * the MapReduce cluster's configuration, and expects 10 good rows and no bad rows.
+   */
+  @Test
+  public void testVerifyRepBySnapshot() throws Exception {
+    Path rootDir = FSUtils.getRootDir(conf1);
+    FileSystem fs = rootDir.getFileSystem(conf1);
+    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(util1.getAdmin(), TABLE_NAME,
+      Bytes.toString(FAMILY), sourceSnapshotName, rootDir, fs, true);
+
+    // Take target snapshot
+    Path peerRootDir = FSUtils.getRootDir(conf2);
+    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(util2.getAdmin(), TABLE_NAME,
+      Bytes.toString(FAMILY), peerSnapshotName, peerRootDir, peerFs, true);
+
+    String peerFSAddress = peerFs.getUri().toString();
+    String temPath1 = new Path(fs.getUri().toString(), "/tmp1").toString();
+    String temPath2 = "/tmp2";
+
+    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+      "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+      "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+      "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), PEER_ID, TABLE_NAME.toString() };
+
+    // Let the YARN cluster's config override the source cluster's config.
+    Configuration newConf = HBaseConfiguration.create(conf1);
+    HBaseConfiguration.merge(newConf, mapReduceUtil.getConfiguration());
+    newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+    CommonFSUtils.setRootDir(newConf, CommonFSUtils.getRootDir(conf1));
+    Job job = new VerifyReplication().createSubmittableJob(newConf, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(10,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3c19a44308a..ffad33aece5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7058,7 +7058,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     LOG.info("creating HRegion " + info.getTable().getNameAsString()
         + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
         " Table name == " + info.getTable().getNameAsString());
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = rootDir.getFileSystem(conf);
     Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
     HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
     HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);
@@ -7162,7 +7162,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       fs = rsServices.getFileSystem();
     }
     if (fs == null) {
-      fs = FileSystem.get(conf);
+      fs = rootDir.getFileSystem(conf);
     }
     return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
   }
@@ -7331,7 +7331,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       fs = rsServices.getFileSystem();
     }
     if (fs == null) {
-      fs = FileSystem.get(conf);
+      fs = rootDir.getFileSystem(conf);
     }
 
     HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);