diff --git a/CHANGES.txt b/CHANGES.txt index 9e498549ede..27e1f3a4c30 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -486,6 +486,7 @@ Release 0.91.0 - Unreleased HBASE-4289 Move spinlock to SingleSizeCache rather than the slab allocator (Li Pi) HBASE-4296 Deprecate HTable[Interface].getRowOrBefore(...) (Lars Hofhansl) + HBASE-2195 Support cyclic replication (Lars Hofhansl) NEW FEATURES HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via diff --git a/src/main/java/org/apache/hadoop/hbase/HConstants.java b/src/main/java/org/apache/hadoop/hbase/HConstants.java index 3e3e902390b..45fefe4c71d 100644 --- a/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.util.Bytes; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.regex.Pattern; /** @@ -203,6 +204,12 @@ public final class HConstants { /** Configuration key storing the cluster ID */ public static final String CLUSTER_ID = "hbase.cluster.id"; + /** + * Attribute used in Puts and Gets to indicate the originating + * cluster. + */ + public static final String CLUSTER_ID_ATTR = "_c.id_"; + // Always store the location of the root table's HRegion. // This HRegion is never split. @@ -364,7 +371,7 @@ public final class HConstants { * Default cluster ID, cannot be used to identify a cluster so a key with * this value means it wasn't meant for replication. */ - public static final byte DEFAULT_CLUSTER_ID = 0; + public static final UUID DEFAULT_CLUSTER_ID = new UUID(0L,0L); /** * Parameter name for maximum number of bytes returned when calling a diff --git a/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/src/main/java/org/apache/hadoop/hbase/client/Delete.java index 01b77c1460d..cb20b469caa 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/Delete.java +++ b/src/main/java/org/apache/hadoop/hbase/client/Delete.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; /** * Used to perform Delete operations on a single row. @@ -513,4 +514,26 @@ public class Delete extends Operation public void setWriteToWAL(boolean write) { this.writeToWAL = write; } + + /** + * Set the replication custer id. + * @param clusterId + */ + public void setClusterId(UUID clusterId) { + byte[] val = new byte[2*Bytes.SIZEOF_LONG]; + Bytes.putLong(val, 0, clusterId.getMostSignificantBits()); + Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits()); + setAttribute(HConstants.CLUSTER_ID_ATTR, val); + } + + /** + * @return The replication cluster id. + */ + public UUID getClusterId() { + byte[] attr = getAttribute(HConstants.CLUSTER_ID_ATTR); + if (attr == null) { + return HConstants.DEFAULT_CLUSTER_ID; + } + return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG)); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/Put.java b/src/main/java/org/apache/hadoop/hbase/client/Put.java index fa25c630cdd..c39626ef67d 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/Put.java +++ b/src/main/java/org/apache/hadoop/hbase/client/Put.java @@ -40,6 +40,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; /** @@ -656,4 +657,26 @@ public class Put extends Operation byte [][] parts = KeyValue.parseColumn(column); return add(parts[0], parts[1], ts, value); } + + /** + * Set the replication custer id. + * @param clusterId + */ + public void setClusterId(UUID clusterId) { + byte[] val = new byte[2*Bytes.SIZEOF_LONG]; + Bytes.putLong(val, 0, clusterId.getMostSignificantBits()); + Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits()); + setAttribute(HConstants.CLUSTER_ID_ATTR, val); + } + + /** + * @return The replication cluster id. + */ + public UUID getClusterId() { + byte[] attr = getAttribute(HConstants.CLUSTER_ID_ATTR); + if (attr == null) { + return HConstants.DEFAULT_CLUSTER_ID; + } + return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG)); + } } diff --git a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 280d2d721b6..81171abcf56 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -67,7 +67,6 @@ import org.apache.zookeeper.KeeperException; public class ReplicationAdmin implements Closeable { private final ReplicationZookeeper replicationZk; - private final Configuration configuration; private final HConnection connection; /** @@ -81,7 +80,6 @@ public class ReplicationAdmin implements Closeable { throw new RuntimeException("hbase.replication isn't true, please " + "enable it in order to use replication"); } - this.configuration = conf; this.connection = HConnectionManager.getConnection(conf); ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher(); try { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a6c7147dd7c..876f9f0394b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -39,6 +39,7 @@ import java.util.NavigableMap; import java.util.NavigableSet; import java.util.Random; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; @@ -1396,7 +1397,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // All edits for the given row (across all column families) must happen atomically. prepareDelete(delete); - delete(delete.getFamilyMap(), writeToWAL); + delete(delete.getFamilyMap(), delete.getClusterId(), writeToWAL); } finally { if(lockid == null) releaseRowLock(lid); } @@ -1405,14 +1406,13 @@ public class HRegion implements HeapSize { // , Writable{ } } - /** * @param familyMap map of family to edits for the given family. * @param writeToWAL * @throws IOException */ - public void delete(Map> familyMap, boolean writeToWAL) - throws IOException { + public void delete(Map> familyMap, UUID clusterId, + boolean writeToWAL) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ if (coprocessorHost != null) { if (coprocessorHost.preDelete(familyMap, writeToWAL)) { @@ -1482,7 +1482,7 @@ public class HRegion implements HeapSize { // , Writable{ WALEdit walEdit = new WALEdit(); addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, now, this.htableDescriptor); + walEdit, clusterId, now, this.htableDescriptor); } // Now make changes to the memstore. @@ -1557,7 +1557,7 @@ public class HRegion implements HeapSize { // , Writable{ try { // All edits for the given row (across all column families) must happen atomically. // Coprocessor interception happens in put(Map,boolean) - put(put.getFamilyMap(), writeToWAL); + put(put.getFamilyMap(), put.getClusterId(), writeToWAL); } finally { if(lockid == null) releaseRowLock(lid); } @@ -1752,8 +1752,9 @@ public class HRegion implements HeapSize { // , Writable{ } // Append the edit to WAL + Put first = batchOp.operations[firstIndex].getFirst(); this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, now, this.htableDescriptor); + walEdit, first.getClusterId(), now, this.htableDescriptor); // ------------------------------------ // STEP 4. Write back to memstore @@ -1883,13 +1884,18 @@ public class HRegion implements HeapSize { // , Writable{ } //If matches put the new put or delete the new delete if (matches) { - // All edits for the given row (across all column families) must happen atomically. + // All edits for the given row (across all column families) must + // happen atomically. + // + // Using default cluster id, as this can only happen in the + // originating cluster. A slave cluster receives the result as a Put + // or Delete if (isPut) { - put(((Put)w).getFamilyMap(), writeToWAL); + put(((Put)w).getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); } else { Delete d = (Delete)w; prepareDelete(d); - delete(d.getFamilyMap(), writeToWAL); + delete(d.getFamilyMap(), HConstants.DEFAULT_CLUSTER_ID, writeToWAL); } return true; } @@ -1980,7 +1986,7 @@ public class HRegion implements HeapSize { // , Writable{ familyMap = new HashMap>(); familyMap.put(family, edits); - this.put(familyMap, true); + this.put(familyMap, HConstants.DEFAULT_CLUSTER_ID, true); } /** @@ -1990,8 +1996,8 @@ public class HRegion implements HeapSize { // , Writable{ * @param writeToWAL if true, then we should write to the log * @throws IOException */ - private void put(Map> familyMap, boolean writeToWAL) - throws IOException { + private void put(Map> familyMap, UUID clusterId, + boolean writeToWAL) throws IOException { /* run pre put hook outside of lock to avoid deadlock */ if (coprocessorHost != null) { if (coprocessorHost.prePut(familyMap, writeToWAL)) { @@ -2016,7 +2022,7 @@ public class HRegion implements HeapSize { // , Writable{ WALEdit walEdit = new WALEdit(); addFamilyMapToWALEdit(familyMap, walEdit); this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, now, this.htableDescriptor); + walEdit, clusterId, now, this.htableDescriptor); } long addedSize = applyFamilyMapToMemstore(familyMap); @@ -3546,8 +3552,12 @@ public class HRegion implements HeapSize { // , Writable{ // Actually write to WAL now if (writeToWAL) { + // Using default cluster id, as this can only happen in the orginating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdits, now, this.htableDescriptor); + walEdits, HConstants.DEFAULT_CLUSTER_ID, now, + this.htableDescriptor); } size = this.addAndGetGlobalMemstoreSize(size); @@ -3604,36 +3614,40 @@ public class HRegion implements HeapSize { // , Writable{ if (!results.isEmpty()) { KeyValue kv = results.get(0); if(kv.getValueLength() == 8){ - byte [] buffer = kv.getBuffer(); - int valueOffset = kv.getValueOffset(); - result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); } else{ - wrongLength = true; + wrongLength = true; } } if(!wrongLength){ - // build the KeyValue now: - KeyValue newKv = new KeyValue(row, family, + // build the KeyValue now: + KeyValue newKv = new KeyValue(row, family, qualifier, EnvironmentEdgeManager.currentTimeMillis(), Bytes.toBytes(result)); - // now log it: - if (writeToWAL) { - long now = EnvironmentEdgeManager.currentTimeMillis(); - WALEdit walEdit = new WALEdit(); - walEdit.add(newKv); - this.log.append(regionInfo, this.htableDescriptor.getName(), - walEdit, now, this.htableDescriptor); - } + // now log it: + if (writeToWAL) { + long now = EnvironmentEdgeManager.currentTimeMillis(); + WALEdit walEdit = new WALEdit(); + walEdit.add(newKv); + // Using default cluster id, as this can only happen in the + // orginating cluster. A slave cluster receives the final value (not + // the delta) as a Put. + this.log.append(regionInfo, this.htableDescriptor.getName(), + walEdit, HConstants.DEFAULT_CLUSTER_ID, now, + this.htableDescriptor); + } - // Now request the ICV to the store, this will set the timestamp - // appropriately depending on if there is a value in memcache or not. - // returns the change in the size of the memstore from operation - long size = store.updateColumnValue(row, family, qualifier, result); + // Now request the ICV to the store, this will set the timestamp + // appropriately depending on if there is a value in memcache or not. + // returns the change in the size of the memstore from operation + long size = store.updateColumnValue(row, family, qualifier, result); - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); } } finally { this.updatesLock.readLock().unlock(); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index 8fb86ac26a6..72412beed5b 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -37,6 +37,7 @@ import java.util.NavigableSet; import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -900,14 +901,15 @@ public class HLog implements Syncable { * @param now * @param regionName * @param tableName + * @param clusterId * @return New log key. */ - protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, long now) { - return new HLogKey(regionName, tableName, seqnum, now); + protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqnum, + long now, UUID clusterId) { + return new HLogKey(regionName, tableName, seqnum, now, clusterId); } - /** Append an entry to the log. * * @param regionInfo @@ -944,6 +946,22 @@ public class HLog implements Syncable { } } + /** + * Only used in tests. + * + * @param info + * @param tableName + * @param edits + * @param now + * @param htd + * @throws IOException + */ + public void append(HRegionInfo info, byte [] tableName, WALEdit edits, + final long now, HTableDescriptor htd) + throws IOException { + append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd); + } + /** * Append a set of edits to the log. Log edits are keyed by (encoded) * regionName, rowname, and log-sequence-id. @@ -964,39 +982,40 @@ public class HLog implements Syncable { * @param info * @param tableName * @param edits + * @param clusterId The originating clusterId for this edit (for replication) * @param now * @throws IOException */ - public void append(HRegionInfo info, byte [] tableName, WALEdit edits, - final long now, HTableDescriptor htd) - throws IOException { - if (edits.isEmpty()) return; - if (this.closed) { - throw new IOException("Cannot append; log is closed"); + public void append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId, + final long now, HTableDescriptor htd) + throws IOException { + if (edits.isEmpty()) return; + if (this.closed) { + throw new IOException("Cannot append; log is closed"); + } + synchronized (this.updateLock) { + long seqNum = obtainSeqNum(); + // The 'lastSeqWritten' map holds the sequence number of the oldest + // write for each region (i.e. the first edit added to the particular + // memstore). . When the cache is flushed, the entry for the + // region being flushed is removed if the sequence number of the flush + // is greater than or equal to the value in lastSeqWritten. + // Use encoded name. Its shorter, guaranteed unique and a subset of + // actual name. + byte [] hriKey = info.getEncodedNameAsBytes(); + this.lastSeqWritten.putIfAbsent(hriKey, seqNum); + HLogKey logKey = makeKey(hriKey, tableName, seqNum, now, clusterId); + doWrite(info, logKey, edits, htd); + this.numEntries.incrementAndGet(); + } + // Sync if catalog region, and if not then check if that table supports + // deferred log flushing + if (info.isMetaRegion() || + !htd.isDeferredLogFlush()) { + // sync txn to file system + this.sync(); + } } - synchronized (this.updateLock) { - long seqNum = obtainSeqNum(); - // The 'lastSeqWritten' map holds the sequence number of the oldest - // write for each region (i.e. the first edit added to the particular - // memstore). . When the cache is flushed, the entry for the - // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten. - // Use encoded name. Its shorter, guaranteed unique and a subset of - // actual name. - byte [] hriKey = info.getEncodedNameAsBytes(); - this.lastSeqWritten.putIfAbsent(hriKey, seqNum); - HLogKey logKey = makeKey(hriKey, tableName, seqNum, now); - doWrite(info, logKey, edits, htd); - this.numEntries.incrementAndGet(); - } - // Sync if catalog region, and if not then check if that table supports - // deferred log flushing - if (info.isMetaRegion() || - !htd.isDeferredLogFlush()) { - // sync txn to file system - this.sync(); - } - } /** * This thread is responsible to call syncFs and buffer up the writers while @@ -1295,7 +1314,7 @@ public class HLog implements Syncable { long now = System.currentTimeMillis(); WALEdit edit = completeCacheFlushLogEdit(); HLogKey key = makeKey(encodedRegionName, tableName, logSeqId, - System.currentTimeMillis()); + System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); this.writer.append(new Entry(key, edit)); writeTime += System.currentTimeMillis() - now; writeOps++; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java index 08bbff8911f..9dba3fb6b05 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java @@ -25,10 +25,12 @@ import java.io.EOFException; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.io.WritableUtils; /** * A Key for an entry in the change log. @@ -41,6 +43,9 @@ import org.apache.hadoop.io.WritableComparable; * associated row. */ public class HLogKey implements WritableComparable { + // should be < 0 (@see #readFields(DataInput)) + private static final int VERSION = -1; + // The encoded region name. private byte [] encodedRegionName; private byte [] tablename; @@ -48,11 +53,12 @@ public class HLogKey implements WritableComparable { // Time at which this edit was written. private long writeTime; - private byte clusterId; + private UUID clusterId; /** Writable Consructor -- Do not use. */ public HLogKey() { - this(null, null, 0L, HConstants.LATEST_TIMESTAMP); + this(null, null, 0L, HConstants.LATEST_TIMESTAMP, + HConstants.DEFAULT_CLUSTER_ID); } /** @@ -65,14 +71,15 @@ public class HLogKey implements WritableComparable { * @param tablename - name of table * @param logSeqNum - log sequence number * @param now Time at which this edit was written. + * @param UUID of the cluster (used in Replication) */ public HLogKey(final byte [] encodedRegionName, final byte [] tablename, - long logSeqNum, final long now) { + long logSeqNum, final long now, UUID clusterId) { this.encodedRegionName = encodedRegionName; this.tablename = tablename; this.logSeqNum = logSeqNum; this.writeTime = now; - this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + this.clusterId = clusterId; } /** @return encoded region name */ @@ -105,7 +112,7 @@ public class HLogKey implements WritableComparable { * Get the id of the original cluster * @return Cluster id. */ - public byte getClusterId() { + public UUID getClusterId() { return clusterId; } @@ -113,7 +120,7 @@ public class HLogKey implements WritableComparable { * Set the cluster id of this key * @param clusterId */ - public void setClusterId(byte clusterId) { + public void setClusterId(UUID clusterId) { this.clusterId = clusterId; } @@ -154,7 +161,7 @@ public class HLogKey implements WritableComparable { int result = Bytes.hashCode(this.encodedRegionName); result ^= this.logSeqNum; result ^= this.writeTime; - result ^= this.clusterId; + result ^= this.clusterId.hashCode(); return result; } @@ -174,6 +181,7 @@ public class HLogKey implements WritableComparable { } } } + // why isn't cluster id accounted for? return result; } @@ -203,23 +211,57 @@ public class HLogKey implements WritableComparable { this.encodedRegionName = encodedRegionName; } + @Override public void write(DataOutput out) throws IOException { + WritableUtils.writeVInt(out, VERSION); Bytes.writeByteArray(out, this.encodedRegionName); Bytes.writeByteArray(out, this.tablename); out.writeLong(this.logSeqNum); out.writeLong(this.writeTime); - out.writeByte(this.clusterId); + // avoid storing 16 bytes when replication is not enabled + if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeLong(this.clusterId.getMostSignificantBits()); + out.writeLong(this.clusterId.getLeastSignificantBits()); + } } + @Override public void readFields(DataInput in) throws IOException { - this.encodedRegionName = Bytes.readByteArray(in); + int version = 0; + // HLogKey was not versioned in the beginning. + // In order to introduce it now, we make use of the fact + // that encodedRegionName was written with Bytes.writeByteArray, + // which encodes the array length as a vint which is >= 0. + // Hence if the vint is >= 0 we have an old version and the vint + // encodes the length of encodedRegionName. + // If < 0 we just read the version and the next vint is the length. + // @see Bytes#readByteArray(DataInput) + int len = WritableUtils.readVInt(in); + if (len < 0) { + // what we just read was the version + version = len; + len = WritableUtils.readVInt(in); + } + this.encodedRegionName = new byte[len]; + in.readFully(this.encodedRegionName); this.tablename = Bytes.readByteArray(in); this.logSeqNum = in.readLong(); this.writeTime = in.readLong(); - try { - this.clusterId = in.readByte(); - } catch(EOFException e) { - // Means it's an old key, just continue + this.clusterId = HConstants.DEFAULT_CLUSTER_ID; + if (version < 0) { + if (in.readBoolean()) { + this.clusterId = new UUID(in.readLong(), in.readLong()); + } + } else { + try { + // dummy read (former byte cluster id) + in.readByte(); + } catch(EOFException e) { + // Means it's a very old key, just continue + } } } } diff --git a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java index b6843b91269..6ba24c0a85e 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java @@ -93,8 +93,6 @@ public class ReplicationZookeeper { private final Configuration conf; // Is this cluster replicating at the moment? private AtomicBoolean replicating; - // Byte (stored as string here) that identifies this cluster - private String clusterId; // The key to our own cluster private String ourClusterKey; // Abortable @@ -146,12 +144,8 @@ public class ReplicationZookeeper { conf.get("zookeeper.znode.replication", "replication"); String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); - String repMasterZNodeName = - conf.get("zookeeper.znode.replication.master", "master"); this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state"); - String clusterIdZNodeName = - conf.get("zookeeper.znode.replication.clusterId", "clusterId"); String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); @@ -162,11 +156,6 @@ public class ReplicationZookeeper { this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName); ZKUtil.createWithParents(this.zookeeper, this.rsZNode); - String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName); - byte [] data = ZKUtil.getData(this.zookeeper, znode); - String idResult = Bytes.toString(data); - this.clusterId = idResult == null? - Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult; // Set a tracker on replicationStateNodeNode this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable); @@ -701,15 +690,6 @@ public class ReplicationZookeeper { this.zookeeper.registerListener(listener); } - /** - * Get the identification of the cluster - * - * @return the id for the cluster - */ - public String getClusterId() { - return this.clusterId; - } - /** * Get a map of all peer clusters * @return map of peer cluster keyed by id @@ -762,4 +742,4 @@ public class ReplicationZookeeper { } } } -} \ No newline at end of file +} diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index 53434030dc7..8b78eaf9ab8 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Delete; @@ -106,6 +107,7 @@ public class ReplicationSink { if (kvs.get(0).isDelete()) { Delete delete = new Delete(kvs.get(0).getRow(), kvs.get(0).getTimestamp(), null); + delete.setClusterId(entry.getKey().getClusterId()); for (KeyValue kv : kvs) { if (kv.isDeleteFamily()) { delete.deleteFamily(kv.getFamily()); @@ -126,10 +128,12 @@ public class ReplicationSink { byte[] lastKey = kvs.get(0).getRow(); Put put = new Put(kvs.get(0).getRow(), kvs.get(0).getTimestamp()); + put.setClusterId(entry.getKey().getClusterId()); for (KeyValue kv : kvs) { if (!Bytes.equals(lastKey, kv.getRow())) { tableList.add(put); put = new Put(kv.getRow(), kv.getTimestamp()); + put.setClusterId(entry.getKey().getClusterId()); } put.add(kv.getFamily(), kv.getQualifier(), kv.getValue()); lastKey = kv.getRow(); diff --git a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2e8de8b2b86..1f3ad342a8e 100644 --- a/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.NavigableMap; import java.util.Random; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.ReplicationZookeeper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; @@ -88,7 +90,7 @@ public class ReplicationSource extends Thread // should we replicate or not? private AtomicBoolean replicating; // id of the peer cluster this source replicates to - private String peerClusterId; + private String peerId; // The manager of all sources to which we ping back our progress private ReplicationSourceManager manager; // Should we stop everything? @@ -109,7 +111,9 @@ public class ReplicationSource extends Thread private volatile Path currentPath; private FileSystem fs; // id of this cluster - private byte clusterId; + private UUID clusterId; + // id of the other cluster + private UUID peerClusterId; // total number of edits we replicated private long totalReplicatedEdits = 0; // The znode we currently play with @@ -176,9 +180,15 @@ public class ReplicationSource extends Thread this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.fs = fs; - this.clusterId = Byte.valueOf(zkHelper.getClusterId()); this.metrics = new ReplicationSourceMetrics(peerClusterZnode); + try { + this.clusterId = UUID.fromString(ClusterId.readClusterIdZNode(zkHelper + .getZookeeperWatcher())); + } catch (KeeperException ke) { + throw new IOException("Could not read cluster id", ke); + } + // Finally look if this is a recovered queue this.checkIfQueueRecovered(peerClusterZnode); } @@ -188,7 +198,7 @@ public class ReplicationSource extends Thread private void checkIfQueueRecovered(String peerClusterZnode) { String[] parts = peerClusterZnode.split("-"); this.queueRecovered = parts.length != 1; - this.peerClusterId = this.queueRecovered ? + this.peerId = this.queueRecovered ? parts[0] : peerClusterZnode; this.peerClusterZnode = peerClusterZnode; this.deadRegionServers = new String[parts.length-1]; @@ -204,11 +214,11 @@ public class ReplicationSource extends Thread private void chooseSinks() throws KeeperException { this.currentPeers.clear(); List addresses = - this.zkHelper.getSlavesAddresses(peerClusterId); + this.zkHelper.getSlavesAddresses(peerId); Set setOfAddr = new HashSet(); int nbPeers = (int) (Math.ceil(addresses.size() * ratio)); LOG.info("Getting " + nbPeers + - " rs from peer cluster # " + peerClusterId); + " rs from peer cluster # " + peerId); for (int i = 0; i < nbPeers; i++) { HServerAddress address; // Make sure we get one address that we don't already have @@ -235,6 +245,15 @@ public class ReplicationSource extends Thread if (this.stopper.isStopped()) { return; } + // delay this until we are in an asynchronous thread + try { + this.peerClusterId = UUID.fromString(ClusterId + .readClusterIdZNode(zkHelper.getPeerClusters().get(peerId).getZkw())); + } catch (KeeperException ke) { + this.terminate("Could not read peer's cluster id", ke); + } + LOG.info("Replicating "+clusterId + " -> " + peerClusterId); + // If this is recovered, the queue is already full and the first log // normally has a position (unless the RS failed between 2 logs) if (this.queueRecovered) { @@ -350,7 +369,7 @@ public class ReplicationSource extends Thread LOG.debug("Attempt to close connection failed", e); } } - LOG.debug("Source exiting " + peerClusterId); + LOG.debug("Source exiting " + peerId); } /** @@ -371,18 +390,27 @@ public class ReplicationSource extends Thread this.metrics.logEditsReadRate.inc(1); seenEntries++; // Remove all KVs that should not be replicated - removeNonReplicableEdits(edit); HLogKey logKey = entry.getKey(); - // Don't replicate catalog entries, if the WALEdit wasn't - // containing anything to replicate and if we're currently not set to replicate - if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || - Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) && - edit.size() != 0 && replicating.get()) { - logKey.setClusterId(this.clusterId); - currentNbOperations += countDistinctRowKeys(edit); - currentNbEntries++; - } else { - this.metrics.logEditsFilteredRate.inc(1); + // don't replicate if the log entries originated in the peer + if (!logKey.getClusterId().equals(peerClusterId)) { + removeNonReplicableEdits(edit); + // Don't replicate catalog entries, if the WALEdit wasn't + // containing anything to replicate and if we're currently not set to replicate + if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || + Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) && + edit.size() != 0 && replicating.get()) { + // Only set the clusterId if is a local key. + // This ensures that the originator sets the cluster id + // and all replicas retain the initial cluster id. + // This is *only* place where a cluster id other than the default is set. + if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) { + logKey.setClusterId(this.clusterId); + } + currentNbOperations += countDistinctRowKeys(edit); + currentNbEntries++; + } else { + this.metrics.logEditsFilteredRate.inc(1); + } } // Stop if too many entries or too big if ((this.reader.getPosition() - this.position) @@ -523,13 +551,12 @@ public class ReplicationSource extends Thread protected void removeNonReplicableEdits(WALEdit edit) { NavigableMap scopes = edit.getScopes(); List kvs = edit.getKeyValues(); - for (int i = 0; i < edit.size(); i++) { + for (int i = edit.size()-1; i >= 0; i--) { KeyValue kv = kvs.get(i); // The scope will be null or empty if // there's nothing to replicate in that WALEdit if (scopes == null || !scopes.containsKey(kv.getFamily())) { kvs.remove(i); - i--; } } } @@ -706,7 +733,7 @@ public class ReplicationSource extends Thread } public String getPeerClusterId() { - return this.peerClusterId; + return this.peerId; } public Path getCurrentPath() { diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 9dbdbbc4d0b..664ee9bddb4 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -765,7 +765,7 @@ public class TestHRegion extends HBaseTestCase { try { Map> deleteMap = new HashMap>(); deleteMap.put(family, kvs); - region.delete(deleteMap, true); + region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true); } catch (Exception e) { assertTrue("Family " +new String(family)+ " does not exist", false); } @@ -776,7 +776,7 @@ public class TestHRegion extends HBaseTestCase { try { Map> deleteMap = new HashMap>(); deleteMap.put(family, kvs); - region.delete(deleteMap, true); + region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true); } catch (Exception e) { ok = true; } @@ -1042,7 +1042,7 @@ public class TestHRegion extends HBaseTestCase { Map> deleteMap = new HashMap>(); deleteMap.put(fam1, kvs); - region.delete(deleteMap, true); + region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, true); // extract the key values out the memstore: // This is kinda hacky, but better than nothing... diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java index 50d297bc1b4..9e29f303db2 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValueTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; @@ -156,10 +157,11 @@ public class TestHLogMethods { private HLog.Entry createTestLogEntry(int i) { long seq = i; long now = i * 1000; - + WALEdit edit = new WALEdit(); edit.add(KeyValueTestUtil.create("row", "fam", "qual", 1234, "val")); - HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now); + HLogKey key = new HLogKey(TEST_REGION, TEST_TABLE, seq, now, + HConstants.DEFAULT_CLUSTER_ID); HLog.Entry entry = new HLog.Entry(key, edit); return entry; } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java index de28418efea..f2609969fe4 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java @@ -162,7 +162,8 @@ public class TestHLogSplit { fs.mkdirs(regiondir); long now = System.currentTimeMillis(); HLog.Entry entry = - new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now), + new HLog.Entry(new HLogKey(encoded, + HConstants.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true); String parentOfParent = p.getParent().getParent().getName(); @@ -1179,7 +1180,8 @@ public class TestHLogSplit { WALEdit edit = new WALEdit(); seq++; edit.add(new KeyValue(row, family, qualifier, time, KeyValue.Type.Put, value)); - return new HLog.Entry(new HLogKey(region, table, seq, time), edit); + return new HLog.Entry(new HLogKey(region, table, seq, time, + HConstants.DEFAULT_CLUSTER_ID), edit); } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java index dc43eb217e8..7691236974c 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java @@ -95,7 +95,7 @@ public class TestWALActionsListener { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor(b)); - HLogKey key = new HLogKey(b,b, 0, 0); + HLogKey key = new HLogKey(b,b, 0, 0, HConstants.DEFAULT_CLUSTER_ID); hlog.append(hri, key, edit, htd); if (i == 10) { hlog.registerWALActionsListener(laterobserver); @@ -148,4 +148,4 @@ public class TestWALActionsListener { } } -} \ No newline at end of file +} diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java new file mode 100644 index 00000000000..7e3972daada --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -0,0 +1,329 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestMasterReplication { + + private static final Log LOG = LogFactory.getLog(TestReplication.class); + + private static Configuration conf1; + private static Configuration conf2; + private static Configuration conf3; + + private static String clusterKey1; + private static String clusterKey2; + private static String clusterKey3; + + private static HBaseTestingUtility utility1; + private static HBaseTestingUtility utility2; + private static HBaseTestingUtility utility3; + private static final long SLEEP_TIME = 500; + private static final int NB_RETRIES = 10; + + private static final byte[] tableName = Bytes.toBytes("test"); + private static final byte[] famName = Bytes.toBytes("f"); + private static final byte[] row = Bytes.toBytes("row"); + private static final byte[] row1 = Bytes.toBytes("row1"); + private static final byte[] row2 = Bytes.toBytes("row2"); + private static final byte[] noRepfamName = Bytes.toBytes("norep"); + + private static final byte[] count = Bytes.toBytes("count"); + private static final byte[] put = Bytes.toBytes("put"); + private static final byte[] delete = Bytes.toBytes("delete"); + + private static HTableDescriptor table; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1 = HBaseConfiguration.create(); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + // smaller block size and capacity to trigger more operations + // and test them + conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20); + conf1.setInt("replication.source.size.capacity", 1024); + conf1.setLong("replication.source.sleepforretries", 100); + conf1.setInt("hbase.regionserver.maxlogs", 10); + conf1.setLong("hbase.master.logcleaner.ttl", 10); + conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true); + conf1.setBoolean("dfs.support.append", true); + conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); + conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter"); + + utility1 = new HBaseTestingUtility(conf1); + utility1.startMiniZKCluster(); + MiniZooKeeperCluster miniZK = utility1.getZkCluster(); + new ZooKeeperWatcher(conf1, "cluster1", null, true); + + conf2 = new Configuration(conf1); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + + conf3 = new Configuration(conf1); + conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3"); + + utility2 = new HBaseTestingUtility(conf2); + utility2.setZkCluster(miniZK); + new ZooKeeperWatcher(conf2, "cluster3", null, true); + + utility3 = new HBaseTestingUtility(conf3); + utility3.setZkCluster(miniZK); + new ZooKeeperWatcher(conf3, "cluster3", null, true); + + clusterKey1 = conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf1.get("hbase.zookeeper.property.clientPort")+":/1"; + + clusterKey2 = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf2.get("hbase.zookeeper.property.clientPort")+":/2"; + + clusterKey3 = conf3.get(HConstants.ZOOKEEPER_QUORUM)+":" + + conf3.get("hbase.zookeeper.property.clientPort")+":/3"; + + table = new HTableDescriptor(tableName); + HColumnDescriptor fam = new HColumnDescriptor(famName); + fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL); + table.addFamily(fam); + fam = new HColumnDescriptor(noRepfamName); + table.addFamily(fam); + } + + @Test(timeout=300000) + public void testCyclicReplication() throws Exception { + LOG.info("testCyclicReplication"); + utility1.startMiniCluster(); + utility2.startMiniCluster(); + utility3.startMiniCluster(); + ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + ReplicationAdmin admin2 = new ReplicationAdmin(conf2); + ReplicationAdmin admin3 = new ReplicationAdmin(conf3); + + new HBaseAdmin(conf1).createTable(table); + new HBaseAdmin(conf2).createTable(table); + new HBaseAdmin(conf3).createTable(table); + HTable htable1 = new HTable(conf1, tableName); + htable1.setWriteBufferSize(1024); + HTable htable2 = new HTable(conf2, tableName); + htable2.setWriteBufferSize(1024); + HTable htable3 = new HTable(conf3, tableName); + htable3.setWriteBufferSize(1024); + + admin1.addPeer("1", clusterKey2); + admin2.addPeer("1", clusterKey3); + admin3.addPeer("1", clusterKey1); + + // put "row" and wait 'til it got around + putAndWait(row, famName, htable1, htable3); + // it should have passed through table2 + check(row,famName,htable2); + + putAndWait(row1, famName, htable2, htable1); + check(row,famName,htable3); + putAndWait(row2, famName, htable3, htable2); + check(row,famName,htable1); + + deleteAndWait(row,htable1,htable3); + deleteAndWait(row1,htable2,htable1); + deleteAndWait(row2,htable3,htable2); + + assertEquals("Puts were replicated back ", 3, getCount(htable1, put)); + assertEquals("Puts were replicated back ", 3, getCount(htable2, put)); + assertEquals("Puts were replicated back ", 3, getCount(htable3, put)); + assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete)); + assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete)); + assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete)); + utility3.shutdownMiniCluster(); + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + /** + * Add a row to a table in each cluster, check it's replicated, + * delete it, check's gone + * Also check the puts and deletes are not replicated back to + * the originating cluster. + */ + @Test(timeout=300000) + public void testSimplePutDelete() throws Exception { + LOG.info("testSimplePutDelete"); + utility1.startMiniCluster(); + utility2.startMiniCluster(); + + ReplicationAdmin admin1 = new ReplicationAdmin(conf1); + ReplicationAdmin admin2 = new ReplicationAdmin(conf2); + + new HBaseAdmin(conf1).createTable(table); + new HBaseAdmin(conf2).createTable(table); + HTable htable1 = new HTable(conf1, tableName); + htable1.setWriteBufferSize(1024); + HTable htable2 = new HTable(conf2, tableName); + htable2.setWriteBufferSize(1024); + + // set M-M + admin1.addPeer("1", clusterKey2); + admin2.addPeer("1", clusterKey1); + + // add rows to both clusters, + // make sure they are both replication + putAndWait(row, famName, htable1, htable2); + putAndWait(row1, famName, htable2, htable1); + + // make sure "row" did not get replicated back. + assertEquals("Puts were replicated back ", 2, getCount(htable1, put)); + + // delete "row" and wait + deleteAndWait(row, htable1, htable2); + + // make the 2nd cluster replicated back + assertEquals("Puts were replicated back ", 2, getCount(htable2, put)); + + deleteAndWait(row1, htable2, htable1); + + assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete)); + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } + + private int getCount(HTable t, byte[] type) throws IOException { + Get test = new Get(row); + test.setAttribute("count", new byte[]{}); + Result res = t.get(test); + return Bytes.toInt(res.getValue(count, type)); + } + + private void deleteAndWait(byte[] row, HTable source, HTable target) + throws Exception { + Delete del = new Delete(row); + source.delete(del); + + Get get = new Get(row); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for del replication"); + } + Result res = target.get(get); + if (res.size() >= 1) { + LOG.info("Row not deleted"); + Thread.sleep(SLEEP_TIME); + } else { + break; + } + } + } + + private void check(byte[] row, byte[] fam, HTable t) throws IOException { + Get get = new Get(row); + Result res = t.get(get); + if (res.size() == 0) { + fail("Row is missing"); + } + } + + private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target) + throws Exception { + Put put = new Put(row); + put.add(fam, row, row); + source.put(put); + + Get get = new Get(row); + for (int i = 0; i < NB_RETRIES; i++) { + if (i==NB_RETRIES-1) { + fail("Waited too much time for put replication"); + } + Result res = target.get(get); + if (res.size() == 0) { + LOG.info("Row not available"); + Thread.sleep(SLEEP_TIME); + } else { + assertArrayEquals(res.value(), row); + break; + } + } + } + + /** + * Use a coprocessor to count puts and deletes. + * as KVs would be replicated back with the same timestamp + * there is otherwise no way to count them. + */ + public static class CoprocessorCounter extends BaseRegionObserver { + private int nCount = 0; + private int nDelete = 0; + + @Override + public void prePut(final ObserverContext e, + final Map> familyMap, final boolean writeToWAL) + throws IOException { + nCount++; + } + @Override + public void postDelete(final ObserverContext c, + final Map> familyMap, final boolean writeToWAL) + throws IOException { + nDelete++; + } + @Override + public void preGet(final ObserverContext c, + final Get get, final List result) throws IOException { + if (get.getAttribute("count") != null) { + result.clear(); + // order is important! + result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete))); + result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount))); + c.bypass(); + } + } + } +} diff --git a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index f019c93e75a..8a6f8989b08 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -78,7 +78,7 @@ public class TestReplicationSource { KeyValue kv = new KeyValue(b,b,b); WALEdit edit = new WALEdit(); edit.add(kv); - HLogKey key = new HLogKey(b, b, 0, 0); + HLogKey key = new HLogKey(b, b, 0, 0, HConstants.DEFAULT_CLUSTER_ID); writer.append(new HLog.Entry(key, edit)); writer.sync(); } diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index d54adac07a4..d3394c0b19f 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -244,11 +244,12 @@ public class TestReplicationSink { now, KeyValue.Type.DeleteFamily); } - HLogKey key = new HLogKey(table, table, now, now); + HLogKey key = new HLogKey(table, table, now, now, + HConstants.DEFAULT_CLUSTER_ID); WALEdit edit = new WALEdit(); edit.add(kv); return new HLog.Entry(key, edit); } -} \ No newline at end of file +} diff --git a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 381ac904962..14e33a77a28 100644 --- a/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -170,8 +170,8 @@ public class TestReplicationSourceManager { hlog.rollWriter(); } LOG.info(i); - HLogKey key = new HLogKey(hri.getRegionName(), - test, seq++, System.currentTimeMillis()); + HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, + System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); hlog.append(hri, key, edit, htd); } @@ -183,8 +183,8 @@ public class TestReplicationSourceManager { LOG.info(baseline + " and " + time); for (int i = 0; i < 3; i++) { - HLogKey key = new HLogKey(hri.getRegionName(), - test, seq++, System.currentTimeMillis()); + HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, + System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); hlog.append(hri, key, edit, htd); } @@ -195,8 +195,8 @@ public class TestReplicationSourceManager { manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false); - HLogKey key = new HLogKey(hri.getRegionName(), - test, seq++, System.currentTimeMillis()); + HLogKey key = new HLogKey(hri.getRegionName(), test, seq++, + System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID); hlog.append(hri, key, edit, htd); assertEquals(1, manager.getHLogs().size());