HBASE-9465 Push entries to peer clusters serially

Signed-off-by: zhangduo <zhangduo@apache.org>
This commit is contained in:
Phil Yang 2016-08-03 19:37:27 +08:00 committed by zhangduo
parent 25c4ff5404
commit 441bc050b9
21 changed files with 1193 additions and 75 deletions

View File

@ -34,13 +34,12 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableComparable;
@ -1215,6 +1215,18 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
return Collections.unmodifiableCollection(this.families.values());
}
/**
* Return true if there are at least one cf whose replication scope is serial.
*/
public boolean hasSerialReplicationScope() {
for (HColumnDescriptor column: getFamilies()){
if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL){
return true;
}
}
return false;
}
/**
* Returns the configured replicas per region
*/
@ -1759,7 +1771,31 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
.setCacheDataInL1(true)
.setCacheDataInL1(true),
new HColumnDescriptor(HConstants.REPLICATION_BARRIER_FAMILY)
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
HConstants.DEFAULT_HBASE_META_VERSIONS))
.setInMemory(true)
.setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
// Enable cache of data blocks in L1 if more than one caching tier deployed:
// e.g. if using CombinedBlockCache (BucketCache).
.setCacheDataInL1(true),
new HColumnDescriptor(HConstants.REPLICATION_POSITION_FAMILY)
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
HConstants.DEFAULT_HBASE_META_VERSIONS))
.setInMemory(true)
.setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
// Enable cache of data blocks in L1 if more than one caching tier deployed:
// e.g. if using CombinedBlockCache (BucketCache).
.setCacheDataInL1(true),
});
metaDescriptor.addCoprocessor(
"org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",

View File

@ -20,6 +20,20 @@ package org.apache.hadoop.hbase;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -49,18 +63,6 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Read/write operations on region and assignment information store in
* <code>hbase:meta</code>.
@ -107,10 +109,27 @@ public class MetaTableAccessor {
*
* The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
* and should not leak out of it (through Result objects, etc)
*
* For replication serially, there are two column families "rep_barrier", "rep_position" whose
* row key is encodedRegionName.
* rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence
* id in this region
* rep_position:{peerid} => to save the max sequence id we have pushed for each peer
* rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when
* we clean old data
* rep_position:_DAUGHTER_ => a special cell to present this region is split or merged, in this
* cell the value is merged encoded name or two split encoded names
* separated by ","
*/
private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
// Save its daughter region(s) when split/merge
private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
// Save its table name because we only know region's encoded name
private static final String tableNamePeer = "_TABLENAME_";
private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
static final byte [] META_REGION_PREFIX;
static {
// Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
@ -962,6 +981,19 @@ public class MetaTableAccessor {
return delete;
}
public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName) {
byte[] seqBytes = Bytes.toBytes(seq);
return new Put(encodedRegionName)
.addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
}
public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
daughterNamePosCq, value);
}
/**
* Adds split daughters to the Put
*/
@ -978,24 +1010,24 @@ public class MetaTableAccessor {
}
/**
* Put the passed <code>p</code> to the <code>hbase:meta</code> table.
* Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
* Non-atomic for multi puts.
* @param connection connection we're using
* @param p Put to add to hbase:meta
* @param puts Put to add to hbase:meta
* @throws IOException
*/
static void putToMetaTable(final Connection connection, final Put p)
throws IOException {
put(getMetaHTable(connection), p);
static void putToMetaTable(final Connection connection, final Put... puts) throws IOException {
put(getMetaHTable(connection), Arrays.asList(puts));
}
/**
* @param t Table to use (will be closed when done).
* @param p put to make
* @param puts puts to make
* @throws IOException
*/
private static void put(final Table t, final Put p) throws IOException {
private static void put(final Table t, final List<Put> puts) throws IOException {
try {
t.put(p);
t.put(puts);
} finally {
t.close();
}
@ -1121,7 +1153,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
* if you want to do that.
* @param meta the Table for META
* @param regionInfo region information
@ -1143,7 +1175,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName, int)}
* {@link #splitRegion(Connection, HRegionInfo, HRegionInfo, HRegionInfo, ServerName,int,boolean)}
* if you want to do that.
* @param connection connection we're using
* @param regionInfo region information
@ -1232,7 +1264,7 @@ public class MetaTableAccessor {
*/
public static void mergeRegions(final Connection connection, HRegionInfo mergedRegion,
HRegionInfo regionA, HRegionInfo regionB, ServerName sn, int regionReplication,
long masterSystemTime)
long masterSystemTime, boolean saveBarrier)
throws IOException {
Table meta = getMetaHTable(connection);
try {
@ -1263,7 +1295,17 @@ public class MetaTableAccessor {
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
+ HConstants.DELIMITER);
multiMutate(meta, tableRow, putOfMerged, deleteA, deleteB);
Mutation[] mutations;
if (saveBarrier) {
Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
Bytes.toBytes(mergedRegion.getEncodedName()));
Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
Bytes.toBytes(mergedRegion.getEncodedName()));
mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
} else {
mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
}
multiMutate(meta, tableRow, mutations);
} finally {
meta.close();
}
@ -1279,10 +1321,11 @@ public class MetaTableAccessor {
* @param splitA Split daughter region A
* @param splitB Split daughter region A
* @param sn the location of the region
* @param saveBarrier true if need save replication barrier in meta, used for serial replication
*/
public static void splitRegion(final Connection connection,
HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB,
ServerName sn, int regionReplication) throws IOException {
public static void splitRegion(final Connection connection, HRegionInfo parent,
HRegionInfo splitA, HRegionInfo splitB, ServerName sn, int regionReplication,
boolean saveBarrier) throws IOException {
Table meta = getMetaHTable(connection);
try {
HRegionInfo copyOfParent = new HRegionInfo(parent);
@ -1307,8 +1350,17 @@ public class MetaTableAccessor {
addEmptyLocation(putB, i);
}
Mutation[] mutations;
if (saveBarrier) {
Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
Bytes
.toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
mutations = new Mutation[]{putParent, putA, putB, putBarrier};
} else {
mutations = new Mutation[]{putParent, putA, putB};
}
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
multiMutate(meta, tableRow, putParent, putA, putB);
multiMutate(meta, tableRow, mutations);
} finally {
meta.close();
}
@ -1365,6 +1417,27 @@ public class MetaTableAccessor {
updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
}
/**
* Updates the progress of pushing entries to peer cluster. Skip entry if value is -1.
* @param connection connection we're using
* @param peerId the peerId to push
* @param positions map that saving positions for each region
* @throws IOException
*/
public static void updateReplicationPositions(Connection connection, String peerId,
Map<String, Long> positions) throws IOException {
List<Put> puts = new ArrayList<>();
for (Map.Entry<String, Long> entry : positions.entrySet()) {
long value = Math.abs(entry.getValue());
Put put = new Put(Bytes.toBytes(entry.getKey()));
put.addImmutable(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId),
Bytes.toBytes(value));
puts.add(put);
}
getMetaHTable(connection).put(puts);
}
/**
* Updates the location of the specified region to be the specified server.
* <p>
@ -1534,4 +1607,119 @@ public class MetaTableAccessor {
p.addImmutable(HConstants.CATALOG_FAMILY, getSeqNumColumn(replicaId), now, null);
return p;
}
/**
* Get replication position for a peer in a region.
* @param connection connection we're using
* @return the position of this peer, -1 if no position in meta.
*/
public static long getReplicationPositionForOnePeer(Connection connection,
byte[] encodedRegionName, String peerId) throws IOException {
Get get = new Get(encodedRegionName);
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
Result r = get(getMetaHTable(connection), get);
if (r.isEmpty()) {
return -1;
}
Cell cell = r.rawCells()[0];
return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
}
/**
* Get replication positions for all peers in a region.
* @param connection connection we're using
* @param encodedRegionName region's encoded name
* @return the map of positions for each peer
*/
public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
byte[] encodedRegionName) throws IOException {
Get get = new Get(encodedRegionName);
get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
Result r = get(getMetaHTable(connection), get);
Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
for (Cell c : r.listCells()) {
if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
c.getQualifierOffset(), c.getQualifierLength()) &&
!Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
c.getQualifierOffset(), c.getQualifierLength())) {
map.put(
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
}
}
return map;
}
/**
* Get all barriers in all regions.
* @return a map of barrier lists in all regions
* @throws IOException
*/
public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
throws IOException {
Get get = new Get(encodedRegionName);
get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
Result r = get(getMetaHTable(connection), get);
List<Long> list = new ArrayList<>();
if (!r.isEmpty()) {
for (Cell cell : r.rawCells()) {
list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength()));
}
}
return list;
}
public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
Map<String, List<Long>> map = new HashMap<>();
Scan scan = new Scan();
scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
try (Table t = getMetaHTable(connection);
ResultScanner scanner = t.getScanner(scan)) {
Result result;
while ((result = scanner.next()) != null) {
String key = Bytes.toString(result.getRow());
List<Long> list = new ArrayList<>();
for (Cell cell : result.rawCells()) {
list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength()));
}
map.put(key, list);
}
}
return map;
}
/**
* Get daughter region(s) for a region, only used in serial replication.
* @throws IOException
*/
public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
throws IOException {
Get get = new Get(encodedName);
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
Result result = get(getMetaHTable(connection), get);
if (!result.isEmpty()) {
Cell c = result.rawCells()[0];
return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
return null;
}
/**
* Get the table name for a region, only used in serial replication.
* @throws IOException
*/
public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
throws IOException {
Get get = new Get(encodedName);
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
Result result = get(getMetaHTable(connection), get);
if (!result.isEmpty()) {
Cell c = result.rawCells()[0];
return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
return null;
}
}

View File

@ -92,8 +92,10 @@ public class ReplicationAdmin implements Closeable {
// only Global for now, can add other type
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
public static final String REPLICATIONTYPE = "replicationType";
public static final String REPLICATIONGLOBAL = Integer
.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
public static final String REPLICATIONGLOBAL =
Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
public static final String REPLICATIONSERIAL =
Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
private final Connection connection;
// TODO: replication should be managed by master. All the classes except ReplicationAdmin should
@ -517,7 +519,10 @@ public class ReplicationAdmin implements Closeable {
HashMap<String, String> replicationEntry = new HashMap<String, String>();
replicationEntry.put(TNAME, tableName);
replicationEntry.put(CFNAME, column.getNameAsString());
replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
replicationEntry.put(REPLICATIONTYPE,
column.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL ?
REPLICATIONGLOBAL :
REPLICATIONSERIAL);
replicationColFams.add(replicationEntry);
}
}
@ -712,7 +717,8 @@ public class ReplicationAdmin implements Closeable {
*/
private boolean isTableRepEnabled(HTableDescriptor htd) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
&& hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
return false;
}
}

