HBASE-9465 Push entries to peer clusters serially

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Phil Yang 2016-08-04 10:11:56 +08:00 committed by zhangduo
parent 1ecb0fce34
commit 5cadcd59aa
20 changed files with 1176 additions and 58 deletions

View File

@ -1114,6 +1114,18 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
return Collections.unmodifiableCollection(this.families.values());
* Returns true if there is at least one column family whose replication scope is serial.
public boolean hasSerialReplicationScope() {
for (HColumnDescriptor column: getFamilies()){
if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){
return true;
return false;
* Returns the configured replicas per region

View File

@ -17,11 +17,15 @@
package org.apache.hadoop.hbase;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -34,8 +38,6 @@ import java.util.regex.Pattern;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -113,14 +115,31 @@ public class MetaTableAccessor {
* region is the result of a merge
* info:mergeB => contains a serialized HRI for the second parent region if the
* region is the result of a merge
* The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
* and should not leak out of it (through Result objects, etc)
* For serial replication, there are two column families, "rep_barrier" and "rep_position", whose
* row key is encodedRegionName.
* rep_barrier:{seqid} => each time a RS opens a region, it saves the open sequence
* id in this region
* rep_position:{peerid} => to save the max sequence id we have pushed for each peer
* rep_position:_TABLENAME_ => a special cell to save this region's table name, which will be used
* when we clean old data
* rep_position:_DAUGHTER_ => a special cell to indicate that this region has been split or merged;
* in this cell the value is the merged encoded name or the two split
* encoded names separated by ","
private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
private static final Log METALOG = LogFactory.getLog("org.apache.hadoop.hbase.META");
// Save its daughter region(s) when split/merge
private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
// Save its table name because we only know region's encoded name
private static final String tableNamePeer = "_TABLENAME_";
private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
static final byte [] META_REGION_PREFIX;
static {
@ -1318,6 +1337,19 @@ public class MetaTableAccessor {
return delete;
public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
byte[] seqBytes = Bytes.toBytes(seq);
return new Put(encodedRegionName)
.addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
daughterNamePosCq, value);
* Adds split daughters to the Put
@ -1334,27 +1366,28 @@ public class MetaTableAccessor {
* Put the passed <code>p</code> to the <code>hbase:meta</code> table.
* Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
* Non-atomic for multi puts.
* @param connection connection we're using
* @param p Put to add to hbase:meta
* @param puts Put to add to hbase:meta
* @throws IOException
static void putToMetaTable(final Connection connection, final Put p)
static void putToMetaTable(final Connection connection, final Put... puts)
throws IOException {
put(getMetaHTable(connection), p);
put(getMetaHTable(connection), Arrays.asList(puts));
* @param t Table to use (will be closed when done).
* @param p put to make
* @param puts puts to make
* @throws IOException
private static void put(final Table t, final Put p) throws IOException {
private static void put(final Table t, final List<Put> puts) throws IOException {
try {
if (METALOG.isDebugEnabled()) {
} finally {
@ -1490,7 +1523,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughters as different rows, but adds information about the daughters
* in the same row as the parent. Use
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
* if you want to do that.
* @param meta the Table for META
* @param regionInfo region information
@ -1515,7 +1548,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughters as different rows, but adds information about the daughters
* in the same row as the parent. Use
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
* if you want to do that.
* @param connection connection we're using
* @param regionInfo region information
@ -1601,11 +1634,12 @@ public class MetaTableAccessor {
* @param regionB
* @param sn the location of the region
* @param masterSystemTime
* @param saveBarrier true if the replication barrier needs to be saved in meta; used for serial replication
* @throws IOException
public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
long masterSystemTime)
long masterSystemTime, boolean saveBarrier)
throws IOException {
Table meta = getMetaHTable(connection);
try {
@ -1636,7 +1670,17 @@ public class MetaTableAccessor {
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
+ HConstants.DELIMITER);
multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
Mutation[] mutations;
if (saveBarrier) {
Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
} else {
mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
multiMutate(meta, tableRow, mutations);
} finally {
@ -1652,10 +1696,11 @@ public class MetaTableAccessor {
* @param splitA Split daughter region A
* @param splitB Split daughter region B
* @param sn the location of the region
* @param saveBarrier true if the replication barrier needs to be saved in meta; used for serial replication
public static void splitRegion(final Connection connection,
HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
ServerName sn, int regionReplication) throws IOException {
public static void splitRegion(final Connection connection, HRegionInfo parent,
HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
boolean saveBarrier) throws IOException {
Table meta = getMetaHTable(connection);
try {
HRegionInfo copyOfParent = new HRegionInfo(parent);
@ -1680,8 +1725,17 @@ public class MetaTableAccessor {
addEmptyLocation(putB, i);
Mutation[] mutations;
if (saveBarrier) {
Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
.toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
mutations = new Mutation[]{putParent, putA, putB, putBarrier};
} else {
mutations = new Mutation[]{putParent, putA, putB};
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
multiMutate(meta, tableRow, putParent, putA, putB);
multiMutate(meta, tableRow, mutations);
} finally {
@ -1780,6 +1834,27 @@ public class MetaTableAccessor {
updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
* Updates the progress of pushing entries to peer cluster. Skip entry if value is -1.
* @param connection connection we're using
* @param peerId the peerId to push
* @param positions map holding the position for each region
* @throws IOException
public static void updateReplicationPositions(Connection connection, String peerId,
Map<String, Long> positions) throws IOException {
List<Put> puts = new ArrayList<>();
for (Map.Entry<String, Long> entry : positions.entrySet()) {
long value = Math.abs(entry.getValue());
Put put = new Put(Bytes.toBytes(entry.getKey()));
put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
* Updates the location of the specified region to be the specified server.
* <p>
@ -1977,4 +2052,125 @@ public class MetaTableAccessor {
private static String mutationToString(Mutation p) throws IOException {
return p.getClass().getSimpleName() + p.toJSON();
* Get replication position for a peer in a region.
* @param connection connection we're using
* @return the position of this peer, -1 if no position in meta.
public static long getReplicationPositionForOnePeer(Connection connection,
byte[] encodedRegionName, String peerId) throws IOException {
Get get = new Get(encodedRegionName);
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
Result r = get(getMetaHTable(connection), get);
if (r.isEmpty()) {
return -1;
Cell cell = r.rawCells()[0];
return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
* Get replication positions for all peers in a region.
* @param connection connection we're using
* @param encodedRegionName region's encoded name
* @return the map of positions for each peer
public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
byte[] encodedRegionName) throws IOException {
Get get = new Get(encodedRegionName);
Result r = get(getMetaHTable(connection), get);
Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
for (Cell c : r.listCells()) {
if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
c.getQualifierOffset(), c.getQualifierLength()) &&
!Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
c.getQualifierOffset(), c.getQualifierLength())) {
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
return map;
* Get the replication barriers of a region.
* @param encodedRegionName region's encoded name
* @return a list of barrier sequence numbers.
* @throws IOException
public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
throws IOException {
Get get = new Get(encodedRegionName);
Result r = get(getMetaHTable(connection), get);
List<Long> list = new ArrayList<>();
if (!r.isEmpty()) {
for (Cell cell : r.rawCells()) {
list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
return list;
* Get all barriers in all regions.
* @return a map of barrier lists in all regions
* @throws IOException
public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
Map<String, List<Long>> map = new HashMap<>();
Scan scan = new Scan();
try (Table t = getMetaHTable(connection);
ResultScanner scanner = t.getScanner(scan)) {
Result result;
while ((result = scanner.next()) != null) {
String key = Bytes.toString(result.getRow());
List<Long> list = new ArrayList<>();
for (Cell cell : result.rawCells()) {
list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
map.put(key, list);
return map;
* Get daughter region(s) for a region, only used in serial replication.
* @throws IOException
public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
throws IOException {
Get get = new Get(encodedName);
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
Result result = get(getMetaHTable(connection), get);
if (!result.isEmpty()) {
Cell c = result.rawCells()[0];
return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
return null;
* Get the table name for a region, only used in serial replication.
* @throws IOException
public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
throws IOException {
Get get = new Get(encodedName);
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
Result result = get(getMetaHTable(connection), get);
if (!result.isEmpty()) {
Cell c = result.rawCells()[0];
return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
return null;

View File

@ -721,6 +721,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
Scan s = new Scan();
if (this.useMetaReplicas) {

View File

@ -92,8 +92,10 @@ public class ReplicationAdmin implements Closeable {
// only Global for now, can add other type
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
public static final String REPLICATIONTYPE = "replicationType";
public static final String REPLICATIONGLOBAL = Integer
public static final String REPLICATIONGLOBAL =
public static final String REPLICATIONSERIAL =
private final Connection connection;
// TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@ -430,7 +432,10 @@ public class ReplicationAdmin implements Closeable {
HashMap<String, String> replicationEntry = new HashMap<String, String>();
replicationEntry.put(TNAME, tableName);
replicationEntry.put(CFNAME, column.getNameAsString());
column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
@ -616,7 +621,8 @@ public class ReplicationAdmin implements Closeable {
private boolean isTableRepEnabled(HTableDescriptor htd) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
&& hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
return false;

View File

@ -429,6 +429,20 @@ public final class HConstants {
/** The catalog family */
public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
/** The replication barrier family as a string*/
public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
/** The replication barrier family */
public static final byte [] REPLICATION_BARRIER_FAMILY =
/** The replication position family as a string */
public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
/** The replication position family */
public static final byte [] REPLICATION_POSITION_FAMILY =
/** The RegionInfo qualifier as a string */
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
@ -635,6 +649,12 @@ public final class HConstants {
public static final int REPLICATION_SCOPE_GLOBAL = 1;
* Scope tag for serially scoped data
* This data will be replicated to all peers in order of sequence id.
public static final int REPLICATION_SCOPE_SERIAL = 2;
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
@ -866,6 +886,12 @@ public final class HConstants {
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
public static final String
REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
public static final long
* Directory where the source cluster file system client configuration are placed which is used by
* sink cluster to copy HFiles from source cluster file system

View File

@ -1571,6 +1571,20 @@ possible configurations would overwhelm and obscure the important.
slave clusters. The default of 10 will rarely need to be changed.
By default, replication cannot guarantee that the order of operations in the slave cluster is
the same as the order in the master. If REPLICATION_SCOPE is set to 2, edits are pushed in the
order in which they were written. This setting controls how long (in ms) we wait before checking
again when a log cannot be pushed yet because some logs written before it have not been pushed.
A larger wait decreases the number of queries on hbase:meta but increases the replication
delay. This feature relies on zk-less assignment and conflicts with distributed log
replay, so users must set hbase.assignment.usezk and hbase.master.distributed.log.replay to
false to use it.
<!-- Static Web User Filter properties. -->

View File

@ -21,6 +21,10 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
* <code>REPLICATION_SCOPE_SERIAL = 2;</code>
@ -31,6 +35,10 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
* <code>REPLICATION_SCOPE_SERIAL = 2;</code>
public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
public final int getNumber() { return value; }
@ -39,6 +47,7 @@ public final class WALProtos {
switch (value) {
default: return null;
@ -12013,11 +12022,12 @@ public final class WALProtos {
"\030\005 \003(\0132\031.hbase.pb.StoreDescriptor\022$\n\006ser" +
"ver\030\006 \001(\0132\024.hbase.pb.ServerName\022\023\n\013regio" +
"n_name\030\007 \001(\014\".\n\tEventType\022\017\n\013REGION_OPEN" +
"\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*F\n\tSc" +
"\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrailer*d\n\tSc" +
"opeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030" +
"REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apache" +
".hadoop.hbase.protobuf.generatedB\tWALPro" +
"N_SCOPE_SERIAL\020\002B?\n*org.apache.hadoop.hb" +
"ase.protobuf.generatedB\tWALProtosH\001\210\001\000\240\001" +
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -75,6 +75,7 @@ message WALKey {
enum ScopeType {
message FamilyScope {

View File

@ -98,6 +98,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@ -311,6 +312,7 @@ public class HMaster extends HRegionServer implements MasterServices {
CatalogJanitor catalogJanitorChore;
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
private ReplicationMetaCleaner replicationMetaCleaner;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@ -988,6 +990,8 @@ public class HMaster extends HRegionServer implements MasterServices {
LOG.error("start replicationZKLockCleanerChore failed", e);
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
@ -1022,6 +1026,7 @@ public class HMaster extends HRegionServer implements MasterServices {
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();

View File

@ -17,22 +17,25 @@
package org.apache.hadoop.hbase.master;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.master.RegionState.State;
@ -44,8 +47,6 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.zookeeper.KeeperException;
import com.google.common.base.Preconditions;
* A helper to persist region state in meta. We may change this class
* to StateStore later if we also use it to store other states in meta
@ -60,7 +61,7 @@ public class RegionStateStore {
private volatile Region metaRegion;
private volatile boolean initialized;
private MultiHConnection multiHConnection;
private final Server server;
private final MasterServices server;
* Returns the {@link ServerName} from catalog table {@link Result}
@ -130,7 +131,7 @@ public class RegionStateStore {
RegionStateStore(final Server server) {
RegionStateStore(final MasterServices server) {
this.server = server;
initialized = false;
@ -187,31 +188,41 @@ public class RegionStateStore {
State state = newState.getState();
int replicaId = hri.getReplicaId();
Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
StringBuilder info = new StringBuilder("Updating hbase:meta row ");
info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
if (serverName != null && !serverName.equals(oldServer)) {
put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
info.append(", sn=").append(serverName);
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN
&& serverName != null, "Open region should be on a server");
MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
info.append(", openSeqNum=").append(openSeqNum);
info.append(", server=").append(serverName);
put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
boolean serial = false;
if (descriptor != null) {
serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
boolean shouldPutBarrier = serial && state == State.OPEN;
// Persist the state change to meta
if (metaRegion != null) {
try {
// Assume meta is pinned to master.
// At least, that's what we want.
if (shouldPutBarrier) {
Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
openSeqNum, hri.getTable().getName());
return; // Done here
} catch (Throwable t) {
// In unit tests, meta could be moved away by intention
@ -230,8 +241,10 @@ public class RegionStateStore {
// Called when meta is not on master
TableName.META_TABLE_NAME, null, null);
List<Put> list = shouldPutBarrier ?
Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
} catch (IOException ioe) {
LOG.error("Failed to persist region state " + newState, ioe);
@ -241,12 +254,14 @@ public class RegionStateStore {
void splitRegion(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
void mergeRegions(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,

View File

@ -0,0 +1,186 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
* This chore cleans up data in hbase:meta that was written for serial replication and is no longer needed.
public class ReplicationMetaCleaner extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
private ReplicationAdmin replicationAdmin;
private MasterServices master;
public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
throws IOException {
super("ReplicationMetaCleaner", stoppable, period);
this.master = master;
replicationAdmin = new ReplicationAdmin(master.getConfiguration());
protected void chore() {
try {
Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAllDescriptors();
Map<String, Set<String>> serialTables = new HashMap<>();
for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) {
boolean hasSerialScope = false;
for (HColumnDescriptor column : entry.getValue().getFamilies()) {
if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
hasSerialScope = true;
if (hasSerialScope) {
serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>());
if (serialTables.isEmpty()){
Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
for (Map.Entry<TableName, List<String>> map : entry.getValue().getTableCFsMap()
.entrySet()) {
if (serialTables.containsKey(map.getKey().getNameAsString())) {
Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
String encodedName = entry.getKey();
byte[] encodedBytes = Bytes.toBytes(encodedName);
boolean canClearRegion = false;
Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
master.getConnection(), encodedBytes);
if (posMap.isEmpty()) {
String tableName = MetaTableAccessor.getSerialReplicationTableName(
master.getConnection(), encodedBytes);
Set<String> confPeers = serialTables.get(tableName);
if (confPeers == null) {
// This table no longer exists, or none of its column families has serial scope any more, so we can clear meta.
canClearRegion = true;
} else {
if (!allPeersHavePosition(confPeers, posMap)) {
String daughterValue = MetaTableAccessor
.getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
if (daughterValue != null) {
//this region is merged or split
boolean allDaughterStart = true;
String[] daughterRegions = daughterValue.split(",");
for (String daughter : daughterRegions) {
byte[] region = Bytes.toBytes(daughter);
if (!MetaTableAccessor.getReplicationBarriers(
master.getConnection(), region).isEmpty() &&
.getReplicationPositionForAllPeer(master.getConnection(), region))) {
allDaughterStart = false;
if (allDaughterStart) {
canClearRegion = true;
if (canClearRegion) {
Delete delete = new Delete(encodedBytes);
try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
} else {
// Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
// is smaller than min pos should be kept. All other barriers can be deleted.
long minPos = Long.MAX_VALUE;
for (Map.Entry<String, Long> pos : posMap.entrySet()) {
minPos = Math.min(minPos, pos.getValue());
List<Long> barriers = entry.getValue();
int index = Collections.binarySearch(barriers, minPos);
if (index < 0) {
index = -index - 1;
Delete delete = new Delete(encodedBytes);
for (int i = 0; i < index - 1; i++) {
delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
} catch (IOException e) {
LOG.error("Exception during cleaning up.", e);
private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap)
throws IOException {
for(String peer:peers){
if (!posMap.containsKey(peer)){
return false;
return true;

View File

@ -125,7 +125,6 @@ class FSWALEntry extends Entry {
CellUtil.setSequenceId(c, regionSequenceId);
return regionSequenceId;

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@ -279,6 +280,17 @@ public class Replication extends WALActionsListener.Base implements
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
if (!foundOtherEdits && logEdit.getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
if (maybeEvent != null && (maybeEvent.getEventType() ==
WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serial replication, we use scopes when reading the close marker.
foundOtherEdits = true;
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
@ -29,8 +33,10 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
@ -48,9 +54,11 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -102,6 +110,8 @@ public class ReplicationSource extends Thread
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
String actualPeerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
@ -185,6 +195,8 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
this.actualPeerId = replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
@ -507,6 +519,17 @@ public class ReplicationSource extends Thread
// Current number of hfiles that we need to replicate
private long currentNbHFiles = 0;
// Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(
new CacheLoader<String, Boolean>() {
public Boolean load(String key) throws Exception {
return false;
public ReplicationSourceWorkerThread(String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSource source) {
@ -588,9 +611,24 @@ public class ReplicationSource extends Thread
currentNbOperations = 0;
currentNbHFiles = 0;
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
currentSize = 0;
try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
lastPositionsForSerialScope)) {
for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
try {
.updateReplicationPositions(manager.getConnection(), actualPeerId,
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
stopper.stop("updateReplicationPositions fail");
} catch (IOException ioe) {
@ -626,15 +664,30 @@ public class ReplicationSource extends Thread
LOG.warn("Unable to finalize the tailing of a file", e);
for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) {
// If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
peerClusterZnode, this.repLogReader.getPosition(),
// Save positions to meta table before zk.
if (!gotIOE) {
try {
MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
stopper.stop("updateReplicationPositions fail");
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
// Reset the sleep multiplier if nothing has actually gone wrong
@ -649,8 +702,7 @@ public class ReplicationSource extends Thread
sleepMultiplier = 1;
shipEdits(currentWALisBeingWrittenTo, entries);
shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue
@ -672,16 +724,42 @@ public class ReplicationSource extends Thread
private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
String key = entry.getKey();
long seq = entry.getValue();
boolean deleteKey = false;
if (seq <= 0) {
// There is a REGION_CLOSE marker, we can not continue skipping after this entry.
deleteKey = true;
seq = -seq;
if (!canSkipWaitingSet.getUnchecked(key)) {
try {
manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
} catch (Exception e) {
LOG.error("waitUntilCanBePushed fail", e);
stopper.stop("waitUntilCanBePushed fail");
canSkipWaitingSet.put(key, true);
if (deleteKey) {
* Read all the entries from the current log files and retain those that need to be replicated.
* Else, process the end of the current file.
* @param currentWALisBeingWrittenTo is the current WAL being written to
* @param entries resulting entries to be replicated
* @param lastPosition save the last sequenceid for each region if the table has
* serial-replication scope
* @return true if we got nothing and went to the next file, false if we got entries
* @throws IOException
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
List<WAL.Entry> entries) throws IOException {
List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
long seenEntries = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Seeking in " + this.currentPath + " at position "
@ -694,6 +772,27 @@ public class ReplicationSource extends Thread
if (entry.hasSerialReplicationScope()) {
String key = Bytes.toString(entry.getKey().getEncodedRegionName());
lastPosition.put(key, entry.getKey().getSequenceId());
if (entry.getEdit().getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
if (maybeEvent != null && maybeEvent.getEventType()
== WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
// In serial replication, if we move a region to another RS and then move it back, we may
// read logs that span two sections. We should break at REGION_CLOSE and push the first
// section first, so that we do not miss the middle section that belongs to the other RS.
// In a worker thread, once we can push the first log of a region, we can push all
// following logs of that region without waiting, up until we read a close marker; after
// that, the next logs we read for this region must belong to a new section that is not
// adjacent to this one. Mark the sequence id negative to flag the close marker.
lastPosition.put(key, -entry.getKey().getSequenceId());
// don't replicate if the log entries have already been consumed by the cluster
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
@ -723,6 +822,7 @@ public class ReplicationSource extends Thread
|| entries.size() >= replicationQueueNbCapacity) {
try {
entry = this.repLogReader.readNextAndSetPosition();
} catch (IOException ie) {
@ -995,7 +1095,8 @@ public class ReplicationSource extends Thread
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called
protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
Map<String, Long> lastPositionsForSerialScope) {
int sleepMultiplier = 0;
if (entries.isEmpty()) {
LOG.warn("Was given 0 edits to ship");
@ -1046,6 +1147,16 @@ public class ReplicationSource extends Thread
for (int i = 0; i < size; i++) {
// Save positions to meta table before zk.
try {
MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
stopper.stop("updateReplicationPositions fail");
//Log and clean up WAL logs
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),

View File

@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -48,10 +49,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@ -64,6 +68,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@ -118,6 +123,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
private Connection connection;
private long replicationWaitTime;
* Creates a replication manager and sets the watch on all the other registered region servers
@ -134,7 +141,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
final Path oldLogDir, final UUID clusterId) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@ -171,6 +178,9 @@ public class ReplicationSourceManager implements ReplicationListener {
replicationForBulkLoadDataEnabled =
this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
connection = ConnectionFactory.createConnection(conf);
@ -782,6 +792,10 @@ public class ReplicationSourceManager implements ReplicationListener {
return this.fs;
/**
 * Returns the {@link Connection} held in this manager's {@code connection} field.
 * @return the connection stored by this ReplicationSourceManager
 */
public Connection getConnection() {
return this.connection;
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
@ -814,4 +828,75 @@ public class ReplicationSourceManager implements ReplicationListener {
public void cleanUpHFileRefs(String peerId, List<String> files) {
this.replicationQueues.removeHFileRefs(peerId, files);
* Waits until an entry can be pushed to the peer.
* If serial replication is enabled, we cannot push an entry until all entries in its region
* whose sequence numbers are smaller than this entry's have been pushed.
* For each ReplicationSource, we only need to check the first entry in each region: as long as
* it can be pushed, we can push all entries of that region in this ReplicationSource.
* This method blocks until the entry can be pushed.
* This method is declared void and returns no value; it only blocks the caller until the
* entry can be pushed.
void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
throws IOException, InterruptedException {
* There are barriers for this region and a position for this peer. N barriers form N intervals,
* (b1,b2) (b2,b3) ... (bn,max). Generally, there are no logs whose seq id is less than or equal
* to the first barrier, and the last interval starts from the last barrier.
* There are several conditions under which we can push now; otherwise we should block:
* 1) "Serial replication" is not enabled, so we can push all logs just like before. This case
* should not call this method.
* 2) There are no barriers for this region, or the seq id is not greater than the first barrier.
* This mainly happens because REPLICATION_SCOPE was altered to 2: we cannot guarantee the
* order of logs that were written before the alteration.
* 3) This entry is in the first interval of barriers. We can push it because it is at the
* start of a region. Splitting/merging regions is also ok because the first section of a
* daughter region is in the same region as the parents and the order in one RS is guaranteed.
* 4) The entry's seq id and the position are in the same section, or the pos is the last
* number of the previous section. This works because when a region is opened we put a
* barrier whose number is the last log's id + 1.
* 5) The log's seq is not greater than the pos in meta, so we are retrying. It may happen when
* a RS crashes after saving the replication meta and before saving the zk offset.
List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
if (barriers.isEmpty() || seq <= barriers.get(0)) {
// Case 2
int interval = Collections.binarySearch(barriers, seq);
if (interval < 0) {
interval = -interval - 1;// get the insert position if negative
if (interval == 1) {
// Case 3
while (true) {
long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
if (seq <= pos) {
// Case 5
if (pos >= 0) {
// Case 4
int posInterval = Collections.binarySearch(barriers, pos);
if (posInterval < 0) {
posInterval = -posInterval - 1;// get the insert position if negative
if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
+ " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
+ " barriers=" + Arrays.toString(barriers.toArray()));

View File

@ -144,6 +144,30 @@ public class FSTableDescriptors implements TableDescriptors {
// Enable cache of data blocks in L1 if more than one caching tier deployed:
// e.g. if using CombinedBlockCache (BucketCache).
new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
// Enable cache of data blocks in L1 if more than one caching tier deployed:
// e.g. if using CombinedBlockCache (BucketCache).
new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
// Enable cache of data blocks in L1 if more than one caching tier deployed:
// e.g. if using CombinedBlockCache (BucketCache).
new HColumnDescriptor(HConstants.TABLE_FAMILY)
// Ten is arbitrary number. Keep versions to help debugging.

View File

@ -22,8 +22,11 @@ package org.apache.hadoop.hbase.wal;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -35,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
@ -282,6 +286,18 @@ public interface WAL {
public boolean hasSerialReplicationScope () {
if (getKey().getReplicationScopes() == null || getKey().getReplicationScopes().isEmpty()) {
return false;
for (Map.Entry<byte[], Integer> e:getKey().getReplicationScopes().entrySet()) {
if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){
return true;
return false;
public String toString() {
return this.key + "=" + this.edit;

View File

@ -452,7 +452,7 @@ public class TestMetaTableAccessor {
List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@ -481,7 +481,7 @@ public class TestMetaTableAccessor {
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
HConstants.LATEST_TIMESTAMP, false);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@ -609,7 +609,7 @@ public class TestMetaTableAccessor {
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
regionInfoA, regionInfoB, sn, 1, masterSystemTime);
regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
result = meta.get(get);
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@ -692,7 +692,7 @@ public class TestMetaTableAccessor {
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
assertTrue(prevCalls < scheduler.numPriorityCalls);

View File

@ -1211,7 +1211,7 @@ public class TestAssignmentManagerOnCluster {
public void testUpdatesRemoteMeta() throws Exception {
conf.setInt("hbase.regionstatestore.meta.connection", 3);
final RegionStateStore rss =
new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
// Create 10 threads and make each do 10 puts related to region state update
Thread[] th = new Thread[10];

View File

@ -0,0 +1,399 @@
* Copyright The Apache Software Foundation
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSerialReplication {
private static final Log LOG = LogFactory.getLog(TestSerialReplication.class);
private static Configuration conf1;
private static Configuration conf2;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] VALUE = Bytes.toBytes("v");
private static final byte[] ROW = Bytes.toBytes("r");
private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
conf1.setInt("replication.source.size.capacity", 1024);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);// Each WAL is 120 bytes
conf1.setLong("replication.source.size.capacity", 1L);
conf1.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 1000L);
utility1 = new HBaseTestingUtility(conf1);
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
new ZooKeeperWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
utility2 = new HBaseTestingUtility(conf2);
new ZooKeeperWatcher(conf2, "cluster2", null, true);
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
admin1.addPeer("1", rpc, null);
utility1.startMiniCluster(1, 3);
utility2.startMiniCluster(1, 1);
utility1.getHBaseAdmin().setBalancerRunning(false, true);
public void testRegionMoveAndFailover() throws Exception {
TableName tableName = TableName.valueOf("testRSFailover");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
try(Table t1 = utility1.getConnection().getTable(tableName);
Table t2 = utility2.getConnection().getTable(tableName)) {
LOG.info("move to 1");
moveRegion(t1, 1);
LOG.info("move to 0");
moveRegion(t1, 0);
for (int i = 10; i < 20; i++) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
LOG.info("move to 2");
moveRegion(t1, 2);
for (int i = 20; i < 30; i++) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
for (int i = 30; i < 40; i++) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
long start = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
Scan scan = new Scan();
List<Cell> list = new ArrayList<>();
try (ResultScanner results = t2.getScanner(scan)) {
for (Result result : results) {
assertEquals(1, result.rawCells().length);
List<Integer> listOfNumbers = getRowNumbers(list);
assertIntegerList(listOfNumbers, 10, 1);
if (listOfNumbers.size() != 30) {
LOG.info("Waiting all logs pushed to slave. Expected 30 , actual " + list.size());
throw new Exception("Not all logs have been pushed");
} finally {
public void testRegionSplit() throws Exception {
TableName tableName = TableName.valueOf("testRegionSplit");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
try(Table t1 = utility1.getConnection().getTable(tableName);
Table t2 = utility2.getConnection().getTable(tableName)) {
for (int i = 10; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
utility1.getHBaseAdmin().split(tableName, ROWS[50]);
for (int i = 11; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
for (int i = 12; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
long start = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
Scan scan = new Scan();
List<Cell> list = new ArrayList<>();
try (ResultScanner results = t2.getScanner(scan)) {
for (Result result : results) {
assertEquals(1, result.rawCells().length);
List<Integer> listOfNumbers = getRowNumbers(list);
List<Integer> list0 = new ArrayList<>();
List<Integer> list1 = new ArrayList<>();
List<Integer> list21 = new ArrayList<>();
List<Integer> list22 = new ArrayList<>();
for (int num : listOfNumbers) {
if (num % 10 == 0) {
} else if (num % 10 == 1) {
} else if (num < 50) { //num%10==2
} else { // num%10==2 && num>=50
assertIntegerList(list0, 10, 10);
assertIntegerList(list1, 11, 10);
assertIntegerList(list21, 12, 10);
assertIntegerList(list22, 52, 10);
if (!list1.isEmpty()) {
assertEquals(9, list0.size());
if (!list21.isEmpty() || !list22.isEmpty()) {
assertEquals(9, list1.size());
if (list.size() == 27) {
LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size());
throw new Exception("Not all logs have been pushed");
} finally {
public void testRegionMerge() throws Exception {
TableName tableName = TableName.valueOf("testRegionMerge");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
utility1.getHBaseAdmin().split(tableName, ROWS[50]);
try(Table t1 = utility1.getConnection().getTable(tableName);
Table t2 = utility2.getConnection().getTable(tableName)) {
for (int i = 10; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
List<Pair<HRegionInfo, ServerName>> regions =
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName);
assertEquals(2, regions.size());
regions.get(1).getFirst().getRegionName(), true);
for (int i = 11; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
long start = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
Scan scan = new Scan();
List<Cell> list = new ArrayList<>();
try (ResultScanner results = t2.getScanner(scan)) {
for (Result result : results) {
assertEquals(1, result.rawCells().length);
List<Integer> listOfNumbers = getRowNumbers(list);
List<Integer> list0 = new ArrayList<>();
List<Integer> list1 = new ArrayList<>();
for (int num : listOfNumbers) {
if (num % 10 == 0) {
} else {
assertIntegerList(list0, 10, 10);
assertIntegerList(list1, 11, 10);
if (!list1.isEmpty()) {
assertEquals(9, list0.size());
if (list.size() == 18) {
LOG.info("Waiting all logs pushed to slave. Expected 18 , actual " + list.size());
} finally {
private List<Integer> getRowNumbers(List<Cell> cells) {
List<Integer> listOfRowNumbers = new ArrayList<>();
for (Cell c : cells) {
.toString(c.getRowArray(), c.getRowOffset() + ROW.length,
c.getRowLength() - ROW.length)));
return listOfRowNumbers;
public static void setUpAfterClass() throws Exception {
private void moveRegion(Table table, int index) throws IOException {
List<Pair<HRegionInfo, ServerName>> regions =
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
assertEquals(1, regions.size());
HRegionInfo regionInfo = regions.get(0).getFirst();
ServerName name = utility1.getHBaseCluster().getRegionServer(index).getServerName();
.move(regionInfo.getEncodedNameAsBytes(), Bytes.toBytes(name.getServerName()));
try {
Thread.sleep(5000L); // wait to complete
} catch (InterruptedException e) {
private void balanceTwoRegions(Table table) throws Exception {
List<Pair<HRegionInfo, ServerName>> regions =
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
assertEquals(2, regions.size());
HRegionInfo regionInfo1 = regions.get(0).getFirst();
ServerName name1 = utility1.getHBaseCluster().getRegionServer(0).getServerName();
HRegionInfo regionInfo2 = regions.get(1).getFirst();
ServerName name2 = utility1.getHBaseCluster().getRegionServer(1).getServerName();
.move(regionInfo1.getEncodedNameAsBytes(), Bytes.toBytes(name1.getServerName()));
.move(regionInfo2.getEncodedNameAsBytes(), Bytes.toBytes(name2.getServerName()));
private void assertIntegerList(List<Integer> list, int start, int step) {
int size = list.size();
for (int i = 0; i < size; i++) {
assertTrue(list.get(i) == start + step * i);