HBASE-17010 Serial replication should handle daughter regions being assigned to another RS (Phil Yang)
parent d9316a64a9 · commit 76814e8451
MetaTableAccessor.java

@@ -123,27 +123,30 @@ public class MetaTableAccessor {
  * The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
  * and should not leak out of it (through Result objects, etc)
  *
- * For replication serially, there are two column families "rep_barrier", "rep_position" whose
- * row key is encodedRegionName.
+ * For replication serially, there are three column families "rep_barrier", "rep_position" and
+ * "rep_meta" whose row key is encodedRegionName.
  * rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence
  *                        id in this region
  * rep_position:{peerid} => to save the max sequence id we have pushed for each peer
- * rep_position:_TABLENAME_ => a special cell to save this region's table name, will used when
+ * rep_meta:_TABLENAME_ => a special cell to save this region's table name, will used when
  *                        we clean old data
- * rep_position:_DAUGHTER_ => a special cell to present this region is split or merged, in this
+ * rep_meta:_DAUGHTER_ => a special cell to present this region is split or merged, in this
  *                        cell the value is merged encoded name or two split encoded names
  *                        separated by ","
+ * rep_meta:_PARENT_ => a special cell to present this region's parent region(s), in this
+ *                        cell the value is encoded name of one or two parent regions
+ *                        separated by ","
  */

   private static final Log LOG = LogFactory.getLog(MetaTableAccessor.class);
   private static final Log METALOG = LogFactory.getLog("org.apache.hadoop.hbase.META");

-  // Save its daughter region(s) when split/merge
-  private static final byte[] daughterNamePosCq = Bytes.toBytes("_DAUGHTER_");
+  // Save its daughter/parent region(s) when split/merge
+  private static final byte[] daughterNameCq = Bytes.toBytes("_DAUGHTER_");
+  private static final byte[] parentNameCq = Bytes.toBytes("_PARENT_");

   // Save its table name because we only know region's encoded name
-  private static final String tableNamePeer = "_TABLENAME_";
-  private static final byte[] tableNamePosCq = Bytes.toBytes(tableNamePeer);
+  private static final byte[] tableNameCq = Bytes.toBytes("_TABLENAME_");

   static final byte [] META_REGION_PREFIX;
   static {
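To make the layout above concrete, a minimal read-side sketch (illustration only, not part of this commit; assumes an open Connection conn and a region's encoded name encodedName):

    try (Table meta = conn.getTable(TableName.META_TABLE_NAME)) {
      Get get = new Get(Bytes.toBytes(encodedName));
      get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);   // rep_barrier:{seqid}
      get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);  // rep_position:{peerid}
      get.addFamily(HConstants.REPLICATION_META_FAMILY);      // rep_meta:_TABLENAME_/_DAUGHTER_/_PARENT_
      Result r = meta.get(get);
      for (Cell c : r.rawCells()) {
        // Print family:qualifier of each bookkeeping cell for this region row
        System.out.println(Bytes.toString(CellUtil.cloneFamily(c)) + ":"
            + Bytes.toString(CellUtil.cloneQualifier(c)));
      }
    }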
@@ -1343,13 +1346,18 @@ public class MetaTableAccessor {
     byte[] seqBytes = Bytes.toBytes(seq);
     return new Put(encodedRegionName)
         .addImmutable(HConstants.REPLICATION_BARRIER_FAMILY, seqBytes, seqBytes)
-        .addImmutable(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq, tableName);
+        .addImmutable(HConstants.REPLICATION_META_FAMILY, tableNameCq, tableName);
   }

-  public static Put makeSerialDaughterPut(byte[] encodedRegionName, byte[] value) {
-    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_POSITION_FAMILY,
-        daughterNamePosCq, value);
+  public static Put makeDaughterPut(byte[] encodedRegionName, byte[] value) {
+    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_META_FAMILY,
+        daughterNameCq, value);
+  }
+
+  public static Put makeParentPut(byte[] encodedRegionName, byte[] value) {
+    return new Put(encodedRegionName).addImmutable(HConstants.REPLICATION_META_FAMILY,
+        parentNameCq, value);
   }

@@ -1374,7 +1382,7 @@ public class MetaTableAccessor {
    * @param puts Put to add to hbase:meta
    * @throws IOException
    */
-  static void putToMetaTable(final Connection connection, final Put... puts)
+  public static void putToMetaTable(final Connection connection, final Put... puts)
       throws IOException {
     put(getMetaHTable(connection), Arrays.asList(puts));
   }

@@ -1674,10 +1682,10 @@ public class MetaTableAccessor {
         + HConstants.DELIMITER);
     Mutation[] mutations;
     if (saveBarrier) {
-      Put putBarrierA = makeSerialDaughterPut(regionA.getEncodedNameAsBytes(),
-          Bytes.toBytes(mergedRegion.getEncodedName()));
-      Put putBarrierB = makeSerialDaughterPut(regionB.getEncodedNameAsBytes(),
-          Bytes.toBytes(mergedRegion.getEncodedName()));
+      Put putBarrierA = makeDaughterPut(regionA.getEncodedNameAsBytes(),
+          mergedRegion.getEncodedNameAsBytes());
+      Put putBarrierB = makeDaughterPut(regionB.getEncodedNameAsBytes(),
+          mergedRegion.getEncodedNameAsBytes());
       mutations = new Mutation[] { putOfMerged, deleteA, deleteB, putBarrierA, putBarrierB };
     } else {
       mutations = new Mutation[] { putOfMerged, deleteA, deleteB };

@@ -1729,10 +1737,14 @@ public class MetaTableAccessor {
     Mutation[] mutations;
     if (saveBarrier) {
-      Put putBarrier = makeSerialDaughterPut(parent.getEncodedNameAsBytes(),
-          Bytes
-              .toBytes(splitA.getEncodedName() + HConstants.DELIMITER + splitB.getEncodedName()));
-      mutations = new Mutation[]{putParent, putA, putB, putBarrier};
+      Put parentPut = makeDaughterPut(parent.getEncodedNameAsBytes(),
+          Bytes.toBytes(splitA.getEncodedName() + "," + splitB.getEncodedName()));
+      Put daughterPutA = makeParentPut(splitA.getEncodedNameAsBytes(),
+          parent.getEncodedNameAsBytes());
+      Put daughterPutB = makeParentPut(splitB.getEncodedNameAsBytes(),
+          parent.getEncodedNameAsBytes());
+
+      mutations = new Mutation[]{putParent, putA, putB, parentPut, daughterPutA, daughterPutB};
     } else {
       mutations = new Mutation[]{putParent, putA, putB};
     }
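Taken together with the merge hunk above, the split hunk leaves the following rep_meta cells behind (illustration only, with hypothetical encoded names P, A, B and merged region M):

    // After splitting parent P into daughters A and B:
    //   row P: rep_meta:_DAUGHTER_ = "A,B"   (makeDaughterPut on the parent)
    //   row A: rep_meta:_PARENT_   = "P"     (makeParentPut on each daughter)
    //   row B: rep_meta:_PARENT_   = "P"
    // After merging regions A and B into M:
    //   row A: rep_meta:_DAUGHTER_ = "M"
    //   row B: rep_meta:_DAUGHTER_ = "M"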
@@ -2112,14 +2124,9 @@ public class MetaTableAccessor {
     Result r = get(getMetaHTable(connection), get);
     Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
     for (Cell c : r.listCells()) {
-      if (!Bytes.equals(tableNamePosCq, 0, tableNamePosCq.length, c.getQualifierArray(),
-          c.getQualifierOffset(), c.getQualifierLength()) &&
-          !Bytes.equals(daughterNamePosCq, 0, daughterNamePosCq.length, c.getQualifierArray(),
-              c.getQualifierOffset(), c.getQualifierLength())) {
-        map.put(
-            Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
-            Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
-      }
+      map.put(
+          Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
+          Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
     }
     return map;
   }

@@ -2172,12 +2179,32 @@ public class MetaTableAccessor {
   /**
    * Get daughter region(s) for a region, only used in serial replication.
+   * @param connection connection we're using
+   * @param encodedName region's encoded name
    * @throws IOException
    */
   public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
       throws IOException {
     Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, daughterNamePosCq);
+    get.addColumn(HConstants.REPLICATION_META_FAMILY, daughterNameCq);
+    Result result = get(getMetaHTable(connection), get);
+    if (!result.isEmpty()) {
+      Cell c = result.rawCells()[0];
+      return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+    }
+    return null;
+  }
+
+  /**
+   * Get parent region(s) for a region, only used in serial replication.
+   * @param connection connection we're using
+   * @param encodedName region's encoded name
+   * @throws IOException
+   */
+  public static String getSerialReplicationParentRegion(Connection connection, byte[] encodedName)
+      throws IOException {
+    Get get = new Get(encodedName);
+    get.addColumn(HConstants.REPLICATION_META_FAMILY, parentNameCq);
     Result result = get(getMetaHTable(connection), get);
     if (!result.isEmpty()) {
       Cell c = result.rawCells()[0];

@@ -2188,12 +2215,14 @@ public class MetaTableAccessor {
   /**
    * Get the table name for a region, only used in serial replication.
+   * @param connection connection we're using
+   * @param encodedName region's encoded name
    * @throws IOException
    */
   public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
       throws IOException {
     Get get = new Get(encodedName);
-    get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, tableNamePosCq);
+    get.addColumn(HConstants.REPLICATION_META_FAMILY, tableNameCq);
     Result result = get(getMetaHTable(connection), get);
     if (!result.isEmpty()) {
       Cell c = result.rawCells()[0];
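A usage sketch for the getters above (illustration only, not part of this commit; conn and encodedName are assumed): the returned value joins one or two encoded names with ",", or is null when nothing is recorded.

    String parents = MetaTableAccessor.getSerialReplicationParentRegion(conn,
        Bytes.toBytes(encodedName));
    if (parents != null) {
      for (String parent : parents.split(",")) {
        // e.g. compare this parent's pushed position against its last barrier
        // before letting the daughter's entries through (see ReplicationSourceManager below)
      }
    }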
HConstants.java

@@ -436,13 +436,20 @@ public final class HConstants {
   public static final byte [] REPLICATION_BARRIER_FAMILY =
       Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);

-  /** The replication barrier family as a string*/
+  /** The replication position family as a string*/
   public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";

-  /** The replication barrier family */
+  /** The replication position family */
   public static final byte [] REPLICATION_POSITION_FAMILY =
       Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);

+  /** The replication meta family as a string*/
+  public static final String REPLICATION_META_FAMILY_STR = "rep_meta";
+
+  /** The replication meta family */
+  public static final byte [] REPLICATION_META_FAMILY =
+      Bytes.toBytes(REPLICATION_META_FAMILY_STR);
+
   /** The RegionInfo qualifier as a string */
   public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
ReplicationMetaCleaner.java

@@ -140,6 +140,7 @@ public class ReplicationMetaCleaner extends ScheduledChore {
           Delete delete = new Delete(encodedBytes);
           delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
           delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
+          delete.addFamily(HConstants.REPLICATION_META_FAMILY);
           try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
             metaTable.delete(delete);
           }
HRegionServer.java

@@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.NonceGenerator;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;

@@ -3052,7 +3053,25 @@ public class HRegionServer extends HasThread implements
       if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
       throw new IOException(exceptionToThrow);
     }
+    if (parentRegion.getTableDesc().hasSerialReplicationScope()) {
+      // For serial replication, we need add a final barrier on this region. But the splitting may
+      // be reverted, so we should make sure if we reopen this region, the open barrier is same as
+      // this final barrier
+      long seq = parentRegion.getMaxFlushedSeqId();
+      if (seq == HConstants.NO_SEQNUM) {
+        // No edits in WAL for this region; get the sequence number when the region was opened.
+        seq = parentRegion.getOpenSeqNum();
+        if (seq == HConstants.NO_SEQNUM) {
+          // This region has no data
+          seq = 0;
+        }
+      } else {
+        seq++;
+      }
+      Put finalBarrier = MetaTableAccessor.makeBarrierPut(Bytes.toBytes(parentRegionEncodedName),
+          seq, parentRegion.getTableDesc().getTableName().getName());
+      MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier);
+    }
     // Offline the region
     this.removeFromOnlineRegions(parentRegion, null);
   }
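The barrier sequence id chosen above falls into three cases; restated as a standalone sketch (hypothetical helper, illustration only, not part of this commit):

    // Pick the smallest id guaranteed to be >= every edit already in the WAL.
    static long finalBarrierSeq(long maxFlushedSeqId, long openSeqNum) {
      if (maxFlushedSeqId != HConstants.NO_SEQNUM) {
        return maxFlushedSeqId + 1; // edits exist: place the barrier just past the last one
      }
      if (openSeqNum != HConstants.NO_SEQNUM) {
        return openSeqNum;          // no edits since open: reuse the open sequence id
      }
      return 0;                     // region never held data
    }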
ReplicationSourceManager.java

@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;

-import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;

@@ -33,10 +32,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;

@@ -58,9 +55,6 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;

@@ -70,6 +64,9 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

@@ -860,8 +860,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    *    It is mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the
    *    order of logs that is written before altering.
    * 3) This entry is in the first interval of barriers. We can push them because it is the
-   *    start of a region. Splitting/merging regions are also ok because the first section of
-   *    daughter region is in same region of parents and the order in one RS is guaranteed.
+   *    start of a region. But if the region is created by region split, we should check
+   *    if the parent regions are fully pushed.
    * 4) If the entry's seq id and the position are in same section, or the pos is the last
    *    number of previous section. Because when open a region we put a barrier the number
    *    is the last log's id + 1.

@@ -879,7 +879,40 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
       if (interval == 1) {
         // Case 3
-        return;
+        // Check if there are parent regions
+        String parentValue = MetaTableAccessor.getSerialReplicationParentRegion(connection,
+            encodedName);
+        if (parentValue == null) {
+          // This region has no parent or the parent's log entries are fully pushed.
+          return;
+        }
+        while (true) {
+          boolean allParentDone = true;
+          String[] parentRegions = parentValue.split(",");
+          for (String parent : parentRegions) {
+            byte[] region = Bytes.toBytes(parent);
+            long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, region, peerId);
+            List<Long> parentBarriers = MetaTableAccessor.getReplicationBarriers(connection, region);
+            if (parentBarriers.size() > 0
+                && parentBarriers.get(parentBarriers.size() - 1) - 1 > pos) {
+              allParentDone = false;
+              // For a closed region, we will write a close event marker to WAL whose sequence id is
+              // larger than final barrier but still smaller than next region's openSeqNum.
+              // So if the pos is larger than last barrier, we can say we have read the event marker
+              // which means the parent region has been fully pushed.
+              LOG.info(Bytes.toString(encodedName) + " can not start pushing because parent region's"
+                  + " log has not been fully pushed: parent=" + Bytes.toString(region) + " pos=" + pos
+                  + " barriers=" + Arrays.toString(barriers.toArray()));
+              break;
+            }
+          }
+          if (allParentDone) {
+            return;
+          } else {
+            Thread.sleep(replicationWaitTime);
+          }
+        }
+
       }

       while (true) {
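To make the check above concrete, a worked example with made-up numbers (illustration only, not part of this commit):

    // parentBarriers = [10, 25, 40]; the last barrier (40) is the parent's final barrier.
    // The wait condition is: parentBarriers.get(last) - 1 > pos, i.e. 39 > pos.
    //   pos = 30 -> 39 > 30: the parent is not fully pushed, the daughter waits and re-checks.
    //   pos >= 39 -> the close event marker (sequence id above the final barrier) has been
    //                pushed, so the parent is done and the daughter may start pushing.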
FSTableDescriptors.java

@@ -168,6 +168,18 @@ public class FSTableDescriptors implements TableDescriptors {
             // Enable cache of data blocks in L1 if more than one caching tier deployed:
             // e.g. if using CombinedBlockCache (BucketCache).
             .setCacheDataInL1(true),
+        new HColumnDescriptor(HConstants.REPLICATION_META_FAMILY)
+            .setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
+                HConstants.DEFAULT_HBASE_META_VERSIONS))
+            .setInMemory(true)
+            .setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
+                HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
+            .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+            // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
+            .setBloomFilterType(BloomType.NONE)
+            // Enable cache of data blocks in L1 if more than one caching tier deployed:
+            // e.g. if using CombinedBlockCache (BucketCache).
+            .setCacheDataInL1(true),
         new HColumnDescriptor(HConstants.TABLE_FAMILY)
             // Ten is arbitrary number. Keep versions to help debugging.
             .setMaxVersions(10)
TestSerialReplication.java

@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;

@@ -112,7 +113,7 @@ public class TestSerialReplication {
     rpc.setClusterKey(utility2.getClusterKey());
     admin1.addPeer("1", rpc, null);

-    utility1.startMiniCluster(1, 3);
+    utility1.startMiniCluster(1, 10);
     utility2.startMiniCluster(1, 1);

     utility1.getHBaseAdmin().setBalancerRunning(false, true);

@@ -174,16 +175,10 @@ public class TestSerialReplication {
           return;
         }
       }
       throw new Exception("Not all logs have been pushed");
-    } finally {
-      utility1.getHBaseAdmin().disableTable(tableName);
-      utility2.getHBaseAdmin().disableTable(tableName);
-      utility1.getHBaseAdmin().deleteTable(tableName);
-      utility2.getHBaseAdmin().deleteTable(tableName);
     }
   }

   @Test
-  @Ignore
   public void testRegionSplit() throws Exception {
     TableName tableName = TableName.valueOf("testRegionSplit");
     HTableDescriptor table = new HTableDescriptor(tableName);

@@ -201,18 +196,12 @@ public class TestSerialReplication {
         t1.put(put);
       }
       utility1.getHBaseAdmin().split(tableName, ROWS[50]);
-      Thread.sleep(5000L);
+      waitTableHasRightNumberOfRegions(tableName, 2);
       for (int i = 11; i < 100; i += 10) {
         Put put = new Put(ROWS[i]);
         put.addColumn(famName, VALUE, VALUE);
         t1.put(put);
       }
-      balanceTwoRegions(t1);
-      for (int i = 12; i < 100; i += 10) {
-        Put put = new Put(ROWS[i]);
-        put.addColumn(famName, VALUE, VALUE);
-        t1.put(put);
-      }

       long start = EnvironmentEdgeManager.currentTime();
       while (EnvironmentEdgeManager.currentTime() - start < 180000) {

@@ -226,54 +215,40 @@ public class TestSerialReplication {
           }
         }
         List<Integer> listOfNumbers = getRowNumbers(list);
-        List<Integer> list0 = new ArrayList<>();
         List<Integer> list1 = new ArrayList<>();
         List<Integer> list21 = new ArrayList<>();
         List<Integer> list22 = new ArrayList<>();
         for (int num : listOfNumbers) {
           if (num % 10 == 0) {
-            list0.add(num);
-          } else if (num % 10 == 1) {
             list1.add(num);
-          } else if (num < 50) { //num%10==2
+          } else if (num < 50) { //num%10==1
             list21.add(num);
           } else { // num%10==1&&num>50
             list22.add(num);
           }
         }

-        LOG.info(Arrays.toString(list0.toArray()));
         LOG.info(Arrays.toString(list1.toArray()));
         LOG.info(Arrays.toString(list21.toArray()));
         LOG.info(Arrays.toString(list22.toArray()));
-        assertIntegerList(list0, 10, 10);
-        assertIntegerList(list1, 11, 10);
-        assertIntegerList(list21, 12, 10);
-        assertIntegerList(list22, 52, 10);
-        if (!list1.isEmpty()) {
-          assertEquals(9, list0.size());
-        }
+        assertIntegerList(list1, 10, 10);
+        assertIntegerList(list21, 11, 10);
+        assertIntegerList(list22, 51, 10);
         if (!list21.isEmpty() || !list22.isEmpty()) {
           assertEquals(9, list1.size());
         }

-        if (list.size() == 27) {
+        if (list.size() == 18) {
           return;
         }
         LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size());
         Thread.sleep(200);
       }
       throw new Exception("Not all logs have been pushed");
-    } finally {
-      utility1.getHBaseAdmin().disableTable(tableName);
-      utility2.getHBaseAdmin().disableTable(tableName);
-      utility1.getHBaseAdmin().deleteTable(tableName);
-      utility2.getHBaseAdmin().deleteTable(tableName);
     }
   }

   @Test
-  @Ignore
   public void testRegionMerge() throws Exception {
     TableName tableName = TableName.valueOf("testRegionMerge");
     HTableDescriptor table = new HTableDescriptor(tableName);

@@ -282,8 +257,9 @@ public class TestSerialReplication {
     table.addFamily(fam);
     utility1.getHBaseAdmin().createTable(table);
     utility2.getHBaseAdmin().createTable(table);
+    Threads.sleep(5000);
     utility1.getHBaseAdmin().split(tableName, ROWS[50]);
-    Thread.sleep(5000L);
+    waitTableHasRightNumberOfRegions(tableName, 2);

     try(Table t1 = utility1.getConnection().getTable(tableName);
         Table t2 = utility2.getConnection().getTable(tableName)) {

@@ -294,9 +270,9 @@ public class TestSerialReplication {
       }
       List<Pair<HRegionInfo, ServerName>> regions =
           MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName);
-      assertEquals(2, regions.size());
       utility1.getHBaseAdmin().mergeRegions(regions.get(0).getFirst().getRegionName(),
           regions.get(1).getFirst().getRegionName(), true);
+      waitTableHasRightNumberOfRegions(tableName, 1);
       for (int i = 11; i < 100; i += 10) {
         Put put = new Put(ROWS[i]);
         put.addColumn(famName, VALUE, VALUE);

@@ -326,7 +302,6 @@ public class TestSerialReplication {
       }
       LOG.info(Arrays.toString(list0.toArray()));
       LOG.info(Arrays.toString(list1.toArray()));
-      assertIntegerList(list0, 10, 10);
       assertIntegerList(list1, 11, 10);
       if (!list1.isEmpty()) {
         assertEquals(9, list0.size());

@@ -338,11 +313,6 @@ public class TestSerialReplication {
         Thread.sleep(200);
       }

-    } finally {
-      utility1.getHBaseAdmin().disableTable(tableName);
-      utility2.getHBaseAdmin().disableTable(tableName);
-      utility1.getHBaseAdmin().deleteTable(tableName);
-      utility2.getHBaseAdmin().deleteTable(tableName);
     }
   }

@@ -370,10 +340,13 @@ public class TestSerialReplication {
     ServerName name = utility1.getHBaseCluster().getRegionServer(index).getServerName();
     utility1.getAdmin()
         .move(regionInfo.getEncodedNameAsBytes(), Bytes.toBytes(name.getServerName()));
-    try {
-      Thread.sleep(5000L); // wait to complete
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
+    while (true) {
+      regions =
+          MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
+      if (regions.get(0).getSecond().equals(name)) {
+        break;
+      }
+      Threads.sleep(100);
     }
   }

@@ -387,16 +360,34 @@ public class TestSerialReplication {
     ServerName name2 = utility1.getHBaseCluster().getRegionServer(1).getServerName();
     utility1.getAdmin()
         .move(regionInfo1.getEncodedNameAsBytes(), Bytes.toBytes(name1.getServerName()));
-    Thread.sleep(5000L);
     utility1.getAdmin()
         .move(regionInfo2.getEncodedNameAsBytes(), Bytes.toBytes(name2.getServerName()));
-    Thread.sleep(5000L);
+    while (true) {
+      regions =
+          MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
+      if (regions.get(0).getSecond().equals(name1) && regions.get(1).getSecond().equals(name2)) {
+        break;
+      }
+      Threads.sleep(100);
+    }
+  }
+
+  private void waitTableHasRightNumberOfRegions(TableName tableName, int num) throws IOException {
+    while (true) {
+      List<Pair<HRegionInfo, ServerName>> regions =
+          MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName);
+      if (regions.size() == num) {
+        return;
+      }
+      Threads.sleep(100);
+    }
   }

   private void assertIntegerList(List<Integer> list, int start, int step) {
     int size = list.size();
     for (int i = 0; i < size; i++) {
-      assertTrue(list.get(i) == start + step * i);
+      assertEquals(start + step * i, list.get(i).intValue());
     }
   }
 }