HBASE-7709 Infinite loop possible in Master/Master replication

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518335 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2013-08-28 19:32:00 +00:00
parent 39f7dc4ca3
commit 9ebaea9f54
19 changed files with 1211 additions and 440 deletions

View File

@ -39,6 +39,10 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class Mutation extends OperationWithAttributes implements Row, CellScannable,
@ -57,8 +61,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
// familyMap
ClassSize.TREEMAP);
// Attribute used in Mutations to indicate the originating cluster.
private static final String CLUSTER_ID_ATTR = "_c.id_";
/**
* The attribute for storing the list of clusters that have consumed the change.
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
@ -225,26 +231,33 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
}
/**
* Set the replication cluster id.
* @param clusterId
* Marks that the clusters with the given clusterIds have consumed the mutation
* @param clusterIds of the clusters that have consumed the mutation
*/
public void setClusterId(UUID clusterId) {
if (clusterId == null) return;
byte[] val = new byte[2*Bytes.SIZEOF_LONG];
Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
setAttribute(CLUSTER_ID_ATTR, val);
public void setClusterIds(List<UUID> clusterIds) {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
out.writeInt(clusterIds.size());
for (UUID clusterId : clusterIds) {
out.writeLong(clusterId.getMostSignificantBits());
out.writeLong(clusterId.getLeastSignificantBits());
}
setAttribute(CONSUMED_CLUSTER_IDS, out.toByteArray());
}
/**
* @return The replication cluster id.
* @return the set of clusterIds that have consumed the mutation
*/
public UUID getClusterId() {
byte[] attr = getAttribute(CLUSTER_ID_ATTR);
if (attr == null) {
return HConstants.DEFAULT_CLUSTER_ID;
public List<UUID> getClusterIds() {
List<UUID> clusterIds = new ArrayList<UUID>();
byte[] bytes = getAttribute(CONSUMED_CLUSTER_IDS);
if (bytes != null) {
ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
int numClusters = in.readInt();
for (int i = 0; i < numClusters; i++) {
clusterIds.add(new UUID(in.readLong(), in.readLong()));
}
}
return new UUID(Bytes.toLong(attr,0), Bytes.toLong(attr, Bytes.SIZEOF_LONG));
return clusterIds;
}
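For reference, the attribute format introduced above round-trips as in the standalone sketch below (not part of the patch; the class name is made up and Guava is assumed to be on the classpath): a 4-byte count followed by the most/least significant bits of each UUID.

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;

public class ConsumedClusterIdsCodecSketch {
  // Mirrors Mutation#setClusterIds: count, then (most, least) significant bits per UUID.
  static byte[] encode(List<UUID> clusterIds) {
    ByteArrayDataOutput out = ByteStreams.newDataOutput();
    out.writeInt(clusterIds.size());
    for (UUID clusterId : clusterIds) {
      out.writeLong(clusterId.getMostSignificantBits());
      out.writeLong(clusterId.getLeastSignificantBits());
    }
    return out.toByteArray();
  }

  // Mirrors Mutation#getClusterIds: read the count, then reconstruct each UUID.
  static List<UUID> decode(byte[] bytes) {
    List<UUID> clusterIds = new ArrayList<UUID>();
    ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
    int numClusters = in.readInt();
    for (int i = 0; i < numClusters; i++) {
      clusterIds.add(new UUID(in.readLong(), in.readLong()));
    }
    return clusterIds;
  }

  public static void main(String[] args) {
    List<UUID> ids = new ArrayList<UUID>();
    ids.add(UUID.randomUUID());
    ids.add(UUID.randomUUID());
    System.out.println(decode(encode(ids)).equals(ids)); // prints true
  }
}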
/**

View File

@ -33,13 +33,24 @@ message WALKey {
required bytes table_name = 2;
required uint64 log_sequence_number = 3;
required uint64 write_time = 4;
optional UUID cluster_id = 5;
/*
This parameter is deprecated in favor of cluster_ids, which
contains the list of clusters that have consumed the change.
It is retained so that logs created by earlier releases (0.94)
can still be read by newer releases.
*/
optional UUID cluster_id = 5 [deprecated=true];
repeated FamilyScope scopes = 6;
optional uint32 following_kv_count = 7;
/*
This field contains the list of clusters that have
consumed the change
*/
repeated UUID cluster_ids = 8;
/*
optional CustomEntryType custom_entry_type = 8;
optional CustomEntryType custom_entry_type = 9;
enum CustomEntryType {
COMPACTION = 0;
}
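A reader now has to cope with both the deprecated singular field and the new repeated field. The hedged sketch below (a free-standing helper, not part of the patch; imports from java.util and the generated WALProtos/HBaseProtos classes used elsewhere in this commit are elided) shows the merge order HLogKey applies later in this patch: the legacy cluster_id, when present, is treated as the originating cluster and comes first.

// Hypothetical helper: collect cluster ids from a WALKey written by an old or a new writer.
static List<UUID> clusterIdsOf(WALProtos.WALKey walKey) {
  List<UUID> clusterIds = new ArrayList<UUID>();
  if (walKey.hasClusterId()) {
    // Written by a 0.95.x release: the single deprecated field is the originating cluster.
    clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(),
        walKey.getClusterId().getLeastSigBits()));
  }
  // Written from this release onwards: the full list of consuming clusters.
  for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
    clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
  }
  return clusterIds;
}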

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -114,7 +115,7 @@ public class Import {
static class Importer
extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
private UUID clusterId;
private List<UUID> clusterIds;
/**
* @param row The current table row key.
@ -159,11 +160,11 @@ public class Import {
}
}
if (put != null) {
put.setClusterId(clusterId);
put.setClusterIds(clusterIds);
context.write(key, put);
}
if (delete != null) {
delete.setClusterId(clusterId);
delete.setClusterIds(clusterIds);
context.write(key, delete);
}
}
@ -177,7 +178,7 @@ public class Import {
ZooKeeperWatcher zkw = null;
try {
zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
clusterId = ZKClusterId.getUUIDForCluster(zkw);
clusterIds = Collections.singletonList(ZKClusterId.getUUIDForCluster(zkw));
} catch (ZooKeeperConnectionException e) {
LOG.error("Problem connecting to ZooKeper during task setup", e);
} catch (KeeperException e) {

View File

@ -115,6 +115,7 @@ public class ReplicationProtbufUtil {
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
AdminProtos.ReplicateWALEntryRequest.Builder builder =
AdminProtos.ReplicateWALEntryRequest.newBuilder();
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
for (HLog.Entry entry: entries) {
entryBuilder.clear();
WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
@ -124,11 +125,10 @@ public class ReplicationProtbufUtil {
keyBuilder.setTableName(ByteString.copyFrom(key.getTablename().getName()));
keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
keyBuilder.setWriteTime(key.getWriteTime());
UUID clusterId = key.getClusterId();
if (clusterId != null) {
HBaseProtos.UUID.Builder uuidBuilder = keyBuilder.getClusterIdBuilder();
for(UUID clusterId : key.getClusterIds()) {
uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
keyBuilder.addClusterIds(uuidBuilder.build());
}
WALEdit edit = entry.getEdit();
NavigableMap<byte[], Integer> scopes = key.getScopes();

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -41,8 +42,8 @@ implements RowProcessor<S,T> {
}
@Override
public UUID getClusterId() {
return HConstants.DEFAULT_CLUSTER_ID;
public List<UUID> getClusterIds() {
return new ArrayList<UUID>();
}
@Override

View File

@ -1779,15 +1779,13 @@ public class HRegion implements HeapSize { // , Writable{
/**
* This is used only by unit tests. Not required to be a public API.
* @param familyMap map of family to edits for the given family.
* @param clusterId
* @param durability
* @throws IOException
*/
void delete(NavigableMap<byte[], List<Cell>> familyMap, UUID clusterId,
void delete(NavigableMap<byte[], List<Cell>> familyMap,
Durability durability) throws IOException {
Delete delete = new Delete(FOR_UNIT_TESTS_ONLY);
delete.setFamilyMap(familyMap);
delete.setClusterId(clusterId);
delete.setDurability(durability);
doBatchMutate(delete);
}
@ -2206,7 +2204,7 @@ public class HRegion implements HeapSize { // , Writable{
Mutation mutation = batchOp.operations[firstIndex];
if (walEdit.size() > 0) {
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdit, mutation.getClusterId(), now, this.htableDescriptor);
walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
}
// -------------------------------
@ -2598,7 +2596,6 @@ public class HRegion implements HeapSize { // , Writable{
familyMap.put(family, edits);
Put p = new Put(row);
p.setFamilyMap(familyMap);
p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
doBatchMutate(p);
}
@ -4534,7 +4531,7 @@ public class HRegion implements HeapSize { // , Writable{
if (!walEdit.isEmpty()) {
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdit,
processor.getClusterId(), now, this.htableDescriptor);
processor.getClusterIds(), now, this.htableDescriptor);
}
// 8. Release region lock
if (locked) {
@ -4761,7 +4758,7 @@ public class HRegion implements HeapSize { // , Writable{
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
@ -4911,7 +4908,7 @@ public class HRegion implements HeapSize { // , Writable{
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@InterfaceAudience.Public
@ -107,9 +106,9 @@ public interface RowProcessor<S extends Message, T extends Message> {
/**
* @return The replication cluster id.
* @return The cluster ids that have consumed the change.
*/
UUID getClusterId();
List<UUID> getClusterIds();
/**
* Human readable name of the processor

View File

@ -27,6 +27,7 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -821,12 +822,12 @@ class FSHLog implements HLog, Syncable {
* @param encodedRegionName Encoded name of the region as returned by
* <code>HRegionInfo#getEncodedNameAsBytes()</code>.
* @param tableName
* @param clusterId
* @param clusterIds that have consumed the change
* @return New log key.
*/
protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
long now, UUID clusterId) {
return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterId);
long now, List<UUID> clusterIds) {
return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds);
}
@Override
@ -839,7 +840,7 @@ class FSHLog implements HLog, Syncable {
@Override
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore);
}
/**
@ -862,15 +863,16 @@ class FSHLog implements HLog, Syncable {
* @param info
* @param tableName
* @param edits
* @param clusterId The originating clusterId for this edit (for replication)
* @param clusterIds that have consumed the change (for replication)
* @param now
* @param doSync shall we sync?
* @return txid of this transaction
* @throws IOException
*/
@SuppressWarnings("deprecation")
private long append(HRegionInfo info, TableName tableName, WALEdit edits, UUID clusterId,
final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
private long append(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd, boolean doSync,
boolean isInMemstore)
throws IOException {
if (edits.isEmpty()) return this.unflushedEntries.get();
if (this.closed) {
@ -890,7 +892,7 @@ class FSHLog implements HLog, Syncable {
// actual name.
byte [] encodedRegionName = info.getEncodedNameAsBytes();
if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterIds);
doWrite(info, logKey, edits, htd);
this.numEntries.incrementAndGet();
txid = this.unflushedEntries.incrementAndGet();
@ -914,9 +916,9 @@ class FSHLog implements HLog, Syncable {
@Override
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd)
List<UUID> clusterIds, final long now, HTableDescriptor htd)
throws IOException {
return append(info, tableName, edits, clusterId, now, htd, false, true);
return append(info, tableName, edits, clusterIds, now, htd, false, true);
}
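Call sites that previously passed HConstants.DEFAULT_CLUSTER_ID now pass an empty list, meaning no cluster has consumed the edit yet. A hedged caller sketch (hlog, regionInfo, walEdit and the other variables are hypothetical):

// An edit that replication has not touched yet carries an empty consumed-clusters list.
List<UUID> clusterIds = new ArrayList<UUID>();
long txid = hlog.appendNoSync(regionInfo, tableName, walEdit, clusterIds,
    EnvironmentEdgeManager.currentTimeMillis(), htableDescriptor);
// appendNoSync does not flush; the caller syncs later (e.g. via HLog#sync) when durability requires it.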
/**

View File

@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
@ -264,7 +266,7 @@ public interface HLog {
void closeAndDelete() throws IOException;
/**
* Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, UUID, long, HTableDescriptor)},
* Same as {@link #appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor)},
* except it causes a sync on the log
*/
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
@ -285,23 +287,20 @@ public interface HLog {
final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException;
/**
* Append a set of edits to the log. Log edits are keyed by (encoded)
* regionName, rowname, and log-sequence-id. The HLog is not flushed after
* this transaction is written to the log.
*
* Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
* log-sequence-id. The HLog is not flushed after this transaction is written to the log.
* @param info
* @param tableName
* @param edits
* @param clusterId
* The originating clusterId for this edit (for replication)
* @param clusterIds The clusters that have consumed the change (for replication)
* @param now
* @param htd
* @return txid of this transaction
* @throws IOException
*/
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
UUID clusterId, final long now, HTableDescriptor htd) throws IOException;
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
void hsync() throws IOException;
void hflush() throws IOException;

View File

@ -22,7 +22,11 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
@ -93,6 +97,13 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
}
/*
* This is used for reading the log entries created by the previous releases
* (0.94.11), which wrote the cluster information to the scopes of WALEdit.
*/
private static final String PREFIX_CLUSTER_KEY = ".";
private static final Version VERSION = Version.COMPRESSED;
// The encoded region name.
@ -102,15 +113,23 @@ public class HLogKey implements WritableComparable<HLogKey> {
// Time at which this edit was written.
private long writeTime;
private UUID clusterId;
// The first element in the list is the id of the cluster on which the change originated
private List<UUID> clusterIds;
private NavigableMap<byte[], Integer> scopes;
private CompressionContext compressionContext;
public HLogKey() {
this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
HConstants.DEFAULT_CLUSTER_ID);
init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
new ArrayList<UUID>());
}
public HLogKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
final long now, UUID clusterId) {
List<UUID> clusterIds = new ArrayList<UUID>();
clusterIds.add(clusterId);
init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
}
/**
@ -123,13 +142,18 @@ public class HLogKey implements WritableComparable<HLogKey> {
* @param tablename - name of table
* @param logSeqNum - log sequence number
* @param now Time at which this edit was written.
* @param clusterId of the cluster (used in Replication)
* @param clusterIds the clusters that have consumed the change (used in Replication)
*/
public HLogKey(final byte [] encodedRegionName, final TableName tablename,
long logSeqNum, final long now, UUID clusterId) {
long logSeqNum, final long now, List<UUID> clusterIds){
init(encodedRegionName, tablename, logSeqNum, now, clusterIds);
}
protected void init(final byte [] encodedRegionName, final TableName tablename,
long logSeqNum, final long now, List<UUID> clusterIds) {
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterId = clusterId;
this.clusterIds = clusterIds;
this.encodedRegionName = encodedRegionName;
this.tablename = tablename;
}
@ -171,14 +195,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
return this.writeTime;
}
/**
* Get the id of the original cluster
* @return Cluster id.
*/
public UUID getClusterId() {
return clusterId;
}
public NavigableMap<byte[], Integer> getScopes() {
return scopes;
}
@ -187,12 +203,47 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.scopes = scopes;
}
public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
if (scopes != null) {
Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
.iterator();
while (iterator.hasNext()) {
Map.Entry<byte[], Integer> scope = iterator.next();
String key = Bytes.toString(scope.getKey());
if (key.startsWith(PREFIX_CLUSTER_KEY)) {
addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
.length())));
iterator.remove();
}
}
if (scopes.size() > 0) {
this.scopes = scopes;
}
}
}
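The legacy convention handled here: 0.94.x releases wrote the consumed-cluster UUIDs into the WALEdit scope map under keys prefixed with ".". A hedged illustration of what readOlderScopes does with such a map (values are made up, imports elided):

NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(Bytes.toBytes("f"), HConstants.REPLICATION_SCOPE_GLOBAL);   // a genuine family scope
scopes.put(Bytes.toBytes("." + UUID.randomUUID()), 1);                 // a legacy cluster-id entry
HLogKey key = new HLogKey();
key.readOlderScopes(scopes);
// The "."-prefixed entry now lives in key.getClusterIds(); only the family scope remains in key.getScopes().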
/**
* Set the cluster id of this key.
* @param clusterId
* Marks that the cluster with the given clusterId has consumed the change
*/
public void setClusterId(UUID clusterId) {
this.clusterId = clusterId;
public void addClusterId(UUID clusterId) {
if (!clusterIds.contains(clusterId)) {
clusterIds.add(clusterId);
}
}
/**
* @return the set of cluster Ids that have consumed the change
*/
public List<UUID> getClusterIds() {
return clusterIds;
}
/**
* @return the cluster id on which the change originated. If there is no such cluster, it
* returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
*/
public UUID getOriginatingClusterId(){
return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
}
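Because the list is ordered, provenance survives every hop. A hedged usage sketch (all variables hypothetical):

// The constructor seeds the list with the originating cluster; a forwarding cluster appends its own id.
HLogKey key = new HLogKey(encodedRegionName, tableName, logSeqNum, now, originClusterId);
key.addClusterId(intermediateClusterId);   // done by a cluster that forwards the edit
assert key.getOriginatingClusterId().equals(originClusterId);
assert key.getClusterIds().contains(intermediateClusterId);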
@Override
@ -232,7 +283,6 @@ public class HLogKey implements WritableComparable<HLogKey> {
int result = Bytes.hashCode(this.encodedRegionName);
result ^= this.logSeqNum;
result ^= this.writeTime;
result ^= this.clusterId.hashCode();
return result;
}
@ -299,13 +349,16 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
// avoid storing 16 bytes when replication is not enabled
if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) {
out.writeBoolean(false);
} else {
// No need to write the cluster ids here, as we are using protobufs from 0.95
// Write only the first clusterId so the legacy read path can still be tested
Iterator<UUID> iterator = clusterIds.iterator();
if(iterator.hasNext()){
out.writeBoolean(true);
out.writeLong(this.clusterId.getMostSignificantBits());
out.writeLong(this.clusterId.getLeastSignificantBits());
UUID clusterId = iterator.next();
out.writeLong(clusterId.getMostSignificantBits());
out.writeLong(clusterId.getLeastSignificantBits());
} else {
out.writeBoolean(false);
}
}
@ -344,10 +397,13 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.logSeqNum = in.readLong();
this.writeTime = in.readLong();
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
this.clusterIds.clear();
if (version.atLeast(Version.INITIAL)) {
if (in.readBoolean()) {
this.clusterId = new UUID(in.readLong(), in.readLong());
// Reading an older log: this is definitely the originating cluster
clusterIds.add(new UUID(in.readLong(), in.readLong()));
}
} else {
try {
@ -357,6 +413,7 @@ public class HLogKey implements WritableComparable<HLogKey> {
// Means it's a very old key, just continue
}
}
// No need to read the cluster ids here, as we are using protobufs from 0.95
}
public WALKey.Builder getBuilder(
@ -373,10 +430,11 @@ public class HLogKey implements WritableComparable<HLogKey> {
}
builder.setLogSequenceNumber(this.logSeqNum);
builder.setWriteTime(writeTime);
if (this.clusterId != HConstants.DEFAULT_CLUSTER_ID) {
builder.setClusterId(HBaseProtos.UUID.newBuilder()
.setLeastSigBits(this.clusterId.getLeastSignificantBits())
.setMostSigBits(this.clusterId.getMostSignificantBits()));
HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
for (UUID clusterId : clusterIds) {
uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
builder.addClusterIds(uuidBuilder.build());
}
if (scopes != null) {
for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
@ -401,10 +459,15 @@ public class HLogKey implements WritableComparable<HLogKey> {
this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
}
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
clusterIds.clear();
if (walKey.hasClusterId()) {
this.clusterId = new UUID(
walKey.getClusterId().getMostSigBits(), walKey.getClusterId().getLeastSigBits());
// When reading an older log (0.95.1 release),
// this is definitely the originating cluster
clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
.getLeastSigBits()));
}
for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
}
this.scopes = null;
if (walKey.getScopesCount() > 0) {

View File

@ -22,8 +22,6 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
@ -37,7 +35,6 @@ import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
@ -1484,11 +1481,11 @@ public class HLogSplitter {
if (kv.isDelete()) {
del = new Delete(kv.getRow());
del.setClusterId(entry.getKey().getClusterId());
del.setClusterIds(entry.getKey().getClusterIds());
preRow = del;
} else {
put = new Put(kv.getRow());
put.setClusterId(entry.getKey().getClusterId());
put.setClusterIds(entry.getKey().getClusterIds());
preRow = put;
}
preKey = loc.getHostnamePort() + KEY_DELIMITER + table;

View File

@ -217,7 +217,7 @@ public class SequenceFileLogReader extends ReaderBase {
// Scopes are probably in WAL edit, move to key
NavigableMap<byte[], Integer> scopes = e.getEdit().getAndRemoveScopes();
if (scopes != null) {
e.getKey().setScopes(scopes);
e.getKey().readOlderScopes(scopes);
}
return true;
} catch (IOException ioe) {

View File

@ -116,13 +116,13 @@ public class ReplicationSink {
long totalReplicated = 0;
// Map of table => list of Rows, grouped by the cluster ids that have consumed them; we only want to
// flushCommits once per invocation of this method per table and per cluster-id list.
Map<TableName, Map<UUID,List<Row>>> rowMap = new TreeMap<TableName, Map<UUID,List<Row>>>();
Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
for (WALEntry entry : entries) {
TableName table =
TableName.valueOf(entry.getKey().getTableName().toByteArray());
Cell previousCell = null;
Mutation m = null;
java.util.UUID uuid = toUUID(entry.getKey().getClusterId());
int count = entry.getAssociatedCellCount();
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
@ -135,8 +135,12 @@ public class ReplicationSink {
m = CellUtil.isDelete(cell)?
new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
m.setClusterId(uuid);
addToHashMultiMap(rowMap, table, uuid, m);
List<UUID> clusterIds = new ArrayList<UUID>();
for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){
clusterIds.add(toUUID(clusterId));
}
m.setClusterIds(clusterIds);
addToHashMultiMap(rowMap, table, clusterIds, m);
}
if (CellUtil.isDelete(cell)) {
((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
@ -147,7 +151,7 @@ public class ReplicationSink {
}
totalReplicated++;
}
for (Entry<TableName, Map<UUID,List<Row>>> entry : rowMap.entrySet()) {
for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
batch(entry.getKey(), entry.getValue().values());
}
int size = entries.size();
@ -181,7 +185,7 @@ public class ReplicationSink {
* @param key1
* @param key2
* @param value
* @return
* @return the list of values corresponding to key1 and key2
*/
private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2,List<V>>> map, K1 key1, K2 key2, V value) {
Map<K2,List<V>> innerMap = map.get(key1);
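The grouping key changed from a single UUID to the whole list of consuming clusters; that works as a map key because java.util lists define element-wise equals() and hashCode(). A tiny self-contained illustration (hypothetical values, java.util imports elided):

Map<List<UUID>, List<String>> batches = new HashMap<List<UUID>, List<String>>();
UUID origin = UUID.randomUUID();
UUID hop = UUID.randomUUID();
List<UUID> k1 = Arrays.asList(origin, hop);
List<UUID> k2 = Arrays.asList(origin, hop);   // equal contents, different instance
batches.put(k1, new ArrayList<String>());
batches.get(k1).add("row-1");
batches.get(k2).add("row-2");                 // lands in the same bucket as row-1
System.out.println(batches.get(k1));          // [row-1, row-2]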

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
@ -55,10 +54,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
/**
* Class that handles the source of a replication stream.
@ -395,20 +392,15 @@ public class ReplicationSource extends Thread
seenEntries++;
// Remove all KVs that should not be replicated
HLogKey logKey = entry.getKey();
// don't replicate if the log entries originated in the peer
if (!logKey.getClusterId().equals(peerClusterId)) {
// don't replicate if the log entries have already been consumed by the peer cluster
if (!logKey.getClusterIds().contains(peerClusterId)) {
removeNonReplicableEdits(entry);
// Don't replicate catalog entries, if the WALEdit doesn't contain
// anything to replicate, or if we're currently not set to replicate
if (!logKey.getTablename().equals(TableName.META_TABLE_NAME) &&
edit.size() != 0) {
// Only set the clusterId if is a local key.
// This ensures that the originator sets the cluster id
// and all replicas retain the initial cluster id.
// This is *only* place where a cluster id other than the default is set.
if (HConstants.DEFAULT_CLUSTER_ID == logKey.getClusterId()) {
logKey.setClusterId(this.clusterId);
}
// Mark that the current cluster has consumed the change
logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
currentNbEntries++;
currentSize += entry.getEdit().size();
@ -817,4 +809,4 @@ public class ReplicationSource extends Thread
", currently replicating from: " + this.currentPath +
" at position: " + position;
}
}
}
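The essence of the fix, pulled out of the reading loop above as a hedged standalone sketch (method and variable names are hypothetical): an entry is skipped if the peer has already consumed it, and the local cluster stamps every entry it ships so that downstream peers can apply the same test.

// Returns true if the entry should be shipped to the peer; marks local consumption as a side effect.
boolean shouldShip(HLogKey logKey, UUID peerClusterId, UUID localClusterId) {
  if (logKey.getClusterIds().contains(peerClusterId)) {
    return false;                        // the peer already consumed this edit: the cycle stops here
  }
  logKey.addClusterId(localClusterId);   // record that this cluster has consumed the edit
  return true;
}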

View File

@ -172,7 +172,7 @@ class SnapshotLogSplitter implements Closeable {
// Append Entry
key = new HLogKey(newRegionName, tableName,
key.getLogSeqNum(), key.getWriteTime(), key.getClusterId());
key.getLogSeqNum(), key.getWriteTime(), key.getClusterIds());
writer.append(new HLog.Entry(key, entry.getEdit()));
}
} catch (IOException e) {

View File

@ -1231,7 +1231,7 @@ public class TestHRegion extends HBaseTestCase {
NavigableMap<byte[], List<Cell>> deleteMap =
new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(family, kvs);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
region.delete(deleteMap, Durability.SYNC_WAL);
} catch (Exception e) {
assertTrue("Family " +new String(family)+ " does not exist", false);
}
@ -1243,7 +1243,7 @@ public class TestHRegion extends HBaseTestCase {
NavigableMap<byte[], List<Cell>> deleteMap =
new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(family, kvs);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
region.delete(deleteMap, Durability.SYNC_WAL);
} catch (Exception e) {
ok = true;
}
@ -1571,7 +1571,7 @@ public class TestHRegion extends HBaseTestCase {
NavigableMap<byte[], List<Cell>> deleteMap =
new TreeMap<byte[], List<Cell>>(Bytes.BYTES_COMPARATOR);
deleteMap.put(fam1, kvs);
region.delete(deleteMap, HConstants.DEFAULT_CLUSTER_ID, Durability.SYNC_WAL);
region.delete(deleteMap, Durability.SYNC_WAL);
// extract the key values out the memstore:
// This is kinda hacky, but better than nothing...
@ -3853,7 +3853,7 @@ public class TestHRegion extends HBaseTestCase {
//verify append called or not
verify(log, expectAppend ? times(1) : never())
.appendNoSync((HRegionInfo)any(), eq(tableName),
(WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any());
(WALEdit)any(), (List<UUID>)any(), anyLong(), (HTableDescriptor)any());
//verify sync called or not
if (expectSync || expectSyncFromLogSyncer) {

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@ -103,7 +104,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
HRegionInfo hri = region.getRegionInfo();
if (this.noSync) {
hlog.appendNoSync(hri, hri.getTableName(), walEdit,
HConstants.DEFAULT_CLUSTER_ID, now, htd);
new ArrayList<UUID>(), now, htd);
} else {
hlog.append(hri, hri.getTableName(), walEdit, now, htd);
}

View File

@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -22,27 +21,35 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
@ -55,18 +62,14 @@ public class TestMasterReplication {
private static final Log LOG = LogFactory.getLog(TestReplicationBase.class);
private Configuration conf1;
private Configuration conf2;
private Configuration conf3;
private Configuration baseConfiguration;
private HBaseTestingUtility utility1;
private HBaseTestingUtility utility2;
private HBaseTestingUtility utility3;
private MiniZooKeeperCluster miniZK;
private HBaseTestingUtility[] utilities;
private Configuration[] configurations;
private MiniZooKeeperCluster miniZK;
private static final long SLEEP_TIME = 500;
private static final int NB_RETRIES = 100;
private static final int NB_RETRIES = 10;
private static final byte[] tableName = Bytes.toBytes("test");
private static final byte[] famName = Bytes.toBytes("f");
@ -85,44 +88,21 @@ public class TestMasterReplication {
@Before
public void setUp() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
baseConfiguration = HBaseConfiguration.create();
// smaller block size and capacity to trigger more operations
// and test them
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
conf1.setInt("replication.source.size.capacity", 1024);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
baseConfiguration.setInt("replication.source.size.capacity", 1024);
baseConfiguration.setLong("replication.source.sleepforretries", 100);
baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
baseConfiguration.setBoolean("dfs.support.append", true);
baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
baseConfiguration.setStrings(
CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
CoprocessorCounter.class.getName());
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
miniZK = utility1.getZkCluster();
// By setting the mini ZK cluster through this method, even though this is
// already utility1's mini ZK cluster, we are telling utility1 not to shut
// the mini ZK cluster when we shut down the HBase cluster.
utility1.setZkCluster(miniZK);
new ZooKeeperWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
new ZooKeeperWatcher(conf2, "cluster2", null, true);
conf3 = new Configuration(conf1);
conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
utility3 = new HBaseTestingUtility(conf3);
utility3.setZkCluster(miniZK);
new ZooKeeperWatcher(conf3, "cluster3", null, true);
table = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
@ -131,209 +111,325 @@ public class TestMasterReplication {
table.addFamily(fam);
}
@After
public void tearDown() throws IOException {
miniZK.shutdown();
}
/**
* It tests the replication scenario involving 0 -> 1 -> 0. It does so by
* adding and deleting a row in a table in each cluster and checking that it is
* replicated. It also tests that the puts and deletes are not replicated back
* to the originating cluster.
*/
@Test(timeout = 300000)
public void testCyclicReplication1() throws Exception {
LOG.info("testSimplePutDelete");
int numClusters = 2;
HTable[] htables = null;
try {
startMiniClusters(numClusters);
createTableOnClusters(table);
@Test(timeout=300000)
public void testCyclicReplication() throws Exception {
LOG.info("testCyclicReplication");
utility1.startMiniCluster();
utility2.startMiniCluster();
utility3.startMiniCluster();
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
ReplicationAdmin admin3 = new ReplicationAdmin(conf3);
htables = getHTablesOnClusters(tableName);
new HBaseAdmin(conf1).createTable(table);
new HBaseAdmin(conf2).createTable(table);
new HBaseAdmin(conf3).createTable(table);
HTable htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024);
HTable htable2 = new HTable(conf2, tableName);
htable2.setWriteBufferSize(1024);
HTable htable3 = new HTable(conf3, tableName);
htable3.setWriteBufferSize(1024);
admin1.addPeer("1", utility2.getClusterKey());
admin2.addPeer("1", utility3.getClusterKey());
admin3.addPeer("1", utility1.getClusterKey());
// Test the replication scenarios of 0 -> 1 -> 0
addPeer("1", 0, 1);
addPeer("1", 1, 0);
// put "row" and wait 'til it got around
putAndWait(row, famName, htable1, htable3);
// it should have passed through table2
check(row,famName,htable2);
int[] expectedCounts = new int[] { 2, 2 };
putAndWait(row1, famName, htable2, htable1);
check(row,famName,htable3);
putAndWait(row2, famName, htable3, htable2);
check(row,famName,htable1);
deleteAndWait(row,htable1,htable3);
deleteAndWait(row1,htable2,htable1);
deleteAndWait(row2,htable3,htable2);
// add rows to both clusters,
// make sure they are both replicated
putAndWait(row, famName, htables[0], htables[1]);
putAndWait(row1, famName, htables[1], htables[0]);
validateCounts(htables, put, expectedCounts);
assertEquals("Puts were replicated back ", 3, getCount(htable1, put));
assertEquals("Puts were replicated back ", 3, getCount(htable2, put));
assertEquals("Puts were replicated back ", 3, getCount(htable3, put));
assertEquals("Deletes were replicated back ", 3, getCount(htable1, delete));
assertEquals("Deletes were replicated back ", 3, getCount(htable2, delete));
assertEquals("Deletes were replicated back ", 3, getCount(htable3, delete));
// Test HBASE-9158
admin2.disablePeer("1");
// we now have an edit that was replicated into cluster 2, originating from cluster 1
putAndWait(row3, famName, htable1, htable2);
// now add a local edit to cluster 2
Put put = new Put(row4);
put.add(famName, row4, row4);
htable2.put(put);
// reenable replication from cluster 2 to cluster 3
admin2.enablePeer("1");
// without HBASE-9158 the edit for row4 would have been marked with cluster 1's id
// and hence not replicated to cluster 1
wait(row4, htable1);
utility3.shutdownMiniCluster();
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
deleteAndWait(row, htables[0], htables[1]);
deleteAndWait(row1, htables[1], htables[0]);
validateCounts(htables, delete, expectedCounts);
} finally {
close(htables);
shutDownMiniClusters();
}
}
/**
* Add a row to a table in each cluster, check it's replicated,
* delete it, check's gone
* Also check the puts and deletes are not replicated back to
* the originating cluster.
* Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
* deleting rows in a table in each cluster and ensuring that each of these
* clusters gets the appropriate mutations. It also tests the grouping
* scenario where a cluster needs to replicate the edits originating from
* itself as well as the edits that it received via replication from a
* different cluster. The scenario is explained in HBASE-9158.
*/
@Test(timeout=300000)
public void testSimplePutDelete() throws Exception {
LOG.info("testSimplePutDelete");
utility1.startMiniCluster();
utility2.startMiniCluster();
@Test(timeout = 300000)
public void testCyclicReplication2() throws Exception {
LOG.info("testCyclicReplication1");
int numClusters = 3;
HTable[] htables = null;
try {
startMiniClusters(numClusters);
createTableOnClusters(table);
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
// Test the replication scenario of 0 -> 1 -> 2 -> 0
addPeer("1", 0, 1);
addPeer("1", 1, 2);
addPeer("1", 2, 0);
new HBaseAdmin(conf1).createTable(table);
new HBaseAdmin(conf2).createTable(table);
HTable htable1 = new HTable(conf1, tableName);
htable1.setWriteBufferSize(1024);
HTable htable2 = new HTable(conf2, tableName);
htable2.setWriteBufferSize(1024);
htables = getHTablesOnClusters(tableName);
// set M-M
admin1.addPeer("1", utility2.getClusterKey());
admin2.addPeer("1", utility1.getClusterKey());
// put "row" and wait 'til it got around
putAndWait(row, famName, htables[0], htables[2]);
putAndWait(row1, famName, htables[1], htables[0]);
putAndWait(row2, famName, htables[2], htables[1]);
// add rows to both clusters,
// make sure they are both replicated
putAndWait(row, famName, htable1, htable2);
putAndWait(row1, famName, htable2, htable1);
deleteAndWait(row, htables[0], htables[2]);
deleteAndWait(row1, htables[1], htables[0]);
deleteAndWait(row2, htables[2], htables[1]);
// make sure "row" did not get replicated back.
assertEquals("Puts were replicated back ", 2, getCount(htable1, put));
int[] expectedCounts = new int[] { 3, 3, 3 };
validateCounts(htables, put, expectedCounts);
validateCounts(htables, delete, expectedCounts);
// delete "row" and wait
deleteAndWait(row, htable1, htable2);
// make sure the 2nd cluster did not replicate back
assertEquals("Puts were replicated back ", 2, getCount(htable2, put));
deleteAndWait(row1, htable2, htable1);
assertEquals("Deletes were replicated back ", 2, getCount(htable1, delete));
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
// Test HBASE-9158
disablePeer("1", 2);
// we now have an edit that was replicated into cluster 1, originating from
// cluster 0
putAndWait(row3, famName, htables[0], htables[1]);
// now add a local edit to cluster 1
htables[1].put(new Put(row4).add(famName, row4, row4));
// re-enable replication from cluster 2 to cluster 0
enablePeer("1", 2);
// without HBASE-9158 the edit for row4 would have been marked with
// cluster 0's id and hence not replicated to cluster 0
wait(row4, htables[0], true);
} finally {
close(htables);
shutDownMiniClusters();
}
}
private int getCount(HTable t, byte[] type) throws IOException {
/**
* Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
*/
@Test(timeout = 300000)
public void testCyclicReplication3() throws Exception {
LOG.info("testCyclicReplication2");
int numClusters = 3;
HTable[] htables = null;
try {
startMiniClusters(numClusters);
createTableOnClusters(table);
// Test the replication scenario of 0 -> 1 -> 2 -> 1
addPeer("1", 0, 1);
addPeer("1", 1, 2);
addPeer("1", 2, 1);
htables = getHTablesOnClusters(tableName);
// put "row" and wait 'til it got around
putAndWait(row, famName, htables[0], htables[2]);
putAndWait(row1, famName, htables[1], htables[2]);
putAndWait(row2, famName, htables[2], htables[1]);
deleteAndWait(row, htables[0], htables[2]);
deleteAndWait(row1, htables[1], htables[2]);
deleteAndWait(row2, htables[2], htables[1]);
int[] expectedCounts = new int[] { 1, 3, 3 };
validateCounts(htables, put, expectedCounts);
validateCounts(htables, delete, expectedCounts);
} finally {
close(htables);
shutDownMiniClusters();
}
}
@After
public void tearDown() throws IOException {
configurations = null;
utilities = null;
}
@SuppressWarnings("resource")
private void startMiniClusters(int numClusters) throws Exception {
Random random = new Random();
utilities = new HBaseTestingUtility[numClusters];
configurations = new Configuration[numClusters];
for (int i = 0; i < numClusters; i++) {
Configuration conf = new Configuration(baseConfiguration);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
HBaseTestingUtility utility = new HBaseTestingUtility(conf);
if (i == 0) {
utility.startMiniZKCluster();
miniZK = utility.getZkCluster();
} else {
utility.setZkCluster(miniZK);
}
utility.startMiniCluster();
utilities[i] = utility;
configurations[i] = conf;
new ZooKeeperWatcher(conf, "cluster" + i, null, true);
}
}
private void shutDownMiniClusters() throws Exception {
int numClusters = utilities.length;
for (int i = numClusters - 1; i >= 0; i--) {
if (utilities[i] != null) {
utilities[i].shutdownMiniCluster();
}
}
miniZK.shutdown();
}
private void createTableOnClusters(HTableDescriptor table) throws Exception {
int numClusters = configurations.length;
for (int i = 0; i < numClusters; i++) {
HBaseAdmin hbaseAdmin = null;
try {
hbaseAdmin = new HBaseAdmin(configurations[i]);
hbaseAdmin.createTable(table);
} finally {
close(hbaseAdmin);
}
}
}
private void addPeer(String id, int masterClusterNumber,
int slaveClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
replicationAdmin.addPeer(id,
utilities[slaveClusterNumber].getClusterKey());
} finally {
close(replicationAdmin);
}
}
private void disablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
replicationAdmin.disablePeer(id);
} finally {
close(replicationAdmin);
}
}
private void enablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
replicationAdmin.enablePeer(id);
} finally {
close(replicationAdmin);
}
}
private void close(Closeable... closeables) {
try {
if (closeables != null) {
for (Closeable closeable : closeables) {
closeable.close();
}
}
} catch (Exception e) {
LOG.warn("Exception occured while closing the object:", e);
}
}
@SuppressWarnings("resource")
private HTable[] getHTablesOnClusters(byte[] tableName) throws Exception {
int numClusters = utilities.length;
HTable[] htables = new HTable[numClusters];
for (int i = 0; i < numClusters; i++) {
HTable htable = new HTable(configurations[i], tableName);
htable.setWriteBufferSize(1024);
htables[i] = htable;
}
return htables;
}
private void validateCounts(HTable[] htables, byte[] type,
int[] expectedCounts) throws IOException {
for (int i = 0; i < htables.length; i++) {
assertEquals(Bytes.toString(type) + " were replicated back ",
expectedCounts[i], getCount(htables[i], type));
}
}
private int getCount(HTable t, byte[] type) throws IOException {
Get test = new Get(row);
test.setAttribute("count", new byte[]{});
test.setAttribute("count", new byte[] {});
Result res = t.get(test);
return Bytes.toInt(res.getValue(count, type));
}
private void deleteAndWait(byte[] row, HTable source, HTable target)
throws Exception {
throws Exception {
Delete del = new Delete(row);
source.delete(del);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
Result res = target.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
}
private void check(byte[] row, byte[] fam, HTable t) throws IOException {
Get get = new Get(row);
Result res = t.get(get);
if (res.size() == 0) {
fail("Row is missing");
}
wait(row, target, true);
}
private void putAndWait(byte[] row, byte[] fam, HTable source, HTable target)
throws Exception {
throws Exception {
Put put = new Put(row);
put.add(fam, row, row);
source.put(put);
wait(row, target);
wait(row, target, false);
}
private void wait(byte[] row, HTable target) throws Exception {
private void wait(byte[] row, HTable target, boolean isDeleted)
throws Exception {
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
if (i == NB_RETRIES - 1) {
fail("Waited too much time for replication. Row:" + Bytes.toString(row)
+ ". IsDeleteReplication:" + isDeleted);
}
Result res = target.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
if (sleep) {
LOG.info("Waiting for more time for replication. Row:"
+ Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
Thread.sleep(SLEEP_TIME);
} else {
assertArrayEquals(res.value(), row);
if (!isDeleted) {
assertArrayEquals(res.value(), row);
}
LOG.info("Obtained row:"
+ Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
break;
}
}
}
}
/**
* Use a coprocessor to count puts and deletes.
* as KVs would be replicated back with the same timestamp
* there is otherwise no way to count them.
* Use a coprocessor to count puts and deletes. As KVs would be replicated back with the same
* timestamp, there is otherwise no way to count them.
*/
public static class CoprocessorCounter extends BaseRegionObserver {
private int nCount = 0;
private int nDelete = 0;
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit,
final Durability durability)
throws IOException {
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
nCount++;
}
@Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit,
final Durability durability)
throws IOException {
final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
nDelete++;
}
@Override
public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get, final List<KeyValue> result) throws IOException {
public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
final List<KeyValue> result) throws IOException {
if (get.getAttribute("count") != null) {
result.clear();
// order is important!
@ -344,5 +440,4 @@ public class TestMasterReplication {
}
}
}
}