View File

@ -426,6 +426,20 @@ public final class HConstants {
/** The catalog family */
public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
/** The replication barrier family as a string*/
public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
/** The replication barrier family */
public static final byte [] REPLICATION_BARRIER_FAMILY =
Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
/** The replication barrier family as a string*/
public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
/** The replication barrier family */
public static final byte [] REPLICATION_POSITION_FAMILY =
Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
/** The RegionInfo qualifier as a string */
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
@ -620,6 +634,12 @@ public final class HConstants {
*/
public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
* Scope tag for serially scoped data
* This data will be replicated to all peers by the order of sequence id.
*/
public static final int REPLICATION_SCOPE_SERIAL = 2;
/**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
@ -854,6 +874,12 @@ public final class HConstants {
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
public static final String
REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
public static final long
REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
/**
* Directory where the source cluster file system client configuration are placed which is used by
* sink cluster to copy HFiles from source cluster file system

View File

@ -1466,6 +1466,20 @@ possible configurations would overwhelm and obscure the important.
slave clusters. The default of 10 will rarely need to be changed.
</description>
</property>
<property>
<name>hbase.serial.replication.waitingMs</name>
<value>10000</value>
<description>
By default, in replication we can not make sure the order of operations in slave cluster is
same as the order in master. If set REPLICATION_SCOPE to 2, we will push edits by the order
of written. This configure is to set how long (in ms) we will wait before next checking if a
log can not push right now because there are some logs written before it have not been pushed.
A larger waiting will decrease the number of queries on hbase:meta but will enlarge the delay
of replication. This feature relies on zk-less assignment, and conflicts with distributed log
replay. So users must set hbase.assignment.usezk and hbase.master.distributed.log.replay to
false to support it.
</description>
</property>
<!-- Static Web User Filter properties. -->
<property>
<description>

View File

@ -21,6 +21,10 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
*/
REPLICATION_SCOPE_GLOBAL(1, 1),
/**
* <code>REPLICATION_SCOPE_SERIAL = 2;</code>
*/
REPLICATION_SCOPE_SERIAL(2, 2),
;
/**
@ -31,6 +35,10 @@ public final class WALProtos {
* <code>REPLICATION_SCOPE_GLOBAL = 1;</code>
*/
public static final int REPLICATION_SCOPE_GLOBAL_VALUE = 1;
/**
* <code>REPLICATION_SCOPE_SERIAL = 2;</code>
*/
public static final int REPLICATION_SCOPE_SERIAL_VALUE = 2;
public final int getNumber() { return value; }
@ -39,6 +47,7 @@ public final class WALProtos {
switch (value) {
case 0: return REPLICATION_SCOPE_LOCAL;
case 1: return REPLICATION_SCOPE_GLOBAL;
case 2: return REPLICATION_SCOPE_SERIAL;
default: return null;
}
}
@ -12014,10 +12023,11 @@ public final class WALProtos {
"criptor\022$\n\006server\030\006 \001(\0132\024.hbase.pb.Serve" +
"rName\022\023\n\013region_name\030\007 \001(\014\".\n\tEventType\022" +
"\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWA" +
"LTrailer*F\n\tScopeType\022\033\n\027REPLICATION_SCO" +
"LTrailer*d\n\tScopeType\022\033\n\027REPLICATION_SCO" +
"PE_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001" +
"B?\n*org.apache.hadoop.hbase.protobuf.gen" +
"eratedB\tWALProtosH\001\210\001\000\240\001\001"
"\022\034\n\030REPLICATION_SCOPE_SERIAL\020\002B?\n*org.ap" +
"ache.hadoop.hbase.protobuf.generatedB\tWA" +
"LProtosH\001\210\001\000\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View File

@ -76,6 +76,7 @@ message WALKey {
enum ScopeType {
REPLICATION_SCOPE_LOCAL = 0;
REPLICATION_SCOPE_GLOBAL = 1;
REPLICATION_SCOPE_SERIAL = 2;
}
message FamilyScope {

View File

@ -97,6 +97,7 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@ -315,6 +316,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
CatalogJanitor catalogJanitorChore;
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
private ReplicationMetaCleaner replicationMetaCleaner;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
@ -1160,6 +1162,12 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
LOG.error("start replicationZKLockCleanerChore failed", e);
}
}
try {
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
getChoreService().scheduleChore(replicationMetaCleaner);
} catch (Exception e) {
LOG.error("start ReplicationMetaCleaner failed", e);
}
}
@Override
@ -1194,6 +1202,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();

View File

@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hbase.master;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -45,8 +49,6 @@ import org.apache.hadoop.hbase.util.MultiHConnection;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.zookeeper.KeeperException;
import com.google.common.base.Preconditions;
/**
* A helper to persist region state in meta. We may change this class
* to StateStore later if we also use it to store other states in meta
@ -63,7 +65,7 @@ public class RegionStateStore {
private volatile boolean initialized;
private final boolean noPersistence;
private final Server server;
private final MasterServices server;
/**
* Returns the {@link ServerName} from catalog table {@link Result}
@ -133,7 +135,7 @@ public class RegionStateStore {
State.SPLITTING_NEW, State.MERGED));
}
RegionStateStore(final Server server) {
RegionStateStore(final MasterServices server) {
Configuration conf = server.getConfiguration();
// No need to persist if using ZK but not migrating
noPersistence = ConfigUtil.useZKForAssignment(conf)
@ -198,31 +200,41 @@ public class RegionStateStore {
State state = newState.getState();
int replicaId = hri.getReplicaId();
Put put = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
Put metaPut = new Put(MetaTableAccessor.getMetaKeyForRegion(hri));
StringBuilder info = new StringBuilder("Updating hbase:meta row ");
info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
if (serverName != null && !serverName.equals(oldServer)) {
put.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
metaPut.addImmutable(HConstants.CATALOG_FAMILY, getServerNameColumn(replicaId),
Bytes.toBytes(serverName.getServerName()));
info.append(", sn=").append(serverName);
}
if (openSeqNum >= 0) {
Preconditions.checkArgument(state == State.OPEN
&& serverName != null, "Open region should be on a server");
MetaTableAccessor.addLocation(put, serverName, openSeqNum, -1, replicaId);
MetaTableAccessor.addLocation(metaPut, serverName, openSeqNum, -1, replicaId);
info.append(", openSeqNum=").append(openSeqNum);
info.append(", server=").append(serverName);
}
put.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
metaPut.addImmutable(HConstants.CATALOG_FAMILY, getStateColumn(replicaId),
Bytes.toBytes(state.name()));
LOG.info(info);
HTableDescriptor descriptor = server.getTableDescriptors().get(hri.getTable());
boolean serial = false;
if (descriptor != null) {
serial = server.getTableDescriptors().get(hri.getTable()).hasSerialReplicationScope();
}
boolean shouldPutBarrier = serial && state == State.OPEN;
// Persist the state change to meta
if (metaRegion != null) {
try {
// Assume meta is pinned to master.
// At least, that's what we want.
metaRegion.put(put);
metaRegion.put(metaPut);
if (shouldPutBarrier) {
Put barrierPut = MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
openSeqNum, hri.getTable().getName());
metaRegion.put(barrierPut);
}
return; // Done here
} catch (Throwable t) {
// In unit tests, meta could be moved away by intention
@ -241,7 +253,10 @@ public class RegionStateStore {
}
}
// Called when meta is not on master
multiHConnection.processBatchCallback(Arrays.asList(put), TableName.META_TABLE_NAME, null, null);
List<Put> list = shouldPutBarrier ?
Arrays.asList(metaPut, MetaTableAccessor.makeBarrierPut(hri.getEncodedNameAsBytes(),
openSeqNum, hri.getTable().getName())) : Arrays.asList(metaPut);
multiHConnection.processBatchCallback(list, TableName.META_TABLE_NAME, null, null);
} catch (IOException ioe) {
LOG.error("Failed to persist region state " + newState, ioe);
@ -251,12 +266,14 @@ public class RegionStateStore {
void splitRegion(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication);
MetaTableAccessor.splitRegion(server.getConnection(), p, a, b, sn, regionReplication,
server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
}
void mergeRegions(HRegionInfo p,
HRegionInfo a, HRegionInfo b, ServerName sn, int regionReplication) throws IOException {
MetaTableAccessor.mergeRegions(server.getConnection(), p, a, b, sn, regionReplication,
EnvironmentEdgeManager.currentTime());
EnvironmentEdgeManager.currentTime(),
server.getTableDescriptors().get(p.getTable()).hasSerialReplicationScope());
}
}

View File

@ -0,0 +1,187 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
/**
* This chore is to clean up the useless data in hbase:meta which is used by serial replication.
*/
@InterfaceAudience.Private
public class ReplicationMetaCleaner extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
private ReplicationAdmin replicationAdmin;
private MasterServices master;
public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
throws IOException {
super("ReplicationMetaCleaner", stoppable, period);
this.master = master;
replicationAdmin = new ReplicationAdmin(master.getConfiguration());
}
@Override
protected void chore() {
try {
Map<String, HTableDescriptor> tables = master.getTableDescriptors().getAll();
Map<String, Set<String>> serialTables = new HashMap<>();
for (Map.Entry<String, HTableDescriptor> entry : tables.entrySet()) {
boolean hasSerialScope = false;
for (HColumnDescriptor column : entry.getValue().getFamilies()) {
if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
hasSerialScope = true;
break;
}
}
if (hasSerialScope) {
serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<String>());
}
}
if (serialTables.isEmpty()){
return;
}
Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
for (Map.Entry<byte[], byte[]> map : entry.getValue().getPeerData()
.entrySet()) {
String tableName = Bytes.toString(map.getKey());
if (serialTables.containsKey(tableName)) {
serialTables.get(tableName).add(entry.getKey());
break;
}
}
}
Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
String encodedName = entry.getKey();
byte[] encodedBytes = Bytes.toBytes(encodedName);
boolean canClearRegion = false;
Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
master.getConnection(), encodedBytes);
if (posMap.isEmpty()) {
continue;
}
String tableName = MetaTableAccessor.getSerialReplicationTableName(
master.getConnection(), encodedBytes);
Set<String> confPeers = serialTables.get(tableName);
if (confPeers == null) {
// This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
canClearRegion = true;
} else {
if (!allPeersHavePosition(confPeers, posMap)) {
continue;
}
String daughterValue = MetaTableAccessor
.getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
if (daughterValue != null) {
//this region is merged or split
boolean allDaughterStart = true;
String[] daughterRegions = daughterValue.split(",");
for (String daughter : daughterRegions) {
byte[] region = Bytes.toBytes(daughter);
if (!MetaTableAccessor.getReplicationBarriers(
master.getConnection(), region).isEmpty() &&
!allPeersHavePosition(confPeers,
MetaTableAccessor
.getReplicationPositionForAllPeer(master.getConnection(), region))) {
allDaughterStart = false;
break;
}
}
if (allDaughterStart) {
canClearRegion = true;
}
}
}
if (canClearRegion) {
Delete delete = new Delete(encodedBytes);
delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
metaTable.delete(delete);
}
} else {
// Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
// is smaller than min pos should be kept. All other barriers can be deleted.
long minPos = Long.MAX_VALUE;
for (Map.Entry<String, Long> pos : posMap.entrySet()) {
minPos = Math.min(minPos, pos.getValue());
}
List<Long> barriers = entry.getValue();
int index = Collections.binarySearch(barriers, minPos);
if (index < 0) {
index = -index - 1;
}
Delete delete = new Delete(encodedBytes);
for (int i = 0; i < index - 1; i++) {
delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
}
try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
metaTable.delete(delete);
}
}
}
} catch (IOException e) {
LOG.error("Exception during cleaning up.", e);
}
}
private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap)
throws IOException {
for(String peer:peers){
if (!posMap.containsKey(peer)){
return false;
}
}
return true;
}
}

