HBASE-18484 VerifyRep by snapshot does not work when Yarn/SourceHBase/PeerHBase located in three different HDFS clusters
This commit is contained in:
parent
78d3d5628a
commit
1df571cb36
|
@ -538,9 +538,7 @@ public class TableSnapshotInputFormatImpl {
|
||||||
|
|
||||||
restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
|
restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
|
||||||
|
|
||||||
// TODO: restore from record readers to parallelize.
|
|
||||||
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
|
||||||
|
|
||||||
conf.set(RESTORE_DIR_KEY, restoreDir.toString());
|
conf.set(RESTORE_DIR_KEY, restoreDir.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,10 +124,9 @@ public class VerifyReplication extends Configured implements Tool {
|
||||||
public static class Verifier
|
public static class Verifier
|
||||||
extends TableMapper<ImmutableBytesWritable, Put> {
|
extends TableMapper<ImmutableBytesWritable, Put> {
|
||||||
|
|
||||||
|
public enum Counters {
|
||||||
|
GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS
|
||||||
public static enum Counters {
|
}
|
||||||
GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
|
|
||||||
|
|
||||||
private Connection sourceConnection;
|
private Connection sourceConnection;
|
||||||
private Table sourceTable;
|
private Table sourceTable;
|
||||||
|
|
|
@ -0,0 +1,207 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
|
||||||
|
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
|
@Category({ ReplicationTests.class, LargeTests.class })
|
||||||
|
public class TestVerifyReplicationCrossDiffHdfs {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestVerifyReplicationCrossDiffHdfs.class);
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestVerifyReplicationCrossDiffHdfs.class);
|
||||||
|
|
||||||
|
private static HBaseTestingUtility util1;
|
||||||
|
private static HBaseTestingUtility util2;
|
||||||
|
private static HBaseTestingUtility mapReduceUtil = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static Configuration conf1 = HBaseConfiguration.create();
|
||||||
|
private static Configuration conf2;
|
||||||
|
|
||||||
|
private static final byte[] FAMILY = Bytes.toBytes("f");
|
||||||
|
private static final byte[] QUALIFIER = Bytes.toBytes("q");
|
||||||
|
private static final String PEER_ID = "1";
|
||||||
|
private static final TableName TABLE_NAME = TableName.valueOf("testVerifyRepCrossDiffHDFS");
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
|
||||||
|
util1 = new HBaseTestingUtility(conf1);
|
||||||
|
util1.startMiniZKCluster();
|
||||||
|
MiniZooKeeperCluster miniZK = util1.getZkCluster();
|
||||||
|
conf1 = util1.getConfiguration();
|
||||||
|
|
||||||
|
conf2 = HBaseConfiguration.create(conf1);
|
||||||
|
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||||
|
util2 = new HBaseTestingUtility(conf2);
|
||||||
|
util2.setZkCluster(miniZK);
|
||||||
|
|
||||||
|
util1.startMiniCluster();
|
||||||
|
util2.startMiniCluster();
|
||||||
|
|
||||||
|
createTestingTable(util1.getAdmin());
|
||||||
|
createTestingTable(util2.getAdmin());
|
||||||
|
addTestingPeer();
|
||||||
|
|
||||||
|
LOG.info("Start to load some data to source cluster.");
|
||||||
|
loadSomeData();
|
||||||
|
|
||||||
|
LOG.info("Start mini MapReduce cluster.");
|
||||||
|
mapReduceUtil.setZkCluster(miniZK);
|
||||||
|
mapReduceUtil.startMiniMapReduceCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createTestingTable(Admin admin) throws IOException {
|
||||||
|
TableDescriptor table = TableDescriptorBuilder.newBuilder(TABLE_NAME)
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(100)
|
||||||
|
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
|
||||||
|
.build();
|
||||||
|
admin.createTable(table);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addTestingPeer() throws IOException {
|
||||||
|
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
|
||||||
|
.setClusterKey(util2.getClusterKey()).setReplicateAllUserTables(false)
|
||||||
|
.setTableCFsMap(ImmutableMap.of(TABLE_NAME, ImmutableList.of())).build();
|
||||||
|
util1.getAdmin().addReplicationPeer(PEER_ID, rpc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void loadSomeData() throws IOException, InterruptedException {
|
||||||
|
int numOfRows = 10;
|
||||||
|
try (Table table = util1.getConnection().getTable(TABLE_NAME)) {
|
||||||
|
for (int i = 0; i < numOfRows; i++) {
|
||||||
|
table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Wait some time until the peer received those rows.
|
||||||
|
Result[] results = null;
|
||||||
|
try (Table table = util2.getConnection().getTable(TABLE_NAME)) {
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
try (ResultScanner rs = table.getScanner(new Scan())) {
|
||||||
|
results = rs.next(numOfRows);
|
||||||
|
if (results == null || results.length < numOfRows) {
|
||||||
|
LOG.info("Retrying, wait until the peer received all the rows, currentRows:"
|
||||||
|
+ (results == null ? 0 : results.length));
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertNotNull(results);
|
||||||
|
Assert.assertEquals(10, results.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownClass() throws Exception {
|
||||||
|
if (mapReduceUtil != null) {
|
||||||
|
mapReduceUtil.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
if (util2 != null) {
|
||||||
|
util2.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
if (util1 != null) {
|
||||||
|
util1.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVerifyRepBySnapshot() throws Exception {
|
||||||
|
Path rootDir = FSUtils.getRootDir(conf1);
|
||||||
|
FileSystem fs = rootDir.getFileSystem(conf1);
|
||||||
|
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
|
||||||
|
SnapshotTestingUtils.createSnapshotAndValidate(util1.getAdmin(), TABLE_NAME, new String(FAMILY),
|
||||||
|
sourceSnapshotName, rootDir, fs, true);
|
||||||
|
|
||||||
|
// Take target snapshot
|
||||||
|
Path peerRootDir = FSUtils.getRootDir(conf2);
|
||||||
|
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
|
||||||
|
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
|
||||||
|
SnapshotTestingUtils.createSnapshotAndValidate(util2.getAdmin(), TABLE_NAME, new String(FAMILY),
|
||||||
|
peerSnapshotName, peerRootDir, peerFs, true);
|
||||||
|
|
||||||
|
String peerFSAddress = peerFs.getUri().toString();
|
||||||
|
String temPath1 = new Path(fs.getUri().toString(), "/tmp1").toString();
|
||||||
|
String temPath2 = "/tmp2";
|
||||||
|
|
||||||
|
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
|
||||||
|
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
|
||||||
|
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
|
||||||
|
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), PEER_ID, TABLE_NAME.toString() };
|
||||||
|
|
||||||
|
// Use the yarn's config override the source cluster's config.
|
||||||
|
Configuration newConf = HBaseConfiguration.create(conf1);
|
||||||
|
HBaseConfiguration.merge(newConf, mapReduceUtil.getConfiguration());
|
||||||
|
newConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
|
||||||
|
CommonFSUtils.setRootDir(newConf, CommonFSUtils.getRootDir(conf1));
|
||||||
|
Job job = new VerifyReplication().createSubmittableJob(newConf, args);
|
||||||
|
if (job == null) {
|
||||||
|
fail("Job wasn't created, see the log");
|
||||||
|
}
|
||||||
|
if (!job.waitForCompletion(true)) {
|
||||||
|
fail("Job failed, see the log");
|
||||||
|
}
|
||||||
|
assertEquals(10,
|
||||||
|
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
|
||||||
|
assertEquals(0,
|
||||||
|
job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
|
||||||
|
}
|
||||||
|
}
|
|
@ -7112,7 +7112,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
LOG.info("creating HRegion " + info.getTable().getNameAsString()
|
LOG.info("creating HRegion " + info.getTable().getNameAsString()
|
||||||
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
|
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
|
||||||
" Table name == " + info.getTable().getNameAsString());
|
" Table name == " + info.getTable().getNameAsString());
|
||||||
FileSystem fs = FileSystem.get(conf);
|
FileSystem fs = rootDir.getFileSystem(conf);
|
||||||
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
|
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
|
||||||
HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
|
HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, info);
|
||||||
HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);
|
HRegion region = HRegion.newHRegion(tableDir, wal, fs, conf, info, hTableDescriptor, null);
|
||||||
|
@ -7216,7 +7216,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
fs = rsServices.getFileSystem();
|
fs = rsServices.getFileSystem();
|
||||||
}
|
}
|
||||||
if (fs == null) {
|
if (fs == null) {
|
||||||
fs = FileSystem.get(conf);
|
fs = rootDir.getFileSystem(conf);
|
||||||
}
|
}
|
||||||
return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
|
return openHRegion(conf, fs, rootDir, info, htd, wal, rsServices, reporter);
|
||||||
}
|
}
|
||||||
|
@ -7387,7 +7387,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
fs = rsServices.getFileSystem();
|
fs = rsServices.getFileSystem();
|
||||||
}
|
}
|
||||||
if (fs == null) {
|
if (fs == null) {
|
||||||
fs = FileSystem.get(conf);
|
fs = rootDir.getFileSystem(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
|
HRegion r = HRegion.newHRegion(tableDir, wal, fs, conf, info, htd, null);
|
||||||
|
|
Loading…
Reference in New Issue