HBASE-20083 Fix findbugs error for ReplicationSyncUp

This commit is contained in:
zhangduo 2018-02-26 16:37:58 +08:00
parent 309f3360bf
commit ef02762dd8
2 changed files with 26 additions and 57 deletions

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -37,51 +36,31 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* In a scenario of Replication based Disaster/Recovery, when hbase * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this
* Master-Cluster crashes, this tool is used to sync-up the delta from Master to * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
* Slave using the info from ZooKeeper. The tool will run on Master-Cluser, and * will run on Master-Cluser, and assume ZK, Filesystem and NetWork still available after hbase
* assume ZK, Filesystem and NetWork still available after hbase crashes * crashes
* *
* <pre>
* hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
* </pre>
*/ */
public class ReplicationSyncUp extends Configured implements Tool { public class ReplicationSyncUp extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSyncUp.class.getName());
private static Configuration conf;
private static final long SLEEP_TIME = 10000; private static final long SLEEP_TIME = 10000;
// although the tool is designed to be run on command line
// this api is provided for executing the tool through another app
public static void setConfigure(Configuration config) {
conf = config;
}
/** /**
* Main program * Main program
* @param args
* @throws Exception
*/ */
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (conf == null) conf = HBaseConfiguration.create(); int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args);
int ret = ToolRunner.run(conf, new ReplicationSyncUp(), args);
System.exit(ret); System.exit(ret);
} }
@Override @Override
public int run(String[] args) throws Exception { public int run(String[] args) throws Exception {
Replication replication;
ReplicationSourceManager manager;
FileSystem fs;
Path oldLogDir, logDir, walRootDir;
ZKWatcher zkw;
Abortable abortable = new Abortable() { Abortable abortable = new Abortable() {
@Override @Override
public void abort(String why, Throwable e) { public void abort(String why, Throwable e) {
@ -92,23 +71,19 @@ public class ReplicationSyncUp extends Configured implements Tool {
return false; return false;
} }
}; };
Configuration conf = getConf();
try (ZKWatcher zkw =
new ZKWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable, true)) {
Path walRootDir = FSUtils.getWALRootDir(conf);
FileSystem fs = FSUtils.getWALFileSystem(conf);
Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
zkw = System.out.println("Start Replication Server start");
new ZKWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable, Replication replication = new Replication();
true); replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
ReplicationSourceManager manager = replication.getReplicationManager();
walRootDir = FSUtils.getWALRootDir(conf); manager.init();
fs = FSUtils.getWALFileSystem(conf);
oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
System.out.println("Start Replication Server start");
replication = new Replication();
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
manager = replication.getReplicationManager();
manager.init();
try {
int numberOfOldSource = 1; // default wait once int numberOfOldSource = 1; // default wait once
while (numberOfOldSource > 0) { while (numberOfOldSource > 0) {
Thread.sleep(SLEEP_TIME); Thread.sleep(SLEEP_TIME);
@ -117,15 +92,12 @@ public class ReplicationSyncUp extends Configured implements Tool {
manager.join(); manager.join();
} catch (InterruptedException e) { } catch (InterruptedException e) {
System.err.println("didn't wait long enough:" + e); System.err.println("didn't wait long enough:" + e);
return (-1); return -1;
} finally {
zkw.close();
} }
return 0;
return (0);
} }
static class DummyServer implements Server { class DummyServer implements Server {
String hostname; String hostname;
ZKWatcher zkw; ZKWatcher zkw;
@ -141,7 +113,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
@Override @Override
public Configuration getConfiguration() { public Configuration getConfiguration() {
return conf; return getConf();
} }
@Override @Override
@ -194,7 +166,6 @@ public class ReplicationSyncUp extends Configured implements Tool {
@Override @Override
public ClusterConnection getClusterConnection() { public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null; return null;
} }

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Before; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
@ -422,9 +423,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
} }
protected void syncUp(HBaseTestingUtility ut) throws Exception { protected void syncUp(HBaseTestingUtility ut) throws Exception {
ReplicationSyncUp.setConfigure(ut.getConfiguration()); ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]);
String[] arguments = new String[] { null };
new ReplicationSyncUp().run(arguments);
} }
} }