HBASE-3351 ReplicationZookeeper goes to ZK every time a znode is modified
HBASE-3326 Replication state's znode should be created else it defaults to false git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1048932 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1b7b7a935a
commit
6c2dd8e749
|
@ -768,6 +768,9 @@ Release 0.90.0 - Unreleased
|
|||
HBASE-3321 Replication.join shouldn't clear the logs znode
|
||||
HBASE-3352 enabling a non-existent table from shell prints no error
|
||||
HBASE-3353 table.jsp doesn't handle entries in META without server info
|
||||
HBASE-3351 ReplicationZookeeper goes to ZK every time a znode is modified
|
||||
HBASE-3326 Replication state's znode should be created else it
|
||||
defaults to false
|
||||
|
||||
|
||||
IMPROVEMENTS
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
|||
import org.apache.commons.lang.NotImplementedException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -65,6 +66,7 @@ import org.apache.zookeeper.KeeperException;
|
|||
public class ReplicationAdmin {
|
||||
|
||||
private final ReplicationZookeeper replicationZk;
|
||||
private final HConnection connection;
|
||||
|
||||
/**
|
||||
* Constructor that creates a connection to the local ZooKeeper ensemble.
|
||||
|
@ -77,10 +79,10 @@ public class ReplicationAdmin {
|
|||
throw new RuntimeException("hbase.replication isn't true, please " +
|
||||
"enable it in order to use replication");
|
||||
}
|
||||
ZooKeeperWatcher zkw = HConnectionManager.getConnection(conf).
|
||||
getZooKeeperWatcher();
|
||||
this.connection = HConnectionManager.getConnection(conf);
|
||||
ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
|
||||
try {
|
||||
this.replicationZk = new ReplicationZookeeper(conf, zkw);
|
||||
this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Unable setup the ZooKeeper connection", e);
|
||||
}
|
||||
|
@ -150,8 +152,13 @@ public class ReplicationAdmin {
|
|||
* @return the previous state
|
||||
*/
|
||||
public boolean setReplicating(boolean newState) throws IOException {
|
||||
boolean prev = getReplicating();
|
||||
this.replicationZk.setReplicating(newState);
|
||||
boolean prev = true;
|
||||
try {
|
||||
prev = getReplicating();
|
||||
this.replicationZk.setReplicating(newState);
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Unable to set the replication state", e);
|
||||
}
|
||||
return prev;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,12 +27,14 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
|
||||
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
|
||||
|
@ -106,9 +108,9 @@ public class VerifyReplication {
|
|||
endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
|
||||
}
|
||||
try {
|
||||
ReplicationZookeeper zk = new ReplicationZookeeper(conf,
|
||||
HConnectionManager.getConnection(conf).
|
||||
getZooKeeperWatcher());
|
||||
HConnection conn = HConnectionManager.getConnection(conf);
|
||||
ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
|
||||
conn.getZooKeeperWatcher());
|
||||
ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
|
||||
HTable replicatedTable = new HTable(peer.getConfiguration(),
|
||||
conf.get(NAME+".tableName"));
|
||||
|
@ -150,8 +152,9 @@ public class VerifyReplication {
|
|||
throw new IOException("Replication needs to be enabled to verify it.");
|
||||
}
|
||||
try {
|
||||
ReplicationZookeeper zk = new ReplicationZookeeper(conf,
|
||||
HConnectionManager.getConnection(conf).getZooKeeperWatcher());
|
||||
HConnection conn = HConnectionManager.getConnection(conf);
|
||||
ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
|
||||
conn.getZooKeeperWatcher());
|
||||
// Just verifying it we can connect
|
||||
ReplicationPeer peer = zk.getPeer(peerId);
|
||||
if (peer == null) {
|
||||
|
|
|
@ -130,6 +130,11 @@ public class LogCleaner extends Chore {
|
|||
Path filePath = file.getPath();
|
||||
if (HLog.validateHLogFilename(filePath.getName())) {
|
||||
for (LogCleanerDelegate logCleaner : logCleanersChain) {
|
||||
if (logCleaner.isStopped()) {
|
||||
LOG.warn("A log cleaner is stopped, won't delete any log.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!logCleaner.isLogDeletable(filePath) ) {
|
||||
// this log is not deletable, continue to process next log file
|
||||
continue FILE;
|
||||
|
|
|
@ -100,6 +100,7 @@ public class ReplicationZookeeper {
|
|||
private String ourClusterKey;
|
||||
// Abortable
|
||||
private Abortable abortable;
|
||||
private ReplicationStatusTracker statusTracker;
|
||||
|
||||
/**
|
||||
* Constructor used by clients of replication (like master and HBase clients)
|
||||
|
@ -107,12 +108,14 @@ public class ReplicationZookeeper {
|
|||
* @param zk zk connection to use
|
||||
* @throws IOException
|
||||
*/
|
||||
public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
|
||||
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
|
||||
final ZooKeeperWatcher zk)
|
||||
throws KeeperException {
|
||||
|
||||
this.conf = conf;
|
||||
this.zookeeper = zk;
|
||||
setZNodes();
|
||||
this.replicating = new AtomicBoolean();
|
||||
setZNodes(abortable);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -128,23 +131,18 @@ public class ReplicationZookeeper {
|
|||
this.abortable = server;
|
||||
this.zookeeper = server.getZooKeeper();
|
||||
this.conf = server.getConfiguration();
|
||||
setZNodes();
|
||||
this.replicating = replicating;
|
||||
setZNodes(server);
|
||||
|
||||
this.peerClusters = new HashMap<String, ReplicationPeer>();
|
||||
this.replicating = replicating;
|
||||
ZKUtil.createWithParents(this.zookeeper,
|
||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
|
||||
readReplicationStateZnode();
|
||||
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
|
||||
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
|
||||
// Set a tracker on replicationStateNodeNode
|
||||
ReplicationStatusTracker tracker =
|
||||
new ReplicationStatusTracker(this.zookeeper, server);
|
||||
tracker.start();
|
||||
connectExistingPeers();
|
||||
}
|
||||
|
||||
private void setZNodes() throws KeeperException {
|
||||
private void setZNodes(Abortable abortable) throws KeeperException {
|
||||
String replicationZNodeName =
|
||||
conf.get("zookeeper.znode.replication", "replication");
|
||||
String peersZNodeName =
|
||||
|
@ -170,6 +168,11 @@ public class ReplicationZookeeper {
|
|||
String idResult = Bytes.toString(data);
|
||||
this.clusterId = idResult == null?
|
||||
Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
|
||||
// Set a tracker on replicationStateNodeNode
|
||||
this.statusTracker =
|
||||
new ReplicationStatusTracker(this.zookeeper, abortable);
|
||||
statusTracker.start();
|
||||
readReplicationStateZnode();
|
||||
}
|
||||
|
||||
private void connectExistingPeers() throws IOException, KeeperException {
|
||||
|
@ -292,16 +295,12 @@ public class ReplicationZookeeper {
|
|||
* Set the new replication state for this cluster
|
||||
* @param newState
|
||||
*/
|
||||
public void setReplicating(boolean newState) throws IOException {
|
||||
try {
|
||||
ZKUtil.createWithParents(this.zookeeper,
|
||||
public void setReplicating(boolean newState) throws KeeperException {
|
||||
ZKUtil.createWithParents(this.zookeeper,
|
||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
|
||||
ZKUtil.setData(this.zookeeper,
|
||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
|
||||
Bytes.toBytes(Boolean.toString(newState)));
|
||||
} catch (KeeperException e) {
|
||||
throw new IOException("Unable to set the replication state", e);
|
||||
}
|
||||
ZKUtil.setData(this.zookeeper,
|
||||
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
|
||||
Bytes.toBytes(Boolean.toString(newState)));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -368,8 +367,18 @@ public class ReplicationZookeeper {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the replication status of this cluster. If the state znode doesn't
|
||||
* exist it will also create it and set it true.
|
||||
* @return returns true when it's enabled, else false
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public boolean getReplication() throws KeeperException {
|
||||
byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
|
||||
byte [] data = this.statusTracker.getData();
|
||||
if (data == null || data.length == 0) {
|
||||
setReplicating(true);
|
||||
return true;
|
||||
}
|
||||
return Boolean.parseBoolean(Bytes.toString(data));
|
||||
}
|
||||
|
||||
|
@ -681,8 +690,10 @@ public class ReplicationZookeeper {
|
|||
|
||||
@Override
|
||||
public synchronized void nodeDataChanged(String path) {
|
||||
super.nodeDataChanged(path);
|
||||
readReplicationStateZnode();
|
||||
if (path.equals(node)) {
|
||||
super.nodeDataChanged(path);
|
||||
readReplicationStateZnode();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.master.LogCleanerDelegate;
|
||||
|
@ -39,7 +40,7 @@ import java.util.Set;
|
|||
* Implementation of a log cleaner that checks if a log is still scheduled for
|
||||
* replication before deleting it when its TTL is over.
|
||||
*/
|
||||
public class ReplicationLogCleaner implements LogCleanerDelegate {
|
||||
public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
|
||||
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
|
||||
private Configuration conf;
|
||||
private ReplicationZookeeper zkHelper;
|
||||
|
@ -53,6 +54,16 @@ public class ReplicationLogCleaner implements LogCleanerDelegate {
|
|||
|
||||
@Override
|
||||
public boolean isLogDeletable(Path filePath) {
|
||||
|
||||
try {
|
||||
if (!zkHelper.getReplication()) {
|
||||
return false;
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
abort("Cannot get the state of replication", e);
|
||||
return false;
|
||||
}
|
||||
|
||||
// all members of this class are null if replication is disabled, and we
|
||||
// return true since false would render the LogsCleaner useless
|
||||
if (this.conf == null) {
|
||||
|
@ -121,7 +132,7 @@ public class ReplicationLogCleaner implements LogCleanerDelegate {
|
|||
try {
|
||||
ZooKeeperWatcher zkw =
|
||||
new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null);
|
||||
this.zkHelper = new ReplicationZookeeper(this.conf, zkw);
|
||||
this.zkHelper = new ReplicationZookeeper(this, this.conf, zkw);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Error while configuring " + this.getClass().getName(), e);
|
||||
} catch (IOException e) {
|
||||
|
@ -150,4 +161,10 @@ public class ReplicationLogCleaner implements LogCleanerDelegate {
|
|||
public boolean isStopped() {
|
||||
return this.stopped;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
|
||||
stop(why);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue