HBASE-2196 Support more than one slave cluster (Lars Hofhansl)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1170950 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-09-15 04:16:53 +00:00
parent cb3db2f74e
commit 692207c368
10 changed files with 401 additions and 60 deletions

View File

@ -507,6 +507,7 @@ Release 0.91.0 - Unreleased
(Li Pi)
HBASE-4296 Deprecate HTable[Interface].getRowOrBefore(...) (Lars Hofhansl)
HBASE-2195 Support cyclic replication (Lars Hofhansl)
HBASE-2196 Support more than one slave cluster (Lars Hofhansl)
NEW FEATURES
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client.replication;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
@ -133,6 +134,14 @@ public class ReplicationAdmin implements Closeable {
return this.replicationZk.listPeersIdsAndWatch().size();
}
/**
* Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys
*/
public Map<String, String> listPeers() {
return this.replicationZk.listPeers();
}
/**
* Get the current status of the kill switch, if the cluster is replicating
* or not.

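A quick orientation note: with multi-slave support, ReplicationAdmin.addPeer() can now be called more than once, and the new listPeers() reports every configured peer. The following is a minimal usage sketch, not code from this commit; the peer ids and ZooKeeper cluster keys are illustrative placeholders.

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

public class ListPeersExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ReplicationAdmin admin = new ReplicationAdmin(conf);
    try {
      // With HBASE-2196, adding a second peer no longer fails.
      admin.addPeer("1", "zk-a:2181:/hbase");   // hypothetical slave cluster 1
      admin.addPeer("2", "zk-b:2181:/hbase");   // hypothetical slave cluster 2

      // New in this commit: peer id -> cluster key.
      Map<String, String> peers = admin.listPeers();
      for (Map.Entry<String, String> e : peers.entrySet()) {
        System.out.println(e.getKey() + " -> " + e.getValue());
      }
    } finally {
      admin.close();
    }
  }
}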
View File

@ -186,6 +186,24 @@ public class ReplicationZookeeper {
return ids;
}
/**
* Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys
*/
public Map<String,String> listPeers() {
Map<String,String> peers = new TreeMap<String,String>();
List<String> ids = null;
try {
ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
for (String id : ids) {
peers.put(id, Bytes.toString(ZKUtil.getData(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id))));
}
} catch (KeeperException e) {
this.abortable.abort("Cannot get the list of peers ", e);
}
return peers;
}
/**
* Returns all region servers from given peer
*
@ -264,10 +282,6 @@ public class ReplicationZookeeper {
}
if (this.peerClusters.containsKey(peerId)) {
return false;
// TODO remove when we support it
} else if (this.peerClusters.size() > 0) {
LOG.warn("Multiple slaves feature not supported");
return false;
}
ReplicationPeer peer = getPeer(peerId);
if (peer == null) {
@ -351,8 +365,6 @@ public class ReplicationZookeeper {
try {
if (peerExists(id)) {
throw new IllegalArgumentException("Cannot add existing peer");
} else if (countPeers() > 0) {
throw new IllegalStateException("Multi-slave isn't supported yet");
}
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper,
@ -367,12 +379,6 @@ public class ReplicationZookeeper {
ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
}
private int countPeers() throws KeeperException {
List<String> peers =
ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
return peers == null ? 0 : peers.size();
}
/**
* This reads the state znode for replication and sets the atomic boolean
*/
@ -408,11 +414,11 @@ public class ReplicationZookeeper {
/**
* Add a new log to the list of hlogs in zookeeper
* @param filename name of the hlog's znode
* @param clusterId name of the cluster's znode
* @param peerId name of the cluster's znode
*/
public void addLogToList(String filename, String clusterId) {
public void addLogToList(String filename, String peerId) {
try {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, clusterId);
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.createWithParents(this.zookeeper, znode);
} catch (KeeperException e) {

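The rename from clusterId to peerId above matters because, with more than one slave, a region server keeps one hlog queue per peer. A sketch of how addLogToList assembles the znode path, with hypothetical znode names (the real values come from configuration and the region server's name, not from this commit):

import org.apache.hadoop.hbase.zookeeper.ZKUtil;

public class QueueZnodeLayoutSketch {
  public static void main(String[] args) {
    // Hypothetical values, for illustration only.
    String rsServerNameZnode = "/hbase/replication/rs/rs1.example.com,60020,1316059200000";
    String peerId = "2";
    String hlogName = "rs1.example.com%2C60020%2C1316059200000.1316059260000";

    // addLogToList joins these pieces: one queue znode per peer under the
    // region server's znode, one child znode per hlog still to be shipped.
    String queueZnode = ZKUtil.joinZNode(rsServerNameZnode, peerId);
    String hlogZnode = ZKUtil.joinZNode(queueZnode, hlogName);
    System.out.println(hlogZnode);
  }
}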
View File

@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
@ -96,7 +95,7 @@ public class ReplicationSource extends Thread
// Should we stop everything?
private Stoppable stopper;
// List of chosen sinks (region servers)
private List<HServerAddress> currentPeers;
private List<ServerName> currentPeers;
// How long should we sleep for each retry
private long sleepForRetries;
// Max size in bytes of entriesArray
@ -173,7 +172,7 @@ public class ReplicationSource extends Thread
this.conn = HConnectionManager.getConnection(conf);
this.zkHelper = manager.getRepZkWrapper();
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
this.currentPeers = new ArrayList<HServerAddress>();
this.currentPeers = new ArrayList<ServerName>();
this.random = new Random();
this.replicating = replicating;
this.manager = manager;
@ -215,19 +214,18 @@ public class ReplicationSource extends Thread
this.currentPeers.clear();
List<ServerName> addresses =
this.zkHelper.getSlavesAddresses(peerId);
Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
Set<ServerName> setOfAddr = new HashSet<ServerName>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
" rs from peer cluster # " + peerId);
for (int i = 0; i < nbPeers; i++) {
HServerAddress address;
ServerName sn;
// Make sure we get one address that we don't already have
do {
ServerName sn = addresses.get(this.random.nextInt(addresses.size()));
address = new HServerAddress(sn.getHostname(), sn.getPort());
} while (setOfAddr.contains(address));
LOG.info("Choosing peer " + address);
setOfAddr.add(address);
sn = addresses.get(this.random.nextInt(addresses.size()));
} while (setOfAddr.contains(sn));
LOG.info("Choosing peer " + sn);
setOfAddr.add(sn);
}
this.currentPeers.addAll(setOfAddr);
}
@ -694,9 +692,9 @@ public class ReplicationSource extends Thread
if (this.currentPeers.size() == 0) {
throw new IOException(this.peerClusterZnode + " has 0 region servers");
}
HServerAddress address =
ServerName address =
currentPeers.get(random.nextInt(this.currentPeers.size()));
return this.conn.getHRegionConnection(address);
return this.conn.getHRegionConnection(address.getHostname(), address.getPort());
}
/**

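The hunk above switches sink selection to ServerName. To make the selection logic easier to read on its own, here is a self-contained sketch under stated assumptions: given the list of slave region servers and the replication.source.ratio setting (0.1 by default per this file), pick ceil(n * ratio) distinct servers at random. The class and method names below are invented for illustration.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.hbase.ServerName;

// Illustrative helper, not part of the patch: choose replication sinks.
public final class SinkSelection {
  private SinkSelection() {
  }

  // Mirrors the loop in the hunk above: draw until we hit a server
  // that has not been chosen yet, then record it.
  public static List<ServerName> chooseSinks(List<ServerName> addresses,
      float ratio, Random random) {
    int nbPeers = (int) Math.ceil(addresses.size() * ratio);
    Set<ServerName> chosen = new HashSet<ServerName>();
    for (int i = 0; i < nbPeers; i++) {
      ServerName sn;
      do {
        sn = addresses.get(random.nextInt(addresses.size()));
      } while (chosen.contains(sn));
      chosen.add(sn);
    }
    return new ArrayList<ServerName>(chosen);
  }
}

For example, with 20 slave region servers and the default ratio of 0.1, each source replicates to ceil(20 * 0.1) = 2 randomly chosen sinks.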
View File

@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
@ -70,7 +72,7 @@ public class ReplicationSourceManager {
// All about stopping
private final Stoppable stopper;
// All logs we are currently tracking
private final SortedSet<String> hlogs;
private final Map<String, SortedSet<String>> hlogsById;
private final Configuration conf;
private final FileSystem fs;
// The path to the latest log we saw, for new coming sources
@ -108,7 +110,7 @@ public class ReplicationSourceManager {
this.replicating = replicating;
this.zkHelper = zkHelper;
this.stopper = stopper;
this.hlogs = new TreeSet<String>();
this.hlogsById = new HashMap<String, SortedSet<String>>();
this.oldsources = new ArrayList<ReplicationSourceInterface>();
this.conf = conf;
this.fs = fs;
@ -149,14 +151,15 @@ public class ReplicationSourceManager {
public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered) {
String key = log.getName();
LOG.info("Going to report log #" + key + " for position " + position + " in " + log);
this.zkHelper.writeReplicationStatus(key.toString(), id, position);
synchronized (this.hlogs) {
if (!queueRecovered && this.hlogs.first() != key) {
SortedSet<String> hlogSet = this.hlogs.headSet(key);
this.zkHelper.writeReplicationStatus(key, id, position);
synchronized (this.hlogsById) {
SortedSet<String> hlogs = this.hlogsById.get(id);
if (!queueRecovered && hlogs.first() != key) {
SortedSet<String> hlogSet = hlogs.headSet(key);
LOG.info("Removing " + hlogSet.size() +
" logs in the list: " + hlogSet);
for (String hlog : hlogSet) {
this.zkHelper.removeLogFromList(hlog.toString(), id);
this.zkHelper.removeLogFromList(hlog, id);
}
hlogSet.clear();
}
@ -200,12 +203,14 @@ public class ReplicationSourceManager {
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
// TODO set it to what's in ZK
src.setSourceEnabled(true);
synchronized (this.hlogs) {
synchronized (this.hlogsById) {
this.sources.add(src);
if (this.hlogs.size() > 0) {
// Add the latest hlog to that source's queue
this.zkHelper.addLogToList(this.hlogs.last(),
this.sources.get(0).getPeerClusterZnode());
this.hlogsById.put(id, new TreeSet<String>());
// Add the latest hlog to that source's queue
if (this.latestPath != null) {
String name = this.latestPath.getName();
this.hlogsById.get(id).add(name);
this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
src.enqueueLog(this.latestPath);
}
}
@ -230,8 +235,8 @@ public class ReplicationSourceManager {
* Get the hlogs tracked by the sources on this rs, keyed by peer cluster id
* @return a map of peer cluster id to sorted set of hlog names
*/
protected SortedSet<String> getHLogs() {
return new TreeSet<String>(this.hlogs);
protected Map<String, SortedSet<String>> getHLogs() {
return Collections.unmodifiableMap(hlogsById);
}
/**
@ -248,21 +253,25 @@ public class ReplicationSourceManager {
return;
}
synchronized (this.hlogs) {
if (this.sources.size() > 0) {
this.zkHelper.addLogToList(newLog.getName(),
this.sources.get(0).getPeerClusterZnode());
} else {
// If there's no slaves, don't need to keep the old hlogs since
// we only consider the last one when a new slave comes in
this.hlogs.clear();
synchronized (this.hlogsById) {
String name = newLog.getName();
for (ReplicationSourceInterface source : this.sources) {
this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
}
for (SortedSet<String> hlogs : this.hlogsById.values()) {
if (this.sources.isEmpty()) {
// If there's no slaves, don't need to keep the old hlogs since
// we only consider the last one when a new slave comes in
hlogs.clear();
}
hlogs.add(name);
}
this.hlogs.add(newLog.getName());
}
this.latestPath = newLog;
// This only update the sources we own, not the recovered ones
// This only updates the sources we own, not the recovered ones
for (ReplicationSourceInterface source : this.sources) {
source.enqueueLog(newLog);
source.enqueueLog(newLog);
}
}
@ -281,7 +290,7 @@ public class ReplicationSourceManager {
* @param manager the manager to use
* @param stopper the stopper object for this region server
* @param replicating the status of the replication on this cluster
* @param peerClusterId the id of the peer cluster
* @param peerId the id of the peer cluster
* @return the created source
* @throws IOException
*/
@ -291,7 +300,7 @@ public class ReplicationSourceManager {
final ReplicationSourceManager manager,
final Stoppable stopper,
final AtomicBoolean replicating,
final String peerClusterId) throws IOException {
final String peerId) throws IOException {
ReplicationSourceInterface src;
try {
@SuppressWarnings("rawtypes")
@ -299,12 +308,12 @@ public class ReplicationSourceManager {
ReplicationSource.class.getCanonicalName()));
src = (ReplicationSourceInterface) c.newInstance();
} catch (Exception e) {
LOG.warn("Passed replication source implemention throws errors, " +
LOG.warn("Passed replication source implementation throws errors, " +
"defaulting to ReplicationSource", e);
src = new ReplicationSource();
}
src.init(conf, fs, manager, stopper, replicating, peerClusterId);
src.init(conf, fs, manager, stopper, replicating, peerId);
return src;
}
@ -410,7 +419,7 @@ public class ReplicationSourceManager {
return;
}
LOG.info(path + " znode expired, trying to lock it");
transferQueues(zkHelper.getZNodeName(path));
transferQueues(ReplicationZookeeper.getZNodeName(path));
}
/**
@ -462,7 +471,7 @@ public class ReplicationSourceManager {
if (peers == null) {
return;
}
String id = zkHelper.getZNodeName(path);
String id = ReplicationZookeeper.getZNodeName(path);
removePeer(id);
}

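To make the hlogsById bookkeeping above easier to follow, here is a self-contained model of the per-peer log tracking it introduces: each peer id maps to a sorted set of hlog names, a newly added peer starts from the latest hlog, every log roll is appended to every peer's set, and reporting a replicated position trims the logs older than the one being reported. Class and method names are invented for illustration and are not the patch's API.

import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Illustrative model of the per-peer hlog bookkeeping, not HBase code.
public class PerPeerLogTracker {
  // Peer cluster id -> sorted set of hlog names still needed by that peer.
  private final Map<String, SortedSet<String>> hlogsById =
      new HashMap<String, SortedSet<String>>();
  private String latestLog;

  // A new peer only needs the latest hlog, if one exists.
  public synchronized void addPeer(String peerId) {
    SortedSet<String> logs = new TreeSet<String>();
    if (latestLog != null) {
      logs.add(latestLog);
    }
    hlogsById.put(peerId, logs);
  }

  // Every tracked peer must also ship the newly rolled hlog.
  public synchronized void logRolled(String hlogName) {
    latestLog = hlogName;
    for (SortedSet<String> logs : hlogsById.values()) {
      logs.add(hlogName);
    }
  }

  // Once a peer reports progress in currentLog, older hlogs can be dropped.
  public synchronized void positionReported(String peerId, String currentLog) {
    SortedSet<String> logs = hlogsById.get(peerId);
    if (logs != null) {
      // headSet is a live view of the entries before currentLog; clearing it
      // removes them from the backing set, like headSet(key).clear() above.
      logs.headSet(currentLog).clear();
    }
  }
}

In the patch itself this state lives in ReplicationSourceManager and is mirrored into ZooKeeper through addLogToList and removeLogFromList, so that another region server can take over the queue (transferQueues) if this one dies.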
View File

@ -43,6 +43,12 @@ module Hbase
@replication_admin.removePeer(id)
end
#----------------------------------------------------------------------------------------------
# List all peer clusters
def list_peers
@replication_admin.listPeers
end
#----------------------------------------------------------------------------------------------
# Restart the replication stream to the specified peer
def enable_peer(id)

View File

@ -276,6 +276,7 @@ Shell.load_command_group(
:commands => %w[
add_peer
remove_peer
list_peers
enable_peer
disable_peer
start_replication

View File

@ -0,0 +1,46 @@
#
# Copyright The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
module Commands
class ListPeers < Command
def help
return <<-EOF
List all replication peer clusters.
hbase> list_peers
EOF
end
def command()
now = Time.now
peers = replication_admin.list_peers
formatter.header(["PEER ID", "CLUSTER KEY"])
peers.entrySet().each do |e|
formatter.row([ e.key, e.value ])
end
formatter.footer(now)
end
end
end
end

View File

@ -0,0 +1,263 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestMultiSlaveReplication {
private static final Log LOG = LogFactory.getLog(TestMultiSlaveReplication.class);
private static Configuration conf1;
private static Configuration conf2;
private static Configuration conf3;
private static String clusterKey2;
private static String clusterKey3;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static HBaseTestingUtility utility3;
private static final long SLEEP_TIME = 500;
private static final int NB_RETRIES = 10;
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] row1 = Bytes.toBytes("row1");
private static final byte[] row2 = Bytes.toBytes("row2");
private static final byte[] row3 = Bytes.toBytes("row3");
private static final byte[] noRepfamName = Bytes.toBytes("norep");
private static HTableDescriptor table;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
conf1.setInt("replication.source.size.capacity", 1024);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
new ZooKeeperWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf3 = new Configuration(conf1);
conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
new ZooKeeperWatcher(conf2, "cluster2", null, true);
utility3 = new HBaseTestingUtility(conf3);
utility3.setZkCluster(miniZK);
new ZooKeeperWatcher(conf3, "cluster3", null, true);
clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2";
clusterKey3 = conf3.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf3.get("hbase.zookeeper.property.clientPort")+":/3";
table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
}
@Test(timeout=300000)
public void testMultiSlaveReplication() throws Exception {
LOG.info("testMultiSlaveReplication");
MiniHBaseCluster master = utility1.startMiniCluster();
utility2.startMiniCluster();
utility3.startMiniCluster();
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
new HBaseAdmin(conf1).createTable(table);
new HBaseAdmin(conf2).createTable(table);
new HBaseAdmin(conf3).createTable(table);
HTable htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024);
HTable htable2 = new HTable(conf2, tableName);
htable2.setWriteBufferSize(1024);
HTable htable3 = new HTable(conf3, tableName);
htable3.setWriteBufferSize(1024);
admin1.addPeer("1", clusterKey2);
// put "row" and wait 'til it got around, then delete
putAndWait(row, famName, htable1, htable2);
deleteAndWait(row, htable1, htable2);
// check it wasn't replicated to cluster 3
checkRow(row,0,htable3);
putAndWait(row2, famName, htable1, htable2);
// now roll the region server's logs
new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0).getServerName().toString());
// after the log was rolled put a new row
putAndWait(row3, famName, htable1, htable2);
admin1.addPeer("2", clusterKey3);
// put a row, check it was replicated to all clusters
putAndWait(row1, famName, htable1, htable2, htable3);
// delete and verify
deleteAndWait(row1, htable1, htable2, htable3);
// make sure row2 did not get replicated after
// cluster 3 was added
checkRow(row2,0,htable3);
// row3 will get replicated, because it was in the
// latest log
checkRow(row3,1,htable3);
Put p = new Put(row);
p.add(famName, row, row);
htable1.put(p);
// now roll the logs again
new HBaseAdmin(conf1).rollHLogWriter(master.getRegionServer(0)
.getServerName().toString());
// cleanup "row2", also conveniently use this to wait replication
// to finish
deleteAndWait(row2, htable1, htable2, htable3);
// Even if the log was rolled in the middle of the replication
// "row" is still replication.
checkRow(row, 1, htable2, htable3);
// cleanup the rest
deleteAndWait(row, htable1, htable2, htable3);
deleteAndWait(row3, htable1, htable2, htable3);
utility3.shutdownMiniCluster();
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
private void checkRow(byte[] row, int count, HTable... tables) throws IOException {
Get get = new Get(row);
for (HTable table : tables) {
Result res = table.get(get);
assertEquals(count, res.size());
}
}
private void deleteAndWait(byte[] row, HTable source, HTable... targets)
throws Exception {
Delete del = new Delete(row);
source.delete(del);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
boolean removedFromAll = true;
for (HTable target : targets) {
Result res = target.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
removedFromAll = false;
break;
}
}
if (removedFromAll) {
break;
} else {
Thread.sleep(SLEEP_TIME);
}
}
}
private void putAndWait(byte[] row, byte[] fam, HTable source, HTable... targets)
throws Exception {
Put put = new Put(row);
put.add(fam, row, row);
source.put(put);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
boolean replicatedToAll = true;
for (HTable target : targets) {
Result res = target.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
replicatedToAll = false;
break;
} else {
assertArrayEquals(res.value(), row);
}
}
if (replicatedToAll) {
break;
} else {
Thread.sleep(SLEEP_TIME);
}
}
}
}

View File

@ -81,6 +81,8 @@ public class TestReplicationSourceManager {
private static final byte[] test = Bytes.toBytes("test");
private static final String slaveId = "1";
private static FileSystem fs;
private static Path oldLogDir;
@ -115,7 +117,7 @@ public class TestReplicationSourceManager {
logDir = new Path(utility.getTestDir(),
HConstants.HREGION_LOGDIR_NAME);
manager.addSource("1");
manager.addSource(slaveId);
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor("f1");
@ -188,7 +190,7 @@ public class TestReplicationSourceManager {
hlog.append(hri, key, edit, htd);
}
assertEquals(6, manager.getHLogs().size());
assertEquals(6, manager.getHLogs().get(slaveId).size());
hlog.rollWriter();