View File

@ -434,7 +434,8 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
if (metaEntries.isEmpty()) {
MetaTableAccessor.mergeRegions(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),
server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime);
server.getServerName(), region_a.getTableDesc().getRegionReplication(), masterSystemTime,
false);
} else {
mergeRegionsAndPutMetaEntries(server.getConnection(),
mergedRegion.getRegionInfo(), region_a.getRegionInfo(), region_b.getRegionInfo(),

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

View File

@ -337,7 +337,7 @@ public class SplitTransactionImpl implements SplitTransaction {
MetaTableAccessor.splitRegion(server.getConnection(),
parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(),
daughterRegions.getSecond().getRegionInfo(), server.getServerName(),
parent.getTableDesc().getRegionReplication());
parent.getTableDesc().getRegionReplication(), false);
} else {
offlineParentInMetaAndputMetaEntries(server.getConnection(),
parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server;
@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@ -311,8 +313,19 @@ public class Replication extends WALActionsListener.Base implements
if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
} else {
// Skip the flush/compaction/region events
WALProtos.RegionEventDescriptor maybeEvent = WALEdit.getRegionEventDescriptor(cell);
if (maybeEvent != null && (maybeEvent.getEventType() ==
WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serially replication, we use scopes when reading close marker.
for (HColumnDescriptor cf :htd.getFamilies()) {
if (cf.getScope() != REPLICATION_SCOPE_LOCAL) {
scopes.put(cf.getName(), cf.getScope());
}
}
}
// Skip the flush/compaction
continue;
}
} else {
family = CellUtil.cloneFamily(cell);

View File

@ -18,6 +18,13 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -44,9 +51,11 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -68,10 +77,6 @@ import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave
@ -102,6 +107,8 @@ public class ReplicationSource extends Thread
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
String actualPeerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
@ -185,6 +192,8 @@ public class ReplicationSource extends Thread
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
this.actualPeerId = replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
}
@ -507,6 +516,17 @@ public class ReplicationSource extends Thread
// Current number of hfiles that we need to replicate
private long currentNbHFiles = 0;
// Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(
new CacheLoader<String, Boolean>() {
@Override
public Boolean load(String key) throws Exception {
return false;
}
}
);
public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
this.walGroupId = walGroupId;
@ -587,9 +607,24 @@ public class ReplicationSource extends Thread
currentNbOperations = 0;
currentNbHFiles = 0;
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
Map<String, Long> lastPositionsForSerialScope = new HashMap<>();
currentSize = 0;
try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries,
lastPositionsForSerialScope)) {
for (Map.Entry<String, Long> entry : lastPositionsForSerialScope.entrySet()) {
waitingUntilCanPush(entry);
}
try {
MetaTableAccessor
.updateReplicationPositions(manager.getConnection(), actualPeerId,
lastPositionsForSerialScope);
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
stopper.stop("updateReplicationPositions fail");
}
continue;
}
} catch (IOException ioe) {
@ -625,15 +660,30 @@ public class ReplicationSource extends Thread
LOG.warn("Unable to finalize the tailing of a file", e);
}
}
for(Map.Entry<String, Long> entry: lastPositionsForSerialScope.entrySet()) {
waitingUntilCanPush(entry);
}
// If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
manager.logPositionAndCleanOldLogs(this.currentPath,
peerClusterZnode, this.repLogReader.getPosition(),
// Save positions to meta table before zk.
if (!gotIOE) {
try {
MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
lastPositionsForSerialScope);
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
stopper.stop("updateReplicationPositions fail");
}
}
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.repLogReader.getPosition(),
this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
// Reset the sleep multiplier if nothing has actually gone wrong
@ -648,8 +698,7 @@ public class ReplicationSource extends Thread
}
continue;
}
sleepMultiplier = 1;
shipEdits(currentWALisBeingWrittenTo, entries);
shipEdits(currentWALisBeingWrittenTo, entries, lastPositionsForSerialScope);
}
if (replicationQueueInfo.isQueueRecovered()) {
// use synchronize to make sure one last thread will clean the queue
@ -671,16 +720,42 @@ public class ReplicationSource extends Thread
}
}
private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
String key = entry.getKey();
long seq = entry.getValue();
boolean deleteKey = false;
if (seq <= 0) {
// There is a REGION_CLOSE marker, we can not continue skipping after this entry.
deleteKey = true;
seq = -seq;
}
if (!canSkipWaitingSet.getUnchecked(key)) {
try {
manager.waitUntilCanBePushed(Bytes.toBytes(key), seq, actualPeerId);
} catch (Exception e) {
LOG.error("waitUntilCanBePushed fail", e);
stopper.stop("waitUntilCanBePushed fail");
}
canSkipWaitingSet.put(key, true);
}
if (deleteKey) {
canSkipWaitingSet.invalidate(key);
}
}
/**
* Read all the entries from the current log files and retain those that need to be replicated.
* Else, process the end of the current file.
* @param currentWALisBeingWrittenTo is the current WAL being written to
* @param entries resulting entries to be replicated
* @param lastPosition save the last sequenceid for each region if the table has
* serial-replication scope
* @return true if we got nothing and went to the next file, false if we got entries
* @throws IOException
*/
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
List<WAL.Entry> entries) throws IOException {
List<WAL.Entry> entries, Map<String, Long> lastPosition) throws IOException {
long seenEntries = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Seeking in " + this.currentPath + " at position "
@ -693,6 +768,26 @@ public class ReplicationSource extends Thread
metrics.incrLogEditsRead();
seenEntries++;
if (entry.hasSerialReplicationScope()) {
String key = Bytes.toString(entry.getKey().getEncodedRegionName());
lastPosition.put(key, entry.getKey().getLogSeqNum());
if (entry.getEdit().getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
if (maybeEvent != null && maybeEvent.getEventType()
== WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
// In serially replication, if we move a region to another RS and move it back, we may
// read logs crossing two sections. We should break at REGION_CLOSE and push the first
// section first in case of missing the middle section belonging to the other RS.
// In a worker thread, if we can push the first log of a region, we can push all logs
// in the same region without waiting until we read a close marker because next time
// we read logs in this region, it must be a new section and not adjacent with this
// region. Mark it negative.
lastPosition.put(key, -entry.getKey().getLogSeqNum());
break;
}
}
}
// don't replicate if the log entries have already been consumed by the cluster
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
@ -722,6 +817,7 @@ public class ReplicationSource extends Thread
|| entries.size() >= replicationQueueNbCapacity) {
break;
}
try {
entry = this.repLogReader.readNextAndSetPosition();
} catch (IOException ie) {
@ -996,7 +1092,8 @@ public class ReplicationSource extends Thread
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called
*/
protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries,
Map<String, Long> lastPositionsForSerialScope) {
int sleepMultiplier = 0;
if (entries.isEmpty()) {
LOG.warn("Was given 0 edits to ship");
@ -1047,6 +1144,16 @@ public class ReplicationSource extends Thread
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
// Save positions to meta table before zk.
try {
MetaTableAccessor.updateReplicationPositions(manager.getConnection(), actualPeerId,
lastPositionsForSerialScope);
} catch (IOException e) {
LOG.error("updateReplicationPositions fail", e);
stopper.stop("updateReplicationPositions fail");
}
//Log and clean up WAL logs
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),

View File

@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -48,10 +49,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@ -64,6 +68,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
/**
@ -118,6 +123,8 @@ public class ReplicationSourceManager implements ReplicationListener {
private final Random rand;
private final boolean replicationForBulkLoadDataEnabled;
private Connection connection;
private long replicationWaitTime;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
@ -134,7 +141,7 @@ public class ReplicationSourceManager implements ReplicationListener {
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
final Configuration conf, final Server server, final FileSystem fs, final Path logDir,
final Path oldLogDir, final UUID clusterId) {
final Path oldLogDir, final UUID clusterId) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
@ -171,6 +178,9 @@ public class ReplicationSourceManager implements ReplicationListener {
replicationForBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
connection = ConnectionFactory.createConnection(conf);
}
/**
@ -778,6 +788,10 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
public Connection getConnection() {
return this.connection;
}
/**
* Get a string representation of all the sources' metrics
*/
@ -804,4 +818,75 @@ public class ReplicationSourceManager implements ReplicationListener {
public void cleanUpHFileRefs(String peerId, List<String> files) {
this.replicationQueues.removeHFileRefs(peerId, files);
}
/**
* Whether an entry can be pushed to the peer or not right now.
* If we enable serial replication, we can not push the entry until all entries in its region
* whose sequence numbers are smaller than this entry have been pushed.
* For each ReplicationSource, we need only check the first entry in each region, as long as it
* can be pushed, we can push all in this ReplicationSource.
* This method will be blocked until we can push.
* @return the first barrier of entry's region, or -1 if there is no barrier. It is used to
* prevent saving positions in the region of no barrier.
*/
void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
throws IOException, InterruptedException {
/**
* There are barriers for this region and position for this peer. N barriers form N intervals,
* (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than
* the first barrier and the last interval is start from the last barrier.
*
* There are several conditions that we can push now, otherwise we should block:
* 1) "Serial replication" is not enabled, we can push all logs just like before. This case
* should not call this method.
* 2) There is no barriers for this region, or the seq id is smaller than the first barrier.
* It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the
* order of logs that is written before altering.
* 3) This entry is in the first interval of barriers. We can push them because it is the
* start of a region. Splitting/merging regions are also ok because the first section of
* daughter region is in same region of parents and the order in one RS is guaranteed.
* 4) If the entry's seq id and the position are in same section, or the pos is the last
* number of previous section. Because when open a region we put a barrier the number
* is the last log's id + 1.
* 5) Log's seq is smaller than pos in meta, we are retrying. It may happen when a RS crashes
* after save replication meta and before save zk offset.
*/
List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
if (barriers.isEmpty() || seq <= barriers.get(0)) {
// Case 2
return;
}
int interval = Collections.binarySearch(barriers, seq);
if (interval < 0) {
interval = -interval - 1;// get the insert position if negative
}
if (interval == 1) {
// Case 3
return;
}
while (true) {
long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
if (seq <= pos) {
// Case 5
}
if (pos >= 0) {
// Case 4
int posInterval = Collections.binarySearch(barriers, pos);
if (posInterval < 0) {
posInterval = -posInterval - 1;// get the insert position if negative
}
if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
return;
}
}
LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId
+ " because previous log has not been pushed: sequence=" + seq + " pos=" + pos
+ " barriers=" + Arrays.toString(barriers.toArray()));
Thread.sleep(replicationWaitTime);
}
}
}

View File

@ -21,8 +21,11 @@ package org.apache.hadoop.hbase.wal;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -35,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.annotations.VisibleForTesting;
@ -269,6 +273,18 @@ public interface WAL {
key.setCompressionContext(compressionContext);
}
public boolean hasSerialReplicationScope () {
if (getKey().getScopes() == null || getKey().getScopes().isEmpty()) {
return false;
}
for (Map.Entry<byte[], Integer> e:getKey().getScopes().entrySet()) {
if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){
return true;
}
}
return false;
}
@Override
public String toString() {
return this.key + "=" + this.edit;

View File

@ -443,7 +443,7 @@ public class TestMetaTableAccessor {
List<HRegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@ -472,7 +472,7 @@ public class TestMetaTableAccessor {
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
HConstants.LATEST_TIMESTAMP);
HConstants.LATEST_TIMESTAMP, false);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@ -556,7 +556,7 @@ public class TestMetaTableAccessor {
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
regionInfoA, regionInfoB, sn, 1, masterSystemTime);
regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
result = meta.get(get);
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
@ -639,7 +639,7 @@ public class TestMetaTableAccessor {
}
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
assertTrue(prevCalls < scheduler.numPriorityCalls);
}

View File

@ -165,7 +165,7 @@ public class TestMetaScanner {
end);
MetaTableAccessor.splitRegion(connection,
parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1);
parent, splita, splitb, ServerName.valueOf("fooserver", 1, 0), 1, false);
Threads.sleep(random.nextInt(200));
} catch (Throwable e) {

View File

@ -1214,7 +1214,7 @@ public class TestAssignmentManagerOnCluster {
}
conf.setInt("hbase.regionstatestore.meta.connection", 3);
final RegionStateStore rss =
new RegionStateStore(new MyRegionServer(conf, new ZkCoordinatedStateManager()));
new RegionStateStore(new MyMaster(conf, new ZkCoordinatedStateManager()));
rss.start();
// Create 10 threads and make each do 10 puts related to region state update
Thread[] th = new Thread[10];

View File

@ -0,0 +1,401 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSerialReplication {
private static final Log LOG = LogFactory.getLog(TestSerialReplication.class);
private static Configuration conf1;
private static Configuration conf2;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] VALUE = Bytes.toBytes("v");
private static final byte[] ROW = Bytes.toBytes("r");
private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
conf1.setInt("replication.source.size.capacity", 1024);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);// Each WAL is 120 bytes
conf1.setLong("replication.source.size.capacity", 1L);
conf1.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 1000L);
conf1.setBoolean("hbase.assignment.usezk", false);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
new ZooKeeperWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
new ZooKeeperWatcher(conf2, "cluster2", null, true);
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
admin1.addPeer("1", rpc, null);
utility1.startMiniCluster(1, 3);
utility2.startMiniCluster(1, 1);
utility1.getHBaseAdmin().setBalancerRunning(false, true);
}
@Test
public void testRegionMoveAndFailover() throws Exception {
TableName tableName = TableName.valueOf("testRSFailover");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
table.addFamily(fam);
utility1.getHBaseAdmin().createTable(table);
utility2.getHBaseAdmin().createTable(table);
try(Table t1 = utility1.getConnection().getTable(tableName);
Table t2 = utility2.getConnection().getTable(tableName)) {
LOG.info("move to 1");
moveRegion(t1, 1);
LOG.info("move to 0");
moveRegion(t1, 0);
for (int i = 10; i < 20; i++) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
LOG.info("move to 2");
moveRegion(t1, 2);
for (int i = 20; i < 30; i++) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
utility1.getHBaseCluster().abortRegionServer(2);
for (int i = 30; i < 40; i++) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
long start = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - start < 18000000) {
Scan scan = new Scan();
scan.setCaching(100);
List<Cell> list = new ArrayList<>();
try (ResultScanner results = t2.getScanner(scan)) {
for (Result result : results) {
assertEquals(1, result.rawCells().length);
list.add(result.rawCells()[0]);
}
}
List<Integer> listOfNumbers = getRowNumbers(list);
LOG.info(Arrays.toString(listOfNumbers.toArray()));
assertIntegerList(listOfNumbers, 10, 1);
if (listOfNumbers.size() != 30) {
LOG.info("Waiting all logs pushed to slave. Expected 30 , actual " + list.size());
Thread.sleep(200);
continue;
}
return;
}
throw new Exception("Not all logs have been pushed");
} finally {
utility1.getHBaseAdmin().disableTable(tableName);
utility2.getHBaseAdmin().disableTable(tableName);
utility1.getHBaseAdmin().deleteTable(tableName);
utility2.getHBaseAdmin().deleteTable(tableName);
}
}
@Test
public void testRegionSplit() throws Exception {
TableName tableName = TableName.valueOf("testRegionSplit");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
table.addFamily(fam);
utility1.getHBaseAdmin().createTable(table);
utility2.getHBaseAdmin().createTable(table);
try(Table t1 = utility1.getConnection().getTable(tableName);
Table t2 = utility2.getConnection().getTable(tableName)) {
for (int i = 10; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
utility1.getHBaseAdmin().split(tableName, ROWS[50]);
Thread.sleep(5000L);
for (int i = 11; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
balanceTwoRegions(t1);
for (int i = 12; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
long start = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
Scan scan = new Scan();
scan.setCaching(100);
List<Cell> list = new ArrayList<>();
try (ResultScanner results = t2.getScanner(scan)) {
for (Result result : results) {
assertEquals(1, result.rawCells().length);
list.add(result.rawCells()[0]);
}
}
List<Integer> listOfNumbers = getRowNumbers(list);
List<Integer> list0 = new ArrayList<>();
List<Integer> list1 = new ArrayList<>();
List<Integer> list21 = new ArrayList<>();
List<Integer> list22 = new ArrayList<>();
for (int num : listOfNumbers) {
if (num % 10 == 0) {
list0.add(num);
} else if (num % 10 == 1) {
list1.add(num);
} else if (num < 50) { //num%10==2
list21.add(num);
} else { // num%10==1&&num>50
list22.add(num);
}
}
LOG.info(Arrays.toString(list0.toArray()));
LOG.info(Arrays.toString(list1.toArray()));
LOG.info(Arrays.toString(list21.toArray()));
LOG.info(Arrays.toString(list22.toArray()));
assertIntegerList(list0, 10, 10);
assertIntegerList(list1, 11, 10);
assertIntegerList(list21, 12, 10);
assertIntegerList(list22, 52, 10);
if (!list1.isEmpty()) {
assertEquals(9, list0.size());
}
if (!list21.isEmpty() || !list22.isEmpty()) {
assertEquals(9, list1.size());
}
if (list.size() == 27) {
return;
}
LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size());
Thread.sleep(200);
}
throw new Exception("Not all logs have been pushed");
} finally {
utility1.getHBaseAdmin().disableTable(tableName);
utility2.getHBaseAdmin().disableTable(tableName);
utility1.getHBaseAdmin().deleteTable(tableName);
utility2.getHBaseAdmin().deleteTable(tableName);
}
}
@Test
public void testRegionMerge() throws Exception {
TableName tableName = TableName.valueOf("testRegionMerge");
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
table.addFamily(fam);
utility1.getHBaseAdmin().createTable(table);
utility2.getHBaseAdmin().createTable(table);
utility1.getHBaseAdmin().split(tableName, ROWS[50]);
Thread.sleep(5000L);
try(Table t1 = utility1.getConnection().getTable(tableName);
Table t2 = utility2.getConnection().getTable(tableName)) {
for (int i = 10; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
List<Pair<HRegionInfo, ServerName>> regions =
MetaTableAccessor.getTableRegionsAndLocations(utility1.getZooKeeperWatcher(),
utility1.getConnection(), tableName);
assertEquals(2, regions.size());
utility1.getHBaseAdmin().mergeRegions(regions.get(0).getFirst().getRegionName(),
regions.get(1).getFirst().getRegionName(), true);
for (int i = 11; i < 100; i += 10) {
Put put = new Put(ROWS[i]);
put.addColumn(famName, VALUE, VALUE);
t1.put(put);
}
long start = EnvironmentEdgeManager.currentTime();
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
Scan scan = new Scan();
scan.setCaching(100);
List<Cell> list = new ArrayList<>();
try (ResultScanner results = t2.getScanner(scan)) {
for (Result result : results) {
assertEquals(1, result.rawCells().length);
list.add(result.rawCells()[0]);
}
}
List<Integer> listOfNumbers = getRowNumbers(list);
List<Integer> list0 = new ArrayList<>();
List<Integer> list1 = new ArrayList<>();
for (int num : listOfNumbers) {
if (num % 10 == 0) {
list0.add(num);
} else {
list1.add(num);
}
}
LOG.info(Arrays.toString(list0.toArray()));
LOG.info(Arrays.toString(list1.toArray()));
assertIntegerList(list0, 10, 10);
assertIntegerList(list1, 11, 10);
if (!list1.isEmpty()) {
assertEquals(9, list0.size());
}
if (list.size() == 18) {
return;
}
LOG.info("Waiting all logs pushed to slave. Expected 18 , actual " + list.size());
Thread.sleep(200);
}
} finally {
utility1.getHBaseAdmin().disableTable(tableName);
utility2.getHBaseAdmin().disableTable(tableName);
utility1.getHBaseAdmin().deleteTable(tableName);
utility2.getHBaseAdmin().deleteTable(tableName);
}
}
private List<Integer> getRowNumbers(List<Cell> cells) {
List<Integer> listOfRowNumbers = new ArrayList<>();
for (Cell c : cells) {
listOfRowNumbers.add(Integer.parseInt(Bytes
.toString(c.getRowArray(), c.getRowOffset() + ROW.length,
c.getRowLength() - ROW.length)));
}
return listOfRowNumbers;
}
@AfterClass
public static void setUpAfterClass() throws Exception {
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
private void moveRegion(Table table, int index) throws IOException {
List<Pair<HRegionInfo, ServerName>> regions =
MetaTableAccessor.getTableRegionsAndLocations(utility1.getZooKeeperWatcher(),
utility1.getConnection(), table.getName());
assertEquals(1, regions.size());
HRegionInfo regionInfo = regions.get(0).getFirst();
ServerName name = utility1.getHBaseCluster().getRegionServer(index).getServerName();
utility1.getHBaseAdmin()
.move(regionInfo.getEncodedNameAsBytes(), Bytes.toBytes(name.getServerName()));
try {
Thread.sleep(5000L); // wait to complete
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void balanceTwoRegions(Table table) throws Exception {
List<Pair<HRegionInfo, ServerName>> regions =
MetaTableAccessor.getTableRegionsAndLocations(utility1.getZooKeeperWatcher(),
utility1.getConnection(), table.getName());
assertEquals(2, regions.size());
HRegionInfo regionInfo1 = regions.get(0).getFirst();
ServerName name1 = utility1.getHBaseCluster().getRegionServer(0).getServerName();
HRegionInfo regionInfo2 = regions.get(1).getFirst();
ServerName name2 = utility1.getHBaseCluster().getRegionServer(1).getServerName();
utility1.getHBaseAdmin()
.move(regionInfo1.getEncodedNameAsBytes(), Bytes.toBytes(name1.getServerName()));
Thread.sleep(5000L);
utility1.getHBaseAdmin()
.move(regionInfo2.getEncodedNameAsBytes(), Bytes.toBytes(name2.getServerName()));
Thread.sleep(5000L);
}
private void assertIntegerList(List<Integer> list, int start, int step) {
for (int i = 0; i < list.size(); i++) {
assertTrue(list.get(i) == start + step * i);
}
}
}