HBASE-17010 Serial replication should handle daughter regions being assigned to another RS (Phil Yang)
This commit is contained in:
parent
874cf5128f
commit
97276da9a7
|
@ -1796,6 +1796,18 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
|
|||
// Enable cache of data blocks in L1 if more than one caching tier deployed:
|
||||
// e.g. if using CombinedBlockCache (BucketCache).
|
||||
.setCacheDataInL1(true),
|
||||
new HColumnDescriptor(HConstants.REPLICATION_META_FAMILY)
|
||||
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
|
||||
HConstants.DEFAULT_HBASE_META_VERSIONS))
|
||||
.setInMemory(true)
|
||||
.setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
|
||||
HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
|
||||
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
|
||||
.setBloomFilterType(BloomType.NONE)
|
||||
// Enable cache of data blocks in L1 if more than one caching tier deployed:
|
||||
// e.g. if using CombinedBlockCache (BucketCache).
|
||||
.setCacheDataInL1(true),
|
||||
});
|
||||
metaDescriptor.addCoprocessor(
|
||||
"org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",
|
||||
|
|
|
@ -110,14 +110,14 @@ public class MetaTableAccessor {
|
|||
* The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
|
||||
* and should not leak out of it (through Result objects, etc)
|
||||
*
|
||||
* For replication serially, there are two column families "rep_barrier", "rep_position" whose
|
||||
* row key is encodedRegionName.
|
||||
* For replication serially, there are three column families "rep_barrier", "rep_position" and
|
||||
* "rep_meta" whose row key is encodedRegionName.
|
||||
* rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence
|
||||
* id in this region
|
||||
* rep_position:{peerid} => to save the max sequence id we have pushed for each peer
|
||||
* rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when
|
||||
* rep_meta:_TABLENAME_ => a special cell to save this region's table name, will used when
|
||||
* we clean old data
|
||||
* rep_position:_DAUGHTER_ => a special cell to present this region is split or merged, in this
|
||||
* rep_meta:_DAUGHTER_ => a special cell to present this region is split or merged, in this
|
||||
* cell the value is merged encoded name or two split encoded names
|
||||
* separated by ","
|
||||
*/
|
||||
|
@ -125,10 +125,10 @@ public class MetaTableAccessor {
|
|||
private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
|
||||
|
||||
// Save its daughter region(s) when split/merge
|
||||
private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
|
||||
private static final byte[] daughterNameCq = Bytes.toBytes("_DAUGHTER_");
|
||||
|
||||
// Save its table name because we only know region's encoded name
|
||||
private static final String tableNamePeer = "_TABLENAME_";
|
||||
private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
|
||||
private static final byte[] tableNameCq = Bytes.toBytes("_TABLENAME_");
|
||||
|
||||
static final byte [] META_REGION_PREFIX;
|
||||
static {
|
||||
|
@ -985,13 +985,13 @@ public class MetaTableAccessor {
|
|||
byte[] seqBytes = Bytes.toBytes(seq);
|
||||
return new Put(encodedRegionName)
|
||||
.addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
|
||||
.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
|
||||
.addImmutable(HConstants.REPLICATION_META_FAMILY, tableNameCq, tableName);
|
||||
}
|
||||
|
||||
|
||||
public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
|
||||
return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
|
||||
daughterNamePosCq, value);
|
||||
public static Put makeDaughterPut(byte[] encodedRegionName, byte[] value) {
|
||||
return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_META_FAMILY,
|
||||
daughterNameCq, value);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1016,7 +1016,7 @@ public class MetaTableAccessor {
|
|||
* @param puts Put to add to hbase:meta
|
||||
* @throws IOException
|
||||
*/
|
||||
static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
|
||||
public static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
|
||||
put(getMetaHTable(connection), Arrays.asList(puts));
|
||||
}
|
||||
|
||||
|
@ -1297,10 +1297,10 @@ public class MetaTableAccessor {
|
|||
+ HConstants.DELIMITER);
|
||||
Mutation[] mutations;
|
||||
if (saveBarrier) {
|
||||
Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(mergedRegion.getEncodedName()));
|
||||
Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(mergedRegion.getEncodedName()));
|
||||
Put putBarrierA = makeDaughterPut(regionA.getEncodedNameAsBytes(),
|
||||
mergedRegion.getEncodedNameAsBytes());
|
||||
Put putBarrierB = makeDaughterPut(regionB.getEncodedNameAsBytes(),
|
||||
mergedRegion.getEncodedNameAsBytes());
|
||||
mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
|
||||
} else {
|
||||
mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
|
||||
|
@ -1352,10 +1352,9 @@ public class MetaTableAccessor {
|
|||
|
||||
Mutation[] mutations;
|
||||
if (saveBarrier) {
|
||||
Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
|
||||
Bytes
|
||||
.toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
|
||||
mutations = new Mutation[]{putParent, putA, putB, putBarrier};
|
||||
Put parentPut = makeDaughterPut(parent.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(splitA.getEncodedName() + "," + splitB.getEncodedName()));
|
||||
mutations = new Mutation[]{putParent, putA, putB, parentPut };
|
||||
} else {
|
||||
mutations = new Mutation[]{putParent, putA, putB};
|
||||
}
|
||||
|
@ -1638,14 +1637,9 @@ public class MetaTableAccessor {
|
|||
Result r = get(getMetaHTable(connection), get);
|
||||
Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
|
||||
for (Cell c : r.listCells()) {
|
||||
if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
|
||||
c.getQualifierOffset(), c.getQualifierLength()) &&
|
||||
!Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
|
||||
c.getQualifierOffset(), c.getQualifierLength())) {
|
||||
map.put(
|
||||
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
|
||||
Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
|
||||
}
|
||||
map.put(
|
||||
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
|
||||
Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
@ -1692,12 +1686,14 @@ public class MetaTableAccessor {
|
|||
|
||||
/**
|
||||
* Get daughter region(s) for a region, only used in serial replication.
|
||||
* @param connection connection we're using
|
||||
* @param encodedName region's encoded name
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
|
||||
throws IOException {
|
||||
Get get = new Get(encodedName);
|
||||
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
|
||||
get.addColumn(HConstants.REPLICATION_META_FAMILY, daughterNameCq);
|
||||
Result result = get(getMetaHTable(connection), get);
|
||||
if (!result.isEmpty()) {
|
||||
Cell c = result.rawCells()[0];
|
||||
|
@ -1708,12 +1704,14 @@ public class MetaTableAccessor {
|
|||
|
||||
/**
|
||||
* Get the table name for a region, only used in serial replication.
|
||||
* @param connection connection we're using
|
||||
* @param encodedName region's encoded name
|
||||
* @throws IOException
|
||||
*/
|
||||
public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
|
||||
throws IOException {
|
||||
Get get = new Get(encodedName);
|
||||
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
|
||||
get.addColumn(HConstants.REPLICATION_META_FAMILY, tableNameCq);
|
||||
Result result = get(getMetaHTable(connection), get);
|
||||
if (!result.isEmpty()) {
|
||||
Cell c = result.rawCells()[0];
|
||||
|
|
|
@ -433,13 +433,20 @@ public final class HConstants {
|
|||
public static final byte [] REPLICATION_BARRIER_FAMILY =
|
||||
Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
|
||||
|
||||
/** The replication barrier family as a string*/
|
||||
/** The replication position family as a string*/
|
||||
public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
|
||||
|
||||
/** The replication barrier family */
|
||||
/** The replication position family */
|
||||
public static final byte [] REPLICATION_POSITION_FAMILY =
|
||||
Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
|
||||
|
||||
/** The replication meta family as a string*/
|
||||
public static final String REPLICATION_META_FAMILY_STR = "rep_meta";
|
||||
|
||||
/** The replication meta family */
|
||||
public static final byte [] REPLICATION_META_FAMILY =
|
||||
Bytes.toBytes(REPLICATION_META_FAMILY_STR);
|
||||
|
||||
/** The RegionInfo qualifier as a string */
|
||||
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
|
||||
|
||||
|
|
|
@ -141,6 +141,7 @@ public class ReplicationMetaCleaner extends ScheduledChore {
|
|||
Delete delete = new Delete(encodedBytes);
|
||||
delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
|
||||
delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
|
||||
delete.addFamily(HConstants.REPLICATION_META_FAMILY);
|
||||
try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
|
||||
metaTable.delete(delete);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue