HBASE-1728 Column family scoping and cluster identification

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@911168 13f79535-47bb-0310-9956-ffa450edef68
Jean-Daniel Cryans 2010-02-17 20:32:43 +00:00
parent 0faf605ea1
commit 9afce6678e
13 changed files with 162 additions and 27 deletions

ReplicationRegion.java View File

@ -89,7 +89,8 @@ public class ReplicationRegion extends HRegion {
// complete log flush.
if(!(Bytes.equals(key.getTablename(),ROOT_TABLE_NAME) ||
Bytes.equals(key.getTablename(),META_TABLE_NAME)) &&
!Bytes.equals(val.getFamily(), HLog.METAFAMILY)) {
!Bytes.equals(val.getFamily(), HLog.METAFAMILY) &&
key.getScope() == REPLICATION_SCOPE_GLOBAL) {
this.replicationSource.enqueueLog(entry);
}

ReplicationSource.java View File

@ -59,6 +59,7 @@ public class ReplicationSource extends Chore implements HConstants {
private final float ratio;
private final Random random;
private final AtomicBoolean isReplicating;
private final byte clusterId;
private List<HServerAddress> currentPeers;
@ -80,6 +81,7 @@ public class ReplicationSource extends Chore implements HConstants {
this.ratio = this.conf.getFloat("replication.ratio", 0.1f);
currentPeers = new ArrayList<HServerAddress>();
this.random = new Random();
this.clusterId = zkHelper.getClusterId();
this.isReplicating = isReplicating;
}
@ -120,6 +122,7 @@ public class ReplicationSource extends Chore implements HConstants {
*/
public void enqueueLog(HLog.Entry logEntry) {
if(this.isReplicating.get()) {
logEntry.getKey().setClusterId(this.clusterId);
this.queue.add(logEntry);
}
}

ReplicationHLog.java View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.replication.ReplicationSource;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -68,8 +69,10 @@ public class ReplicationHLog extends HLog {
protected void doWrite(HRegionInfo info, HLogKey logKey,
KeyValue logEdit, long now)
throws IOException {
logKey.setScope(info.getTableDesc().getFamily(logEdit.getFamily()).getScope());
super.doWrite(info, logKey, logEdit, now);
if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion())) {
if(this.isReplicator && ! (info.isMetaRegion() || info.isRootRegion()) &&
logKey.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL) {
this.replicationSource.enqueueLog(new Entry(logKey, logEdit));
}

ReplicationZookeeperHelper.java View File

@ -48,15 +48,16 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher {
private final String replicationZNode;
private final String peersZNode;
private final String replicationStateNodeName;
private final boolean isMaster;
private final boolean master;
private final Configuration conf;
private final AtomicBoolean isReplicating;
private final byte clusterId;
/**
* Constructor used by region servers
* @param zookeeperWrapper zkw to wrap
@ -77,13 +78,15 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher {
conf.get("zookeeper.znode.master", "master");
this.replicationStateNodeName =
conf.get("zookeeper.znode.state", "state");
String clusterIdName =
conf.get("zookeeper.znode.clusterId", "clusterId");
this.peerClusters = new ArrayList<ZooKeeperWrapper>();
this.replicationZNode = zookeeperWrapper.getZNode(
zookeeperWrapper.getParentZNode(),replicationZNodeName);
this.peersZNode =
zookeeperWrapper.getZNode(replicationZNode,peersZNodeName);
List<String> znodes =
this.zookeeperWrapper.listZnodes(this.peersZNode, this);
@ -94,15 +97,19 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher {
}
String address = this.zookeeperWrapper.getData(this.replicationZNode,
repMasterZNodeName);
String idResult = this.zookeeperWrapper.getData(this.replicationZNode,
clusterIdName);
this.clusterId =
idResult == null ? DEFAULT_CLUSTER_ID : Byte.valueOf(idResult);
String thisCluster = this.conf.get(ZOOKEEPER_QUORUM)+":"+
this.conf.get("hbase.zookeeper.property.clientPort") +":" +
this.conf.get(ZOOKEEPER_ZNODE_PARENT);
this.isMaster = thisCluster.equals(address);
this.master = thisCluster.equals(address);
LOG.info("This cluster (" + thisCluster + ") is a "
+ (this.isMaster ? "master" : "slave") + " for replication" +
+ (this.master ? "master" : "slave") + " for replication" +
", compared with (" + address + ")");
this.isReplicating = isReplicating;
@ -139,10 +146,18 @@ public class ReplicationZookeeperHelper implements HConstants, Watcher {
/**
* Tells if this cluster replicates or not
* @return
* @return if this is a master
*/
public boolean isMaster() {
return isMaster;
return master;
}
/**
* Get the identification of the cluster
* @return the id for the cluster
*/
public byte getClusterId() {
return this.clusterId;
}
@Override
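
Note: the master/slave decision made in the constructor above boils down to comparing a composed cluster-identity string with the address stored under the replication master znode. A small illustrative sketch of that comparison (hostnames and values are made up; ZOOKEEPER_QUORUM and ZOOKEEPER_ZNODE_PARENT resolve to hbase.zookeeper.quorum and zookeeper.znode.parent):

    public class ClusterIdentitySketch {
      public static void main(String[] args) {
        String quorum = "zk1.example.org,zk2.example.org"; // conf.get(ZOOKEEPER_QUORUM)
        String clientPort = "2181";                        // conf.get("hbase.zookeeper.property.clientPort")
        String parent = "/hbase";                          // conf.get(ZOOKEEPER_ZNODE_PARENT)

        String thisCluster = quorum + ":" + clientPort + ":" + parent;
        System.out.println(thisCluster); // zk1.example.org,zk2.example.org:2181:/hbase

        // isMaster() is true only when this string equals the data stored under the
        // replication master znode (zookeeper.znode.master, default "master").
      }
    }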

TestReplication.java View File

@ -36,8 +36,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
public class TestReplication implements HConstants{
protected static final Log LOG = LogFactory.getLog(TestReplication.class);
@ -53,7 +51,7 @@ public class TestReplication implements HConstants{
private final int NB_ROWS_IN_BATCH = 100;
private final long SLEEP_TIME = 500;
private final int NB_RETRIES = 10;
private final int NB_RETRIES = 5;
/**
@ -133,10 +131,14 @@ public class TestReplication implements HConstants{
byte[] tableName = Bytes.toBytes("test");
byte[] famName = Bytes.toBytes("f");
byte[] noRepfamName = Bytes.toBytes("norep");
byte[] row = Bytes.toBytes("row");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
HBaseAdmin admin1 = new HBaseAdmin(conf1);
@ -260,6 +262,24 @@ public class TestReplication implements HConstants{
}
}
put = new Put(Bytes.toBytes("do not rep"));
put.add(noRepfamName, row, row);
table1.put(put);
get = new Get(Bytes.toBytes("do not rep"));
for(int i = 0; i < NB_RETRIES; i++) {
if(i == NB_RETRIES-1) {
break;
}
Result res = table2.get(get);
if(res.size() >= 1) {
fail("Not supposed to be replicated");
} else {
LOG.info("Row not replicated, let's wait a bit more...");
Thread.sleep(SLEEP_TIME);
}
}
}
private void setIsReplication(String bool) throws Exception{

HColumnDescriptor.java View File

@ -77,6 +77,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public static final String FOREVER = "FOREVER";
public static final String MAPFILE_INDEX_INTERVAL =
"MAPFILE_INDEX_INTERVAL";
public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
/**
* Default compression type.
@ -121,6 +122,11 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
*/
public static final int DEFAULT_TTL = HConstants.FOREVER;
/**
* Default scope.
*/
public static final int DEFAULT_REPLICATION_SCOPE = HConstants.REPLICATION_SCOPE_LOCAL;
// Column family name
private byte [] name;
@ -203,7 +209,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
final boolean blockCacheEnabled,
final int timeToLive, final boolean bloomFilter) {
this(familyName, maxVersions, compression, inMemory, blockCacheEnabled,
DEFAULT_BLOCKSIZE, timeToLive, bloomFilter);
DEFAULT_BLOCKSIZE, timeToLive, bloomFilter, DEFAULT_REPLICATION_SCOPE);
}
/**
@ -219,6 +225,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
* @param timeToLive Time-to-live of cell contents, in seconds
* (use HConstants.FOREVER for unlimited TTL)
* @param bloomFilter Enable the specified bloom filter for this column
* @param scope The scope tag for this column
*
* @throws IllegalArgumentException if passed a family name that is made of
* other than 'word' characters: i.e. <code>[a-zA-Z_0-9]</code> or contains
@ -228,7 +235,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
public HColumnDescriptor(final byte [] familyName, final int maxVersions,
final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final boolean bloomFilter) {
final int timeToLive, final boolean bloomFilter, final int scope) {
isLegalFamilyName(familyName);
this.name = familyName;
@ -245,6 +252,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
valueOf(compression.toUpperCase()));
setBloomfilter(bloomFilter);
setBlocksize(blocksize);
setScope(scope);
}
/**
@ -482,6 +490,24 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setValue(MAPFILE_INDEX_INTERVAL, Integer.toString(interval));
}
/**
* @return the scope tag
*/
public int getScope() {
String value = getValue(REPLICATION_SCOPE);
if (value != null) {
return Integer.valueOf(value).intValue();
}
return DEFAULT_REPLICATION_SCOPE;
}
/**
* @param scope the scope tag
*/
public void setScope(int scope) {
setValue(REPLICATION_SCOPE, Integer.toString(scope));
}
/**
* @see java.lang.Object#toString()
*/
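
Note: the scope added above is an ordinary stored attribute on the column family, set at schema-definition time and read back by the replication-aware WAL when an edit is written. A minimal sketch of declaring one globally replicated family and one local-only family, mirroring the TestReplication setup earlier in this commit (table, family, and class names are illustrative):

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScopedTableExample {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        HTableDescriptor table = new HTableDescriptor(Bytes.toBytes("example"));

        // Edits to this family are tagged REPLICATION_SCOPE_GLOBAL in their HLogKey
        // and therefore get enqueued for shipping to peer clusters.
        HColumnDescriptor replicated = new HColumnDescriptor(Bytes.toBytes("rep"));
        replicated.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
        table.addFamily(replicated);

        // getScope() falls back to DEFAULT_REPLICATION_SCOPE (local), so this family
        // is never picked up by replication.
        HColumnDescriptor localOnly = new HColumnDescriptor(Bytes.toBytes("norep"));
        table.addFamily(localOnly);

        new HBaseAdmin(conf).createTable(table);
      }
    }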

HConstants.java View File

@ -289,6 +289,24 @@ public interface HConstants {
TABLE_SET_HTD,
TABLE_SPLIT
}
/**
* Scope tag for locally scoped data.
* This data will not be replicated.
*/
public static final int REPLICATION_SCOPE_LOCAL = 0;
/**
* Scope tag for globally scoped data.
* This data will be replicated to all peers.
*/
public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
*/
public static final byte DEFAULT_CLUSTER_ID = 0;
/**
* Parameter name for maximum number of bytes returned when calling a

HTableDescriptor.java View File

@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@ -663,7 +662,7 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
new HColumnDescriptor[] { new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is arbitrary number. Keep versions to help debuggging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, false) });
HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL) });
/** Table descriptor for <code>.META.</code> catalog table */
public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor(
@ -671,9 +670,9 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is arbitrary number. Keep versions to help debuggging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, false),
HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL),
new HColumnDescriptor(HConstants.CATALOG_HISTORIAN_FAMILY,
HConstants.ALL_VERSIONS, Compression.Algorithm.NONE.getName(),
false, false, 8 * 1024,
HConstants.WEEK_IN_SECONDS, false)});
HConstants.WEEK_IN_SECONDS, false, HConstants.REPLICATION_SCOPE_LOCAL)});
}

HLogKey.java View File

@ -43,8 +43,11 @@ public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
private long logSeqNum;
// Time at which this edit was written.
private long writeTime;
private byte clusterId;
private int scope;
private int HEAP_TAX = ClassSize.OBJECT + (2 * ClassSize.ARRAY) +
(2 * Bytes.SIZEOF_LONG);
(2 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT;
/** Writable Consructor -- Do not use. */
public HLogKey() {
@ -67,6 +70,8 @@ public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
this.tablename = tablename;
this.logSeqNum = logSeqNum;
this.writeTime = now;
this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
this.scope = HConstants.REPLICATION_SCOPE_LOCAL;
}
//////////////////////////////////////////////////////////////////////////////
@ -99,6 +104,38 @@ public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
return this.writeTime;
}
/**
* Get the id of the original cluster
* @return
*/
public byte getClusterId() {
return clusterId;
}
/**
* Set the cluster id of this key
* @param clusterId
*/
public void setClusterId(byte clusterId) {
this.clusterId = clusterId;
}
/**
* Get the replication scope of this key
* @return replication scope
*/
public int getScope() {
return this.scope;
}
/**
* Set the replication scope of this key
* @param scope The new scope
*/
public void setScope(int scope) {
this.scope = scope;
}
@Override
public String toString() {
return Bytes.toString(tablename) + "/" + Bytes.toString(regionName) + "/" +
@ -121,6 +158,8 @@ public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
int result = Bytes.hashCode(this.regionName);
result ^= this.logSeqNum;
result ^= this.writeTime;
result ^= this.clusterId;
result ^= this.scope;
return result;
}
@ -146,8 +185,10 @@ public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.regionName);
Bytes.writeByteArray(out, this.tablename);
out.writeLong(logSeqNum);
out.writeLong(this.logSeqNum);
out.writeLong(this.writeTime);
out.writeByte(this.clusterId);
out.writeInt(this.scope);
}
public void readFields(DataInput in) throws IOException {
@ -155,6 +196,12 @@ public class HLogKey implements WritableComparable<HLogKey>, HeapSize {
this.tablename = Bytes.readByteArray(in);
this.logSeqNum = in.readLong();
this.writeTime = in.readLong();
try {
this.clusterId = in.readByte();
this.scope = in.readInt();
} catch(EOFException e) {
// Means it's an old key, just continue
}
}
public long heapSize() {
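
Note: the readFields() change above relies on a common backward-compatibility trick for Writables: the two new fields are appended at the end of the serialized form and read inside a try/catch, so a key written before this commit simply runs out of bytes and keeps the defaults (DEFAULT_CLUSTER_ID and REPLICATION_SCOPE_LOCAL) set in the constructor. A rough round-trip sketch, assuming the (regionName, tablename, logSeqNum, now) constructor whose body appears in the hunk above (all values illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HLogKeyRoundTrip {
      public static void main(String[] args) throws Exception {
        // New-format key: cluster id and scope now travel with every WAL entry.
        HLogKey key = new HLogKey(Bytes.toBytes("region"), Bytes.toBytes("table"),
            1L, System.currentTimeMillis());
        key.setClusterId((byte) 1);
        key.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        key.write(new DataOutputStream(buf));

        HLogKey copy = new HLogKey();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        // copy now reports clusterId 1 and REPLICATION_SCOPE_GLOBAL. A stream produced by
        // the old write() would end after writeTime, readFields() would hit EOF on the
        // extra readByte(), and the defaults would be kept instead.
      }
    }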

HBaseTestCase.java View File

@ -194,13 +194,14 @@ public abstract class HBaseTestCase extends TestCase {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(fam1, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, false));
Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam2, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, false));
Integer.MAX_VALUE, HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam3, versions,
HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
Integer.MAX_VALUE, HConstants.FOREVER, false));
Integer.MAX_VALUE, HConstants.FOREVER,
false, HConstants.REPLICATION_SCOPE_LOCAL));
return htd;
}

HBaseTestingUtility.java View File

@ -286,7 +286,8 @@ public class HBaseTestingUtility {
HColumnDescriptor.DEFAULT_COMPRESSION,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, false);
Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
desc.addFamily(hcd);
}
(new HBaseAdmin(getConfiguration())).createTable(desc);
@ -311,7 +312,8 @@ public class HBaseTestingUtility {
HColumnDescriptor.DEFAULT_COMPRESSION,
HColumnDescriptor.DEFAULT_IN_MEMORY,
HColumnDescriptor.DEFAULT_BLOCKCACHE,
Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL, false);
Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
desc.addFamily(hcd);
i++;
}

TestScanner.java View File

@ -67,7 +67,7 @@ public class TestScanner extends HBaseTestCase {
TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is arbitrary number. Keep versions to help debuggging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, false));
HConstants.FOREVER, false, HConstants.REPLICATION_SCOPE_LOCAL));
}
/** HRegionInfo for root region */
public static final HRegionInfo REGION_INFO =

TestWideScanner.java View File

@ -32,7 +32,7 @@ public class TestWideScanner extends HBaseTestCase {
TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is arbitrary number. Keep versions to help debuggging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, false));
HConstants.FOREVER, false, HColumnDescriptor.DEFAULT_REPLICATION_SCOPE));
}
/** HRegionInfo for root region */
public static final HRegionInfo REGION_INFO =