HBASE-2195 Support cyclic replication (Lars Hofhansl)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1166923 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-09-08 22:03:10 +00:00
parent 18e9b21862
commit 221f017b43
19 changed files with 616 additions and 144 deletions

View File

@ -486,6 +486,7 @@ Release 0.91.0 - Unreleased
HBASE-4289 Move spinlock to SingleSizeCache rather than the slab allocator
(Li Pi)
HBASE-4296 Deprecate HTable[Interface].getRowOrBefore(...) (Lars Hofhansl)
HBASE-2195 Support cyclic replication (Lars Hofhansl)
NEW FEATURES
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;
/**
@ -203,6 +204,12 @@ public final class HConstants {
/** Configuration key storing the cluster ID */
public static final String CLUSTER_ID = "hbase.cluster.id";
/**
* Attribute used in Puts and Gets to indicate the originating
* cluster.
*/
public static final String CLUSTER_ID_ATTR = "_c.id_";
// Always store the location of the root table's HRegion.
// This HRegion is never split.
@ -364,7 +371,7 @@ public final class HConstants {
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
*/
public static final byte DEFAULT_CLUSTER_ID = 0;
public static final UUID DEFAULT_CLUSTER_ID = new UUID(0L,0L);
/**
* Parameter name for maximum number of bytes returned when calling a

View File

@ -37,6 +37,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
/**
* Used to perform Delete operations on a single row.
@ -513,4 +514,26 @@ public class Delete extends Operation
public void setWriteToWAL(boolean write) {
this.writeToWAL = write;
}
/**
* Set the replication cluster id.
* @param clusterId
*/
public void setClusterId(UUID clusterId) {
byte[] val = new byte[2*Bytes.SIZEOF_LONG];
Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
setAttribute(HConstants.CLUSTER_ID_ATTR, val);
}
/**
* @return The replication cluster id.
*/
public UUID getClusterId() {
byte[] attr = getAttribute(HConstants.CLUSTER_ID_ATTR);
if (attr == null) {
return HConstants.DEFAULT_CLUSTER_ID;
}
return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
}
}
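
A note on the encoding in setClusterId/getClusterId above: the cluster id travels as the raw 16 bytes of the UUID, most-significant long first. A minimal round-trip sketch (hedged; it assumes only the Bytes helpers already used in this patch):

// Round trip of a UUID through the _c.id_ attribute encoding.
UUID id = UUID.randomUUID();
byte[] val = new byte[2 * Bytes.SIZEOF_LONG]; // 16 bytes
Bytes.putLong(val, 0, id.getMostSignificantBits());
Bytes.putLong(val, Bytes.SIZEOF_LONG, id.getLeastSignificantBits());
UUID back = new UUID(Bytes.toLong(val, 0), Bytes.toLong(val, Bytes.SIZEOF_LONG));
assert id.equals(back); // getClusterId() falls back to DEFAULT_CLUSTER_ID when the attribute is absent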

View File

@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
/**
@ -656,4 +657,26 @@ public class Put extends Operation
byte [][] parts = KeyValue.parseColumn(column);
return add(parts[0], parts[1], ts, value);
}
/**
* Set the replication cluster id.
* @param clusterId
*/
public void setClusterId(UUID clusterId) {
byte[] val = new byte[2*Bytes.SIZEOF_LONG];
Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
setAttribute(HConstants.CLUSTER_ID_ATTR, val);
}
/**
* @return The replication cluster id.
*/
public UUID getClusterId() {
byte[] attr = getAttribute(HConstants.CLUSTER_ID_ATTR);
if (attr == null) {
return HConstants.DEFAULT_CLUSTER_ID;
}
return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
}
}

View File

@ -67,7 +67,6 @@ import org.apache.zookeeper.KeeperException;
public class ReplicationAdmin implements Closeable {
private final ReplicationZookeeper replicationZk;
private final Configuration configuration;
private final HConnection connection;
/**
@ -81,7 +80,6 @@ public class ReplicationAdmin implements Closeable {
throw new RuntimeException("hbase.replication isn't true, please " +
"enable it in order to use replication");
}
this.configuration = conf;
this.connection = HConnectionManager.getConnection(conf);
ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
try {

View File

@ -39,6 +39,7 @@ import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
@ -1396,7 +1397,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// All edits for the given row (across all column families) must happen atomically.
prepareDelete(delete);
delete(delete.getFamilyMap(), writeToWAL);
delete(delete.getFamilyMap(), delete.getClusterId(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@ -1405,14 +1406,13 @@ public class HRegion implements HeapSize { // , Writable{
}
}
/**
* @param familyMap map of family to edits for the given family.
* @param clusterId The originating cluster id (for replication).
* @param writeToWAL
* @throws IOException
*/
public void delete(Map<byte[], List<KeyValue>> familyMap, boolean writeToWAL)
throws IOException {
public void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
boolean writeToWAL) throws IOException {
/* Run coprocessor pre hook outside of locks to avoid deadlock */
if (coprocessorHost != null) {
if (coprocessorHost.preDelete(familyMap, writeToWAL)) {
@ -1482,7 +1482,7 @@ public class HRegion implements HeapSize { // , Writable{
WALEdit walEdit = new WALEdit();
addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, now, this.htableDescriptor);
walEdit, clusterId, now, this.htableDescriptor);
}
// Now make changes to the memstore.
@ -1557,7 +1557,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// All edits for the given row (across all column families) must happen atomically.
// Coprocessor interception happens in put(Map,boolean)
put(put.getFamilyMap(), writeToWAL);
put(put.getFamilyMap(), put.getClusterId(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
}
@ -1752,8 +1752,9 @@ public class HRegion implements HeapSize { // , Writable{
}
// Append the edit to WAL
Put first = batchOp.operations[firstIndex].getFirst();
this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, now, this.htableDescriptor);
walEdit, first.getClusterId(), now, this.htableDescriptor);
// ------------------------------------
// STEP 4. Write back to memstore
@ -1883,13 +1884,18 @@ public class HRegion implements HeapSize { // , Writable{
}
//If matches put the new put or delete the new delete
if (matches) {
// All edits for the given row (across all column families) must happen atomically.
// All edits for the given row (across all column families) must
// happen atomically.
//
// Using default cluster id, as this can only happen in the
// originating cluster. A slave cluster receives the result as a Put
// or Delete
if (isPut) {
put(((Put)w).getFamilyMap(), writeToWAL);
put(((Put)w).getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
} else {
Delete d = (Delete)w;
prepareDelete(d);
delete(d.getFamilyMap(), writeToWAL);
delete(d.getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
}
return true;
}
@ -1980,7 +1986,7 @@ public class HRegion implements HeapSize { // , Writable{
familyMap = new HashMap<byte[], List<KeyValue>>();
familyMap.put(family, edits);
this.put(familyMap, true);
this.put(familyMap, HConstants.DEFAULT_CLUSTER_ID, true);
}
/**
@ -1990,8 +1996,8 @@ public class HRegion implements HeapSize { // , Writable{
* @param clusterId The originating cluster id (for replication).
* @param writeToWAL if true, then we should write to the log
* @throws IOException
*/
private void put(Map<byte [], List<KeyValue>> familyMap, boolean writeToWAL)
throws IOException {
private void put(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
boolean writeToWAL) throws IOException {
/* run pre put hook outside of lock to avoid deadlock */
if (coprocessorHost != null) {
if (coprocessorHost.prePut(familyMap, writeToWAL)) {
@ -2016,7 +2022,7 @@ public class HRegion implements HeapSize { // , Writable{
WALEdit walEdit = new WALEdit();
addFamilyMapToWALEdit(familyMap, walEdit);
this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, now, this.htableDescriptor);
walEdit, clusterId, now, this.htableDescriptor);
}
long addedSize = applyFamilyMapToMemstore(familyMap);
@ -3546,8 +3552,12 @@ public class HRegion implements HeapSize { // , Writable{
// Actually write to WAL now
if (writeToWAL) {
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdits, now, this.htableDescriptor);
walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
this.htableDescriptor);
}
size = this.addAndGetGlobalMemstoreSize(size);
@ -3623,8 +3633,12 @@ public class HRegion implements HeapSize { // , Writable{
long now = EnvironmentEdgeManager.currentTimeMillis();
WALEdit walEdit = new WALEdit();
walEdit.add(newKv);
// Using default cluster id, as this can only happen in the
// originating cluster. A slave cluster receives the final value (not
// the delta) as a Put.
this.log.append(regionInfo, this.htableDescriptor.getName(),
walEdit, now, this.htableDescriptor);
walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
this.htableDescriptor);
}
// Now request the ICV to the store, this will set the timestamp

View File

@ -37,6 +37,7 @@ import java.util.NavigableSet;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
@ -900,14 +901,15 @@ public class HLog implements Syncable {
* @param regionName
* @param tableName
* @param seqnum
* @param now
* @param clusterId
* @return New log key.
*/
protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, long now) {
return new HLogKey(regionName, tableName, seqnum, now);
protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum,
long now, UUID clusterId) {
return new HLogKey(regionName, tableName, seqnum, now, clusterId);
}
/** Append an entry to the log.
*
* @param regionInfo
@ -944,6 +946,22 @@ public class HLog implements Syncable {
}
}
/**
* Only used in tests.
*
* @param info
* @param tableName
* @param edits
* @param now
* @param htd
* @throws IOException
*/
public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
final long now, HTableDescriptor htd)
throws IOException {
append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd);
}
/**
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id.
@ -964,10 +982,11 @@ public class HLog implements Syncable {
* @param info
* @param tableName
* @param edits
* @param clusterId The originating clusterId for this edit (for replication)
* @param now
* @throws IOException
*/
public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
public void append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
final long now, HTableDescriptor htd)
throws IOException {
if (edits.isEmpty()) return;
@ -985,7 +1004,7 @@ public class HLog implements Syncable {
// actual name.
byte [] hriKey = info.getEncodedNameAsBytes();
this.lastSeqWritten.putIfAbsent(hriKey, seqNum);
HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
HLogKey logKey = makeKey(hriKey, tableName, seqNum, now, clusterId);
doWrite(info, logKey, edits, htd);
this.numEntries.incrementAndGet();
}
@ -1295,7 +1314,7 @@ public class HLog implements Syncable {
long now = System.currentTimeMillis();
WALEdit edit = completeCacheFlushLogEdit();
HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
System.currentTimeMillis());
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
this.writer.append(new Entry(key, edit));
writeTime += System.currentTimeMillis() - now;
writeOps++;
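
To see how the new clusterId parameter threads through the write path, here is a hedged trace of a single edit (internal signatures as they appear in this patch, not a public API):

// A client Put carries no _c.id_ attribute, so the region appends it with
// the default id, meaning "no origin assigned yet":
Put p = new Put(row); // p.getClusterId() == HConstants.DEFAULT_CLUSTER_ID
// HRegion: put(p.getFamilyMap(), p.getClusterId(), writeToWAL)
//   -> log.append(info, tableName, walEdit, HConstants.DEFAULT_CLUSTER_ID, now, htd)
// ReplicationSource then stamps the WAL key with the local cluster's UUID before
// shipping, and ReplicationSink copies that UUID onto the re-issued Put, so the
// slave's own WAL key records the true origin.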

View File

@ -25,10 +25,12 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
/**
* A Key for an entry in the change log.
@ -41,6 +43,9 @@ import org.apache.hadoop.io.WritableComparable;
* associated row.
*/
public class HLogKey implements WritableComparable<HLogKey> {
// should be < 0 (@see #readFields(DataInput))
private static final int VERSION = -1;
// The encoded region name.
private byte [] encodedRegionName;
private byte [] tablename;
@ -48,11 +53,12 @@ public class HLogKey implements WritableComparable<HLogKey> {
// Time at which this edit was written.
private long writeTime;
private byte clusterId;
private UUID clusterId;
/** Writable Constructor -- Do not use. */
public HLogKey() {
this(null, null, 0L, HConstants.LATEST_TIMESTAMP);
this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
HConstants.DEFAULT_CLUSTER_ID);
}
/**
@ -65,14 +71,15 @@ public class HLogKey implements WritableComparable<HLogKey> {
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterId UUID of the cluster (used in replication)
*/
public HLogKey(final byte [] encodedRegionName, final byte [] tablename,
long logSeqNum, final long now) {
long logSeqNum, final long now, UUID clusterId) {
this.encodedRegionName = encodedRegionName;
this.tablename = tablename;
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
this.clusterId = clusterId;
}
/** @return encoded region name */
@ -105,7 +112,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
* Get the id of the original cluster
* @return Cluster id.
*/
public byte getClusterId() {
public UUID getClusterId() {
return clusterId;
}
@ -113,7 +120,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
* Set the cluster id of this key
* @param clusterId
*/
public void setClusterId(byte clusterId) {
public void setClusterId(UUID clusterId) {
this.clusterId = clusterId;
}
@ -154,7 +161,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
int result = Bytes.hashCode(this.encodedRegionName);
result ^= this.logSeqNum;
result ^= this.writeTime;
result ^= this.clusterId;
result ^= this.clusterId.hashCode();
return result;
}
@ -174,6 +181,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
}
}
// why isn't cluster id accounted for?
return result;
}
@ -203,23 +211,57 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.encodedRegionName = encodedRegionName;
}
@Override
public void write(DataOutput out) throws IOException {
WritableUtils.writeVInt(out, VERSION);
Bytes.writeByteArray(out, this.encodedRegionName);
Bytes.writeByteArray(out, this.tablename);
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
out.writeByte(this.clusterId);
// avoid storing 16 bytes when replication is not enabled
if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeLong(this.clusterId.getMostSignificantBits());
out.writeLong(this.clusterId.getLeastSignificantBits());
}
}
@Override
public void readFields(DataInput in) throws IOException {
this.encodedRegionName = Bytes.readByteArray(in);
int version = 0;
// HLogKey was not versioned in the beginning.
// In order to introduce it now, we make use of the fact
// that encodedRegionName was written with Bytes.writeByteArray,
// which encodes the array length as a vint which is >= 0.
// Hence if the vint is >= 0 we have an old version and the vint
// encodes the length of encodedRegionName.
// If < 0 we just read the version and the next vint is the length.
// @see Bytes#readByteArray(DataInput)
int len = WritableUtils.readVInt(in);
if (len < 0) {
// what we just read was the version
version = len;
len = WritableUtils.readVInt(in);
}
this.encodedRegionName = new byte[len];
in.readFully(this.encodedRegionName);
this.tablename = Bytes.readByteArray(in);
this.logSeqNum = in.readLong();
this.writeTime = in.readLong();
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
if (version < 0) {
if (in.readBoolean()) {
this.clusterId = new UUID(in.readLong(), in.readLong());
}
} else {
try {
this.clusterId = in.readByte();
// dummy read (former byte cluster id)
in.readByte();
} catch(EOFException e) {
// Means it's an old key, just continue
// Means it's a very old key, just continue
}
}
}
}
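
The trick in readFields() above is worth isolating: Bytes.writeByteArray() always starts with a non-negative vint length, so a negative leading vint can only be a version marker. A minimal sketch of the detection (it mirrors the patch; not a standalone API):

int first = WritableUtils.readVInt(in);
int version = 0; // pre-versioned key
int len = first; // old format: the vint is the region name length
if (first < 0) { // a length can never be negative, so this is the version
  version = first; // here: -1
  len = WritableUtils.readVInt(in); // the real length follows
}
byte[] encodedRegionName = new byte[len];
in.readFully(encodedRegionName);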

View File

@ -93,8 +93,6 @@ public class ReplicationZookeeper {
private final Configuration conf;
// Is this cluster replicating at the moment?
private AtomicBoolean replicating;
// Byte (stored as string here) that identifies this cluster
private String clusterId;
// The key to our own cluster
private String ourClusterKey;
// Abortable
@ -146,12 +144,8 @@ public class ReplicationZookeeper {
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
conf.get("zookeeper.znode.replication.peers", "peers");
String repMasterZNodeName =
conf.get("zookeeper.znode.replication.master", "master");
this.replicationStateNodeName =
conf.get("zookeeper.znode.replication.state", "state");
String clusterIdZNodeName =
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
String rsZNodeName =
conf.get("zookeeper.znode.replication.rs", "rs");
this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
@ -162,11 +156,6 @@ public class ReplicationZookeeper {
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String idResult = Bytes.toString(data);
this.clusterId = idResult == null?
Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
// Set a tracker on replicationStateNodeNode
this.statusTracker =
new ReplicationStatusTracker(this.zookeeper, abortable);
@ -701,15 +690,6 @@ public class ReplicationZookeeper {
this.zookeeper.registerListener(listener);
}
/**
* Get the identification of the cluster
*
* @return the id for the cluster
*/
public String getClusterId() {
return this.clusterId;
}
/**
* Get a map of all peer clusters
* @return map of peer cluster keyed by id

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete;
@ -106,6 +107,7 @@ public class ReplicationSink {
if (kvs.get(0).isDelete()) {
Delete delete = new Delete(kvs.get(0).getRow(),
kvs.get(0).getTimestamp(), null);
delete.setClusterId(entry.getKey().getClusterId());
for (KeyValue kv : kvs) {
if (kv.isDeleteFamily()) {
delete.deleteFamily(kv.getFamily());
@ -126,10 +128,12 @@ public class ReplicationSink {
byte[] lastKey = kvs.get(0).getRow();
Put put = new Put(kvs.get(0).getRow(),
kvs.get(0).getTimestamp());
put.setClusterId(entry.getKey().getClusterId());
for (KeyValue kv : kvs) {
if (!Bytes.equals(lastKey, kv.getRow())) {
tableList.add(put);
put = new Put(kv.getRow(), kv.getTimestamp());
put.setClusterId(entry.getKey().getClusterId());
}
put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
lastKey = kv.getRow();

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ClusterId;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@ -88,7 +90,7 @@ public class ReplicationSource extends Thread
// should we replicate or not?
private AtomicBoolean replicating;
// id of the peer cluster this source replicates to
private String peerClusterId;
private String peerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
@ -109,7 +111,9 @@ public class ReplicationSource extends Thread
private volatile Path currentPath;
private FileSystem fs;
// id of this cluster
private byte clusterId;
private UUID clusterId;
// id of the other cluster
private UUID peerClusterId;
// total number of edits we replicated
private long totalReplicatedEdits = 0;
// The znode we currently play with
@ -176,9 +180,15 @@ public class ReplicationSource extends Thread
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.fs = fs;
this.clusterId = Byte.valueOf(zkHelper.getClusterId());
this.metrics = new ReplicationSourceMetrics(peerClusterZnode);
try {
this.clusterId = UUID.fromString(ClusterId.readClusterIdZNode(zkHelper
.getZookeeperWatcher()));
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
// Finally look if this is a recovered queue
this.checkIfQueueRecovered(peerClusterZnode);
}
@ -188,7 +198,7 @@ public class ReplicationSource extends Thread
private void checkIfQueueRecovered(String peerClusterZnode) {
String[] parts = peerClusterZnode.split("-");
this.queueRecovered = parts.length != 1;
this.peerClusterId = this.queueRecovered ?
this.peerId = this.queueRecovered ?
parts[0] : peerClusterZnode;
this.peerClusterZnode = peerClusterZnode;
this.deadRegionServers = new String[parts.length-1];
@ -204,11 +214,11 @@ public class ReplicationSource extends Thread
private void chooseSinks() throws KeeperException {
this.currentPeers.clear();
List<ServerName> addresses =
this.zkHelper.getSlavesAddresses(peerClusterId);
this.zkHelper.getSlavesAddresses(peerId);
Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
" rs from peer cluster # " + peerClusterId);
" rs from peer cluster # " + peerId);
for (int i = 0; i < nbPeers; i++) {
HServerAddress address;
// Make sure we get one address that we don't already have
@ -235,6 +245,15 @@ public class ReplicationSource extends Thread
if (this.stopper.isStopped()) {
return;
}
// delay this until we are in an asynchronous thread
try {
this.peerClusterId = UUID.fromString(ClusterId
.readClusterIdZNode(zkHelper.getPeerClusters().get(peerId).getZkw()));
} catch (KeeperException ke) {
this.terminate("Could not read peer's cluster id", ke);
}
LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
// If this is recovered, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
if (this.queueRecovered) {
@ -350,7 +369,7 @@ public class ReplicationSource extends Thread
LOG.debug("Attempt to close connection failed", e);
}
}
LOG.debug("Source exiting " + peerClusterId);
LOG.debug("Source exiting " + peerId);
}
/**
@ -371,19 +390,28 @@ public class ReplicationSource extends Thread
this.metrics.logEditsReadRate.inc(1);
seenEntries++;
// Remove all KVs that should not be replicated
removeNonReplicableEdits(edit);
HLogKey logKey = entry.getKey();
// don't replicate if the log entries originated in the peer
if (!logKey.getClusterId().equals(peerClusterId)) {
removeNonReplicableEdits(edit);
// Don't replicate catalog entries, WALEdits that no longer contain
// anything to replicate, or anything while replication is disabled
if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
edit.size() != 0 && replicating.get()) {
// Only set the clusterId if it is a local key.
// This ensures that the originator sets the cluster id
// and all replicas retain the initial cluster id.
// This is the *only* place where a cluster id other than the default is set.
if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
logKey.setClusterId(this.clusterId);
}
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
} else {
this.metrics.logEditsFilteredRate.inc(1);
}
}
// Stop if too many entries or too big
if ((this.reader.getPosition() - this.position)
>= this.replicationQueueSizeCapacity ||
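
The cluster-id check above is the entire cycle-breaking mechanism: the id is stamped exactly once (at the origin, where the key still carries DEFAULT_CLUSTER_ID), every hop preserves it, and each source drops entries whose origin equals the peer it ships to. A hedged sketch for a ring 1 -> 2 -> 3 -> 1, where ship() is a hypothetical stand-in for the shipping logic:

if (!logKey.getClusterId().equals(peerClusterId)) {
  ship(logKey, edit); // hypothetical helper; the edit keeps its origin id
} // else the peer is the origin: the edit has come full circle and stops here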
@ -523,13 +551,12 @@ public class ReplicationSource extends Thread
protected void removeNonReplicableEdits(WALEdit edit) {
NavigableMap<byte[], Integer> scopes = edit.getScopes();
List<KeyValue> kvs = edit.getKeyValues();
for (int i = 0; i < edit.size(); i++) {
for (int i = edit.size()-1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
if (scopes == null || !scopes.containsKey(kv.getFamily())) {
kvs.remove(i);
i--;
}
}
}
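
The direction change in this loop is the fix: iterating backwards means remove(i) only shifts elements that have already been visited, so the manual i-- compensation goes away. The general idiom, as a hedged sketch (shouldDrop() is a hypothetical predicate):

List<KeyValue> kvs = edit.getKeyValues();
for (int i = kvs.size() - 1; i >= 0; i--) {
  if (shouldDrop(kvs.get(i))) {
    kvs.remove(i); // indices 0..i-1, still to be visited, are unaffected
  }
}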
@ -706,7 +733,7 @@ public class ReplicationSource extends Thread
}
public String getPeerClusterId() {
return this.peerClusterId;
return this.peerId;
}
public Path getCurrentPath() {

View File

@ -765,7 +765,7 @@ public class TestHRegion extends HBaseTestCase {
try {
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
deleteMap.put(family, kvs);
region.delete(deleteMap, true);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
} catch (Exception e) {
assertTrue("Family " +new String(family)+ " does not exist", false);
}
@ -776,7 +776,7 @@ public class TestHRegion extends HBaseTestCase {
try {
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
deleteMap.put(family, kvs);
region.delete(deleteMap, true);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
} catch (Exception e) {
ok = true;
}
@ -1042,7 +1042,7 @@ public class TestHRegion extends HBaseTestCase {
Map<byte[], List<KeyValue>> deleteMap = new HashMap<byte[], List<KeyValue>>();
deleteMap.put(fam1, kvs);
region.delete(deleteMap, true);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true);
// extract the key values out the memstore:
// This is kinda hacky, but better than nothing...

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
@ -159,7 +160,8 @@ public class TestHLogMethods {
WALEdit edit = new WALEdit();
edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val"));
HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now);
HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now,
HConstants.DEFAULT_CLUSTER_ID);
HLog.Entry entry = new HLog.Entry(key, edit);
return entry;
}

View File

@ -162,7 +162,8 @@ public class TestHLogSplit {
fs.mkdirs(regiondir);
long now = System.currentTimeMillis();
HLog.Entry entry =
new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
new HLog.Entry(new HLogKey(encoded,
HConstants.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
new WALEdit());
Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true);
String parentOfParent = p.getParent().getParent().getName();
@ -1179,7 +1180,8 @@ public class TestHLogSplit {
WALEdit edit = new WALEdit();
seq++;
edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value));
return new HLog.Entry(new HLogKey(region, table, seq, time), edit);
return new HLog.Entry(new HLogKey(region, table, seq, time,
HConstants.DEFAULT_CLUSTER_ID), edit);
}

View File

@ -95,7 +95,7 @@ public class TestWALActionsListener {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(b));
HLogKey key = new HLogKey(b,b, 0, 0);
HLogKey key = new HLogKey(b,b, 0, 0, HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd);
if (i == 10) {
hlog.registerWALActionsListener(laterobserver);

View File

@ -0,0 +1,329 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestMasterReplication {
private static final Log LOG = LogFactory.getLog(TestMasterReplication.class);
private static Configuration conf1;
private static Configuration conf2;
private static Configuration conf3;
private static String clusterKey1;
private static String clusterKey2;
private static String clusterKey3;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static HBaseTestingUtility utility3;
private static final long SLEEP_TIME = 500;
private static final int NB_RETRIES = 10;
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] row1 = Bytes.toBytes("row1");
private static final byte[] row2 = Bytes.toBytes("row2");
private static final byte[] noRepfamName = Bytes.toBytes("norep");
private static final byte[] count = Bytes.toBytes("count");
private static final byte[] put = Bytes.toBytes("put");
private static final byte[] delete = Bytes.toBytes("delete");
private static HTableDescriptor table;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
conf1.setInt("replication.source.size.capacity", 1024);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
new ZooKeeperWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf3 = new Configuration(conf1);
conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
new ZooKeeperWatcher(conf2, "cluster2", null, true);
utility3 = new HBaseTestingUtility(conf3);
utility3.setZkCluster(miniZK);
new ZooKeeperWatcher(conf3, "cluster3", null, true);
clusterKey1 = conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf1.get("hbase.zookeeper.property.clientPort")+":/1";
clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf2.get("hbase.zookeeper.property.clientPort")+":/2";
clusterKey3 = conf3.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf3.get("hbase.zookeeper.property.clientPort")+":/3";
table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
}
@Test(timeout=300000)
public void testCyclicReplication() throws Exception {
LOG.info("testCyclicReplication");
utility1.startMiniCluster();
utility2.startMiniCluster();
utility3.startMiniCluster();
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
ReplicationAdmin admin3 = new ReplicationAdmin(conf3);
new HBaseAdmin(conf1).createTable(table);
new HBaseAdmin(conf2).createTable(table);
new HBaseAdmin(conf3).createTable(table);
HTable htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024);
HTable htable2 = new HTable(conf2, tableName);
htable2.setWriteBufferSize(1024);
HTable htable3 = new HTable(conf3, tableName);
htable3.setWriteBufferSize(1024);
admin1.addPeer("1", clusterKey2);
admin2.addPeer("1", clusterKey3);
admin3.addPeer("1", clusterKey1);
// put "row" and wait 'til it got around
putAndWait(row, famName, htable1, htable3);
// it should have passed through table2
check(row,famName,htable2);
putAndWait(row1, famName, htable2, htable1);
check(row1,famName,htable3);
putAndWait(row2, famName, htable3, htable2);
check(row2,famName,htable1);
deleteAndWait(row,htable1,htable3);
deleteAndWait(row1,htable2,htable1);
deleteAndWait(row2,htable3,htable2);
assertEquals("Puts were replicated back ", 3, getCount(htable1, put));
assertEquals("Puts were replicated back ", 3, getCount(htable2, put));
assertEquals("Puts were replicated back ", 3, getCount(htable3, put));
assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
utility3.shutdownMiniCluster();
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
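The expected counts follow from the ring topology: each cluster issues one local put and receives the other two via replication, so every coprocessor sees exactly 3 puts (and later 3 deletes). A count above 3 would mean an edit was replicated back to its origin.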
/**
* Add a row to a table in each cluster, check it's replicated,
* delete it, check it's gone.
* Also check the puts and deletes are not replicated back to
* the originating cluster.
*/
@Test(timeout=300000)
public void testSimplePutDelete() throws Exception {
LOG.info("testSimplePutDelete");
utility1.startMiniCluster();
utility2.startMiniCluster();
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
new HBaseAdmin(conf1).createTable(table);
new HBaseAdmin(conf2).createTable(table);
HTable htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024);
HTable htable2 = new HTable(conf2, tableName);
htable2.setWriteBufferSize(1024);
// set M-M
admin1.addPeer("1", clusterKey2);
admin2.addPeer("1", clusterKey1);
// add rows to both clusters,
// make sure they both get replicated
putAndWait(row, famName, htable1, htable2);
putAndWait(row1, famName, htable2, htable1);
// make sure "row" did not get replicated back.
assertEquals("Puts were replicated back ", 2, getCount(htable1, put));
// delete "row" and wait
deleteAndWait(row, htable1, htable2);
// make sure the 2nd cluster did not get the puts replicated back
assertEquals("Puts were replicated back ", 2, getCount(htable2, put));
deleteAndWait(row1, htable2, htable1);
assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete));
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
private int getCount(HTable t, byte[] type) throws IOException {
Get test = new Get(row);
test.setAttribute("count", new byte[]{});
Result res = t.get(test);
return Bytes.toInt(res.getValue(count, type));
}
private void deleteAndWait(byte[] row, HTable source, HTable target)
throws Exception {
Delete del = new Delete(row);
source.delete(del);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
Result res = target.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
}
private void check(byte[] row, byte[] fam, HTable t) throws IOException {
Get get = new Get(row);
Result res = t.get(get);
if (res.size() == 0) {
fail("Row is missing");
}
}
private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
throws Exception {
Put put = new Put(row);
put.add(fam, row, row);
source.put(put);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
Result res = target.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(row, res.value());
break;
}
}
}
/**
* Use a coprocessor to count puts and deletes.
* As KVs are replicated back with the same timestamp,
* there is otherwise no way to count them.
*/
public static class CoprocessorCounter extends BaseRegionObserver {
private int nCount = 0;
private int nDelete = 0;
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException {
nCount++;
}
@Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException {
nDelete++;
}
@Override
public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get, final List<KeyValue> result) throws IOException {
if (get.getAttribute("count") != null) {
result.clear();
// order is important!
result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
c.bypass();
}
}
}
}

View File

@ -78,7 +78,7 @@ public class TestReplicationSource {
KeyValue kv = new KeyValue(b,b,b);
WALEdit edit = new WALEdit();
edit.add(kv);
HLogKey key = new HLogKey(b, b, 0, 0);
HLogKey key = new HLogKey(b, b, 0, 0, HConstants.DEFAULT_CLUSTER_ID);
writer.append(new HLog.Entry(key, edit));
writer.sync();
}

View File

@ -244,7 +244,8 @@ public class TestReplicationSink {
now, KeyValue.Type.DeleteFamily);
}
HLogKey key = new HLogKey(table, table, now, now);
HLogKey key = new HLogKey(table, table, now, now,
HConstants.DEFAULT_CLUSTER_ID);
WALEdit edit = new WALEdit();
edit.add(kv);

View File

@ -170,8 +170,8 @@ public class TestReplicationSourceManager {
hlog.rollWriter();
}
LOG.info(i);
HLogKey key = new HLogKey(hri.getRegionName(),
test, seq++, System.currentTimeMillis());
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd);
}
@ -183,8 +183,8 @@ public class TestReplicationSourceManager {
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
HLogKey key = new HLogKey(hri.getRegionName(),
test, seq++, System.currentTimeMillis());
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd);
}
@ -195,8 +195,8 @@ public class TestReplicationSourceManager {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false);
HLogKey key = new HLogKey(hri.getRegionName(),
test, seq++, System.currentTimeMillis());
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, key, edit, htd);
assertEquals(1, manager.getHLogs().size());