HBASE-9047 - addendum, include new files
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1551464 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7ea071db1f
commit
fec8ee413b
|
@ -498,6 +498,26 @@ public class ReplicationSource extends Thread
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
|
||||||
|
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
|
||||||
|
if (stopper instanceof ReplicationSyncUp.DummyServer) {
|
||||||
|
FileStatus[] rss = fs.listStatus(manager.getLogDir());
|
||||||
|
for (FileStatus rs : rss) {
|
||||||
|
Path p = rs.getPath();
|
||||||
|
FileStatus[] logs = fs.listStatus(p);
|
||||||
|
for (FileStatus log : logs) {
|
||||||
|
p = new Path(p, log.getPath().getName());
|
||||||
|
if (p.getName().equals(currentPath.getName())) {
|
||||||
|
currentPath = p;
|
||||||
|
LOG.info("Log " + this.currentPath + " exists under " + manager.getLogDir());
|
||||||
|
// Open the log at the new location
|
||||||
|
this.openReader(sleepMultiplier);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO What happens if the log was missing from every single location?
|
// TODO What happens if the log was missing from every single location?
|
||||||
// Although we need to check a couple of times as the log could have
|
// Although we need to check a couple of times as the log could have
|
||||||
// been moved by the master between the checks
|
// been moved by the master between the checks
|
||||||
|
|
|
@ -285,6 +285,14 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
return this.sources;
|
return this.sources;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a list of all the old sources of this rs
|
||||||
|
* @return list of all old sources
|
||||||
|
*/
|
||||||
|
public List<ReplicationSourceInterface> getOldSources() {
|
||||||
|
return this.oldsources;
|
||||||
|
}
|
||||||
|
|
||||||
void preLogRoll(Path newLog) throws IOException {
|
void preLogRoll(Path newLog) throws IOException {
|
||||||
|
|
||||||
synchronized (this.hlogsById) {
|
synchronized (this.hlogsById) {
|
||||||
|
|
|
@ -0,0 +1,174 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
|
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||||
|
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In a scenario of Replication based Disaster/Recovery, when hbase
|
||||||
|
* Master-Cluster crashes, this tool is used to sync-up the delta from Master to
|
||||||
|
* Slave using the info from Zookeeper. The tool will run on Master-Cluser, and
|
||||||
|
* assume ZK, Filesystem and NetWork still available after hbase crashes
|
||||||
|
*
|
||||||
|
* hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
|
||||||
|
*/
|
||||||
|
|
||||||
|
public class ReplicationSyncUp extends Configured implements Tool {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(ReplicationSyncUp.class.getName());
|
||||||
|
|
||||||
|
private static Configuration conf;
|
||||||
|
|
||||||
|
private static final long SLEEP_TIME = 10000;
|
||||||
|
|
||||||
|
// although the tool is designed to be run on command line
|
||||||
|
// this api is provided for executing the tool through another app
|
||||||
|
public static void setConfigure(Configuration config) {
|
||||||
|
conf = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main program
|
||||||
|
* @param args
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
if (conf == null) conf = HBaseConfiguration.create();
|
||||||
|
int ret = ToolRunner.run(conf, new ReplicationSyncUp(), args);
|
||||||
|
System.exit(ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int run(String[] args) throws Exception {
|
||||||
|
Replication replication;
|
||||||
|
ReplicationSourceManager manager;
|
||||||
|
FileSystem fs;
|
||||||
|
Path oldLogDir, logDir, rootDir;
|
||||||
|
ZooKeeperWatcher zkw;
|
||||||
|
|
||||||
|
Abortable abortable = new Abortable() {
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
zkw =
|
||||||
|
new ZooKeeperWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable,
|
||||||
|
true);
|
||||||
|
|
||||||
|
rootDir = FSUtils.getRootDir(conf);
|
||||||
|
fs = FileSystem.get(conf);
|
||||||
|
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
|
||||||
|
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
|
||||||
|
|
||||||
|
System.out.println("Start Replication Server start");
|
||||||
|
replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
|
||||||
|
manager = replication.getReplicationManager();
|
||||||
|
manager.init();
|
||||||
|
|
||||||
|
try {
|
||||||
|
int numberOfOldSource = 1; // default wait once
|
||||||
|
while (numberOfOldSource > 0) {
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
numberOfOldSource = manager.getOldSources().size();
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
System.err.println("didn't wait long enough:" + e);
|
||||||
|
return (-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
manager.join();
|
||||||
|
|
||||||
|
return (0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static class DummyServer implements Server {
|
||||||
|
String hostname;
|
||||||
|
ZooKeeperWatcher zkw;
|
||||||
|
|
||||||
|
DummyServer(ZooKeeperWatcher zkw) {
|
||||||
|
// an unique name in case the first run fails
|
||||||
|
hostname = System.currentTimeMillis() + ".SyncUpTool.replication.org";
|
||||||
|
this.zkw = zkw;
|
||||||
|
}
|
||||||
|
|
||||||
|
DummyServer(String hostname) {
|
||||||
|
this.hostname = hostname;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConfiguration() {
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ZooKeeperWatcher getZooKeeper() {
|
||||||
|
return zkw;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CatalogTracker getCatalogTracker() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ServerName getServerName() {
|
||||||
|
return ServerName.valueOf(hostname, 1234, 1L);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAborted() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop(String why) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isStopped() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,361 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.*;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(LargeTests.class)
|
||||||
|
public class TestReplicationSyncUpTool extends TestReplicationBase {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestReplicationSyncUpTool.class);
|
||||||
|
|
||||||
|
private static final byte[] t1_su = Bytes.toBytes("t1_syncup");
|
||||||
|
private static final byte[] t2_su = Bytes.toBytes("t2_syncup");
|
||||||
|
|
||||||
|
private static final byte[] famName = Bytes.toBytes("cf1");
|
||||||
|
private static final byte[] qualName = Bytes.toBytes("q1");
|
||||||
|
|
||||||
|
private static final byte[] noRepfamName = Bytes.toBytes("norep");
|
||||||
|
|
||||||
|
private HTableDescriptor t1_syncupSource, t1_syncupTarget;
|
||||||
|
private HTableDescriptor t2_syncupSource, t2_syncupTarget;
|
||||||
|
|
||||||
|
private HTable ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
|
||||||
|
private int rowCount_ht1Source, rowCount_ht2Source, rowCount_ht1TargetAtPeer1,
|
||||||
|
rowCount_ht2TargetAtPeer1;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
|
||||||
|
HColumnDescriptor fam;
|
||||||
|
|
||||||
|
t1_syncupSource = new HTableDescriptor(TableName.valueOf(t1_su));
|
||||||
|
fam = new HColumnDescriptor(famName);
|
||||||
|
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||||
|
t1_syncupSource.addFamily(fam);
|
||||||
|
fam = new HColumnDescriptor(noRepfamName);
|
||||||
|
t1_syncupSource.addFamily(fam);
|
||||||
|
|
||||||
|
t1_syncupTarget = new HTableDescriptor(TableName.valueOf(t1_su));
|
||||||
|
fam = new HColumnDescriptor(famName);
|
||||||
|
t1_syncupTarget.addFamily(fam);
|
||||||
|
fam = new HColumnDescriptor(noRepfamName);
|
||||||
|
t1_syncupTarget.addFamily(fam);
|
||||||
|
|
||||||
|
t2_syncupSource = new HTableDescriptor(TableName.valueOf(t2_su));
|
||||||
|
fam = new HColumnDescriptor(famName);
|
||||||
|
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||||
|
t2_syncupSource.addFamily(fam);
|
||||||
|
fam = new HColumnDescriptor(noRepfamName);
|
||||||
|
t2_syncupSource.addFamily(fam);
|
||||||
|
|
||||||
|
t2_syncupTarget = new HTableDescriptor(TableName.valueOf(t2_su));
|
||||||
|
fam = new HColumnDescriptor(famName);
|
||||||
|
t2_syncupTarget.addFamily(fam);
|
||||||
|
fam = new HColumnDescriptor(noRepfamName);
|
||||||
|
t2_syncupTarget.addFamily(fam);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a row to a table in each cluster, check it's replicated, delete it,
|
||||||
|
* check's gone Also check the puts and deletes are not replicated back to
|
||||||
|
* the originating cluster.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testSyncUpTool() throws Exception {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up Replication: on Master and one Slave
|
||||||
|
* Table: t1_syncup and t2_syncup
|
||||||
|
* columnfamily:
|
||||||
|
* 'cf1' : replicated
|
||||||
|
* 'norep': not replicated
|
||||||
|
*/
|
||||||
|
setupReplication();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* at Master:
|
||||||
|
* t1_syncup: put 100 rows into cf1, and 1 rows into norep
|
||||||
|
* t2_syncup: put 200 rows into cf1, and 1 rows into norep
|
||||||
|
*
|
||||||
|
* verify correctly replicated to slave
|
||||||
|
*/
|
||||||
|
putAndReplicateRows();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify delete works
|
||||||
|
*
|
||||||
|
* step 1: stop hbase on Slave
|
||||||
|
*
|
||||||
|
* step 2: at Master:
|
||||||
|
* t1_syncup: delete 50 rows from cf1
|
||||||
|
* t2_syncup: delete 100 rows from cf1
|
||||||
|
* no change on 'norep'
|
||||||
|
*
|
||||||
|
* step 3: stop hbase on master, restart hbase on Slave
|
||||||
|
*
|
||||||
|
* step 4: verify Slave still have the rows before delete
|
||||||
|
* t1_syncup: 100 rows from cf1
|
||||||
|
* t2_syncup: 200 rows from cf1
|
||||||
|
*
|
||||||
|
* step 5: run syncup tool on Master
|
||||||
|
*
|
||||||
|
* step 6: verify that delete show up on Slave
|
||||||
|
* t1_syncup: 50 rows from cf1
|
||||||
|
* t2_syncup: 100 rows from cf1
|
||||||
|
*
|
||||||
|
* verify correctly replicated to Slave
|
||||||
|
*/
|
||||||
|
mimicSyncUpAfterDelete();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify put works
|
||||||
|
*
|
||||||
|
* step 1: stop hbase on Slave
|
||||||
|
*
|
||||||
|
* step 2: at Master:
|
||||||
|
* t1_syncup: put 100 rows from cf1
|
||||||
|
* t2_syncup: put 200 rows from cf1
|
||||||
|
* and put another row on 'norep'
|
||||||
|
* ATTN: put to 'cf1' will overwrite existing rows, so end count will
|
||||||
|
* be 100 and 200 respectively
|
||||||
|
* put to 'norep' will add a new row.
|
||||||
|
*
|
||||||
|
* step 3: stop hbase on master, restart hbase on Slave
|
||||||
|
*
|
||||||
|
* step 4: verify Slave still has the rows before put
|
||||||
|
* t1_syncup: 50 rows from cf1
|
||||||
|
* t2_syncup: 100 rows from cf1
|
||||||
|
*
|
||||||
|
* step 5: run syncup tool on Master
|
||||||
|
*
|
||||||
|
* step 6: verify that put show up on Slave
|
||||||
|
* and 'norep' does not
|
||||||
|
* t1_syncup: 100 rows from cf1
|
||||||
|
* t2_syncup: 200 rows from cf1
|
||||||
|
*
|
||||||
|
* verify correctly replicated to Slave
|
||||||
|
*/
|
||||||
|
mimicSyncUpAfterPut();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setupReplication() throws Exception {
|
||||||
|
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
|
||||||
|
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
|
||||||
|
|
||||||
|
HBaseAdmin ha = new HBaseAdmin(conf1);
|
||||||
|
ha.createTable(t1_syncupSource);
|
||||||
|
ha.createTable(t2_syncupSource);
|
||||||
|
ha.close();
|
||||||
|
|
||||||
|
ha = new HBaseAdmin(conf2);
|
||||||
|
ha.createTable(t1_syncupTarget);
|
||||||
|
ha.createTable(t2_syncupTarget);
|
||||||
|
ha.close();
|
||||||
|
|
||||||
|
// Get HTable from Master
|
||||||
|
ht1Source = new HTable(conf1, t1_su);
|
||||||
|
ht1Source.setWriteBufferSize(1024);
|
||||||
|
ht2Source = new HTable(conf1, t2_su);
|
||||||
|
ht1Source.setWriteBufferSize(1024);
|
||||||
|
|
||||||
|
// Get HTable from Peer1
|
||||||
|
ht1TargetAtPeer1 = new HTable(conf2, t1_su);
|
||||||
|
ht1TargetAtPeer1.setWriteBufferSize(1024);
|
||||||
|
ht2TargetAtPeer1 = new HTable(conf2, t2_su);
|
||||||
|
ht2TargetAtPeer1.setWriteBufferSize(1024);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* set M-S : Master: utility1 Slave1: utility2
|
||||||
|
*/
|
||||||
|
admin1.addPeer("1", utility2.getClusterKey());
|
||||||
|
|
||||||
|
admin1.close();
|
||||||
|
admin2.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void putAndReplicateRows() throws Exception {
|
||||||
|
// add rows to Master cluster,
|
||||||
|
Put p;
|
||||||
|
|
||||||
|
// 100 + 1 row to t1_syncup
|
||||||
|
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
|
||||||
|
p = new Put(Bytes.toBytes("row" + i));
|
||||||
|
p.add(famName, qualName, Bytes.toBytes("val" + i));
|
||||||
|
ht1Source.put(p);
|
||||||
|
}
|
||||||
|
p = new Put(Bytes.toBytes("row" + 9999));
|
||||||
|
p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
|
||||||
|
ht1Source.put(p);
|
||||||
|
|
||||||
|
// 200 + 1 row to t2_syncup
|
||||||
|
for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
|
||||||
|
p = new Put(Bytes.toBytes("row" + i));
|
||||||
|
p.add(famName, qualName, Bytes.toBytes("val" + i));
|
||||||
|
ht2Source.put(p);
|
||||||
|
}
|
||||||
|
p = new Put(Bytes.toBytes("row" + 9999));
|
||||||
|
p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
|
||||||
|
ht2Source.put(p);
|
||||||
|
|
||||||
|
// ensure replication completed
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
|
||||||
|
rowCount_ht1Source = utility1.countRows(ht1Source);
|
||||||
|
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
|
||||||
|
assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
|
||||||
|
rowCount_ht1TargetAtPeer1);
|
||||||
|
|
||||||
|
rowCount_ht2Source = utility1.countRows(ht2Source);
|
||||||
|
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
|
||||||
|
assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
|
||||||
|
rowCount_ht2TargetAtPeer1);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mimicSyncUpAfterDelete() throws Exception {
|
||||||
|
utility2.shutdownMiniHBaseCluster();
|
||||||
|
|
||||||
|
List<Delete> list = new ArrayList<Delete>();
|
||||||
|
// delete half of the rows
|
||||||
|
for (int i = 0; i < NB_ROWS_IN_BATCH / 2; i++) {
|
||||||
|
String rowKey = "row" + i;
|
||||||
|
Delete del = new Delete(rowKey.getBytes());
|
||||||
|
list.add(del);
|
||||||
|
}
|
||||||
|
ht1Source.delete(list);
|
||||||
|
|
||||||
|
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
|
||||||
|
String rowKey = "row" + i;
|
||||||
|
Delete del = new Delete(rowKey.getBytes());
|
||||||
|
list.add(del);
|
||||||
|
}
|
||||||
|
ht2Source.delete(list);
|
||||||
|
|
||||||
|
rowCount_ht1Source = utility1.countRows(ht1Source);
|
||||||
|
assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
|
||||||
|
rowCount_ht1Source);
|
||||||
|
|
||||||
|
rowCount_ht2Source = utility1.countRows(ht2Source);
|
||||||
|
assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
|
||||||
|
101, rowCount_ht2Source);
|
||||||
|
|
||||||
|
utility1.shutdownMiniHBaseCluster();
|
||||||
|
utility2.restartHBaseCluster(1);
|
||||||
|
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
|
||||||
|
// before sync up
|
||||||
|
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
|
||||||
|
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
|
||||||
|
|
||||||
|
// After sync up
|
||||||
|
syncUp(utility1);
|
||||||
|
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
|
||||||
|
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
|
||||||
|
rowCount_ht1TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
|
||||||
|
rowCount_ht2TargetAtPeer1);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mimicSyncUpAfterPut() throws Exception {
|
||||||
|
utility1.restartHBaseCluster(1);
|
||||||
|
utility2.shutdownMiniHBaseCluster();
|
||||||
|
|
||||||
|
Put p;
|
||||||
|
// another 100 + 1 row to t1_syncup
|
||||||
|
// we should see 100 + 2 rows now
|
||||||
|
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
|
||||||
|
p = new Put(Bytes.toBytes("row" + i));
|
||||||
|
p.add(famName, qualName, Bytes.toBytes("val" + i));
|
||||||
|
ht1Source.put(p);
|
||||||
|
}
|
||||||
|
p = new Put(Bytes.toBytes("row" + 9998));
|
||||||
|
p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
|
||||||
|
ht1Source.put(p);
|
||||||
|
|
||||||
|
// another 200 + 1 row to t1_syncup
|
||||||
|
// we should see 200 + 2 rows now
|
||||||
|
for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
|
||||||
|
p = new Put(Bytes.toBytes("row" + i));
|
||||||
|
p.add(famName, qualName, Bytes.toBytes("val" + i));
|
||||||
|
ht2Source.put(p);
|
||||||
|
}
|
||||||
|
p = new Put(Bytes.toBytes("row" + 9998));
|
||||||
|
p.add(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
|
||||||
|
ht2Source.put(p);
|
||||||
|
|
||||||
|
rowCount_ht1Source = utility1.countRows(ht1Source);
|
||||||
|
assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
|
||||||
|
rowCount_ht2Source = utility1.countRows(ht2Source);
|
||||||
|
assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
|
||||||
|
|
||||||
|
utility1.shutdownMiniHBaseCluster();
|
||||||
|
utility2.restartHBaseCluster(1);
|
||||||
|
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
|
||||||
|
// before sync up
|
||||||
|
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
|
||||||
|
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
|
||||||
|
rowCount_ht1TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
|
||||||
|
rowCount_ht2TargetAtPeer1);
|
||||||
|
|
||||||
|
// after syun up
|
||||||
|
syncUp(utility1);
|
||||||
|
rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
|
||||||
|
rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
|
||||||
|
rowCount_ht1TargetAtPeer1);
|
||||||
|
assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
|
||||||
|
rowCount_ht2TargetAtPeer1);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncUp(HBaseTestingUtility ut) throws Exception {
|
||||||
|
ReplicationSyncUp.setConfigure(ut.getConfiguration());
|
||||||
|
String[] arguments = new String[] { null };
|
||||||
|
new ReplicationSyncUp().run(arguments);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue