HBASE-20048 Revert serial replication feature
This commit is contained in:
parent
1bc996aa50
commit
ad5cd50dfc
|
@ -537,14 +537,6 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
|
|||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if there are at least one cf whose replication scope is serial.
|
||||
*/
|
||||
@Override
|
||||
public boolean hasSerialReplicationScope() {
|
||||
return delegatee.hasSerialReplicationScope();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured replicas per region
|
||||
*/
|
||||
|
|
|
@ -17,13 +17,14 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import edu.umd.cs.findbugs.annotations.NonNull;
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -33,7 +34,6 @@ import java.util.SortedMap;
|
|||
import java.util.TreeMap;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell.Type;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
|
@ -71,9 +71,8 @@ import org.apache.hadoop.hbase.util.PairOfSameType;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import edu.umd.cs.findbugs.annotations.NonNull;
|
||||
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||
|
||||
/**
|
||||
* Read/write operations on region and assignment information store in
|
||||
|
@ -123,34 +122,14 @@ public class MetaTableAccessor {
|
|||
* region is the result of a merge
|
||||
* info:mergeB => contains a serialized HRI for the second parent region if the
|
||||
* region is the result of a merge
|
||||
*
|
||||
* The actual layout of meta should be encapsulated inside MetaTableAccessor methods,
|
||||
* and should not leak out of it (through Result objects, etc)
|
||||
*
|
||||
* For replication serially, there are three column families "rep_barrier", "rep_position" and
|
||||
* "rep_meta" whose row key is encodedRegionName.
|
||||
* rep_barrier:{seqid} => in each time a RS opens a region, it saves the open sequence
|
||||
* id in this region
|
||||
* rep_position:{peerid} => to save the max sequence id we have pushed for each peer
|
||||
* rep_meta:_TABLENAME_ => a special cell to save this region's table name, will be used when
|
||||
* we clean old data
|
||||
* rep_meta:_DAUGHTER_ => a special cell to present this region is split or merged, in this
|
||||
* cell the value is merged encoded name or two split encoded names
|
||||
* separated by ","
|
||||
* rep_meta:_PARENT_ => a special cell to present this region's parent region(s), in this
|
||||
* cell the value is encoded name of one or two parent regions
|
||||
* separated by ","
|
||||
*/
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class);
|
||||
private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META");
|
||||
|
||||
// Save its daughter/parent region(s) when split/merge
|
||||
private static final byte[] daughterNameCq = Bytes.toBytes("_DAUGHTER_");
|
||||
private static final byte[] parentNameCq = Bytes.toBytes("_PARENT_");
|
||||
|
||||
// Save its table name because we only know region's encoded name
|
||||
private static final byte[] tableNameCq = Bytes.toBytes("_TABLENAME_");
|
||||
|
||||
static final byte [] META_REGION_PREFIX;
|
||||
static {
|
||||
// Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
|
||||
|
@ -1352,56 +1331,6 @@ public class MetaTableAccessor {
|
|||
return delete;
|
||||
}
|
||||
|
||||
public static Put makeBarrierPut(byte[] encodedRegionName, long seq, byte[] tableName)
|
||||
throws IOException {
|
||||
byte[] seqBytes = Bytes.toBytes(seq);
|
||||
Put put = new Put(encodedRegionName);
|
||||
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
|
||||
.setRow(put.getRow())
|
||||
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
|
||||
.setQualifier(seqBytes)
|
||||
.setTimestamp(put.getTimeStamp())
|
||||
.setType(Type.Put)
|
||||
.setValue(seqBytes)
|
||||
.build())
|
||||
.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
|
||||
.setRow(put.getRow())
|
||||
.setFamily(HConstants.REPLICATION_META_FAMILY)
|
||||
.setQualifier(tableNameCq)
|
||||
.setTimestamp(put.getTimeStamp())
|
||||
.setType(Cell.Type.Put)
|
||||
.setValue(tableName)
|
||||
.build());
|
||||
return put;
|
||||
}
|
||||
|
||||
|
||||
public static Put makeDaughterPut(byte[] encodedRegionName, byte[] value) throws IOException {
|
||||
Put put = new Put(encodedRegionName);
|
||||
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
|
||||
.setRow(put.getRow())
|
||||
.setFamily(HConstants.REPLICATION_META_FAMILY)
|
||||
.setQualifier(daughterNameCq)
|
||||
.setTimestamp(put.getTimeStamp())
|
||||
.setType(Type.Put)
|
||||
.setValue(value)
|
||||
.build());
|
||||
return put;
|
||||
}
|
||||
|
||||
public static Put makeParentPut(byte[] encodedRegionName, byte[] value) throws IOException {
|
||||
Put put = new Put(encodedRegionName);
|
||||
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
|
||||
.setRow(put.getRow())
|
||||
.setFamily(HConstants.REPLICATION_META_FAMILY)
|
||||
.setQualifier(parentNameCq)
|
||||
.setTimestamp(put.getTimeStamp())
|
||||
.setType(Type.Put)
|
||||
.setValue(value)
|
||||
.build());
|
||||
return put;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds split daughters to the Put
|
||||
*/
|
||||
|
@ -1431,26 +1360,25 @@ public class MetaTableAccessor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Put the passed <code>puts</code> to the <code>hbase:meta</code> table.
|
||||
* Non-atomic for multi puts.
|
||||
* Put the passed <code>p</code> to the <code>hbase:meta</code> table.
|
||||
* @param connection connection we're using
|
||||
* @param puts Put to add to hbase:meta
|
||||
* @param p Put to add to hbase:meta
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void putToMetaTable(final Connection connection, final Put... puts)
|
||||
static void putToMetaTable(final Connection connection, final Put p)
|
||||
throws IOException {
|
||||
put(getMetaHTable(connection), Arrays.asList(puts));
|
||||
put(getMetaHTable(connection), p);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param t Table to use (will be closed when done).
|
||||
* @param puts puts to make
|
||||
* @param p put to make
|
||||
* @throws IOException
|
||||
*/
|
||||
private static void put(final Table t, final List<Put> puts) throws IOException {
|
||||
private static void put(final Table t, final Put p) throws IOException {
|
||||
try {
|
||||
debugLogMutations(puts);
|
||||
t.put(puts);
|
||||
debugLogMutation(p);
|
||||
t.put(p);
|
||||
} finally {
|
||||
t.close();
|
||||
}
|
||||
|
@ -1567,7 +1495,7 @@ public class MetaTableAccessor {
|
|||
* Adds daughter region infos to hbase:meta row for the specified region. Note that this does not
|
||||
* add its daughter's as different rows, but adds information about the daughters in the same row
|
||||
* as the parent. Use
|
||||
* {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int,boolean)}
|
||||
* {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int)}
|
||||
* if you want to do that.
|
||||
* @param connection connection we're using
|
||||
* @param regionInfo RegionInfo of parent region
|
||||
|
@ -1575,7 +1503,7 @@ public class MetaTableAccessor {
|
|||
* @param splitB second split daughter of the parent regionInfo
|
||||
* @throws IOException if problem connecting or updating meta
|
||||
*/
|
||||
public static void addSpiltsToParent(Connection connection, RegionInfo regionInfo,
|
||||
public static void addSplitsToParent(Connection connection, RegionInfo regionInfo,
|
||||
RegionInfo splitA, RegionInfo splitB) throws IOException {
|
||||
Table meta = getMetaHTable(connection);
|
||||
try {
|
||||
|
@ -1590,7 +1518,11 @@ public class MetaTableAccessor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Adds a hbase:meta row for the specified new region. Initial state of new region is CLOSED.
|
||||
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
|
||||
* does not add its daughter's as different rows, but adds information about the daughters
|
||||
* in the same row as the parent. Use
|
||||
* {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName, int)}
|
||||
* if you want to do that.
|
||||
* @param connection connection we're using
|
||||
* @param regionInfo region information
|
||||
* @throws IOException if problem connecting or updating meta
|
||||
|
@ -1651,12 +1583,11 @@ public class MetaTableAccessor {
|
|||
* @param regionB
|
||||
* @param sn the location of the region
|
||||
* @param masterSystemTime
|
||||
* @param saveBarrier true if need save replication barrier in meta, used for serial replication
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void mergeRegions(final Connection connection, RegionInfo mergedRegion,
|
||||
RegionInfo regionA, RegionInfo regionB, ServerName sn, int regionReplication,
|
||||
long masterSystemTime, boolean saveBarrier)
|
||||
long masterSystemTime)
|
||||
throws IOException {
|
||||
Table meta = getMetaHTable(connection);
|
||||
try {
|
||||
|
@ -1707,20 +1638,7 @@ public class MetaTableAccessor {
|
|||
|
||||
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
|
||||
+ HConstants.DELIMITER);
|
||||
Mutation[] mutations;
|
||||
if (saveBarrier) {
|
||||
Put putBarrierA = makeDaughterPut(regionA.getEncodedNameAsBytes(),
|
||||
mergedRegion.getEncodedNameAsBytes());
|
||||
Put putBarrierB = makeDaughterPut(regionB.getEncodedNameAsBytes(),
|
||||
mergedRegion.getEncodedNameAsBytes());
|
||||
Put putDaughter = makeParentPut(mergedRegion.getEncodedNameAsBytes(), Bytes.toBytes(
|
||||
regionA.getEncodedName() + "," + regionB.getEncodedName()));
|
||||
mutations = new Mutation[] { putOfMerged, deleteA, deleteB,
|
||||
putBarrierA, putBarrierB, putDaughter};
|
||||
} else {
|
||||
mutations = new Mutation[] { putOfMerged, deleteA, deleteB };
|
||||
}
|
||||
multiMutate(connection, meta, tableRow, mutations);
|
||||
multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB);
|
||||
} finally {
|
||||
meta.close();
|
||||
}
|
||||
|
@ -1736,11 +1654,9 @@ public class MetaTableAccessor {
|
|||
* @param splitA Split daughter region A
|
||||
* @param splitB Split daughter region B
|
||||
* @param sn the location of the region
|
||||
* @param saveBarrier true if need save replication barrier in meta, used for serial replication
|
||||
*/
|
||||
public static void splitRegion(final Connection connection, RegionInfo parent,
|
||||
RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication,
|
||||
boolean saveBarrier) throws IOException {
|
||||
public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA,
|
||||
RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
|
||||
Table meta = getMetaHTable(connection);
|
||||
try {
|
||||
//Put for parent
|
||||
|
@ -1771,21 +1687,8 @@ public class MetaTableAccessor {
|
|||
addEmptyLocation(putB, i);
|
||||
}
|
||||
|
||||
Mutation[] mutations;
|
||||
if (saveBarrier) {
|
||||
Put parentPut = makeDaughterPut(parent.getEncodedNameAsBytes(),
|
||||
Bytes.toBytes(splitA.getEncodedName() + "," + splitB.getEncodedName()));
|
||||
Put daughterPutA = makeParentPut(splitA.getEncodedNameAsBytes(),
|
||||
parent.getEncodedNameAsBytes());
|
||||
Put daughterPutB = makeParentPut(splitB.getEncodedNameAsBytes(),
|
||||
parent.getEncodedNameAsBytes());
|
||||
|
||||
mutations = new Mutation[]{putParent, putA, putB, parentPut, daughterPutA, daughterPutB};
|
||||
} else {
|
||||
mutations = new Mutation[]{putParent, putA, putB};
|
||||
}
|
||||
byte[] tableRow = Bytes.toBytes(parent.getRegionNameAsString() + HConstants.DELIMITER);
|
||||
multiMutate(connection, meta, tableRow, mutations);
|
||||
multiMutate(connection, meta, tableRow, putParent, putA, putB);
|
||||
} finally {
|
||||
meta.close();
|
||||
}
|
||||
|
@ -1919,32 +1822,6 @@ public class MetaTableAccessor {
|
|||
updateLocation(connection, regionInfo, sn, openSeqNum, masterSystemTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the progress of pushing entries to peer cluster. Skip entry if value is -1.
|
||||
* @param connection connection we're using
|
||||
* @param peerId the peerId to push
|
||||
* @param positions map that saving positions for each region
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void updateReplicationPositions(Connection connection, String peerId,
|
||||
Map<String, Long> positions) throws IOException {
|
||||
List<Put> puts = new ArrayList<>(positions.entrySet().size());
|
||||
for (Map.Entry<String, Long> entry : positions.entrySet()) {
|
||||
Put put = new Put(Bytes.toBytes(entry.getKey()));
|
||||
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
|
||||
.setRow(put.getRow())
|
||||
.setFamily(HConstants.REPLICATION_POSITION_FAMILY)
|
||||
.setQualifier(Bytes.toBytes(peerId))
|
||||
.setTimestamp(put.getTimeStamp())
|
||||
.setType(Cell.Type.Put)
|
||||
.setValue(Bytes.toBytes(Math.abs(entry.getValue())))
|
||||
.build());
|
||||
puts.add(put);
|
||||
}
|
||||
getMetaHTable(connection).put(puts);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Updates the location of the specified region to be the specified server.
|
||||
* <p>
|
||||
|
@ -2163,129 +2040,4 @@ public class MetaTableAccessor {
|
|||
.setValue(Bytes.toBytes(openSeqNum))
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get replication position for a peer in a region.
|
||||
* @param connection connection we're using
|
||||
* @return the position of this peer, -1 if no position in meta.
|
||||
*/
|
||||
public static long getReplicationPositionForOnePeer(Connection connection,
|
||||
byte[] encodedRegionName, String peerId) throws IOException {
|
||||
Get get = new Get(encodedRegionName);
|
||||
get.addColumn(HConstants.REPLICATION_POSITION_FAMILY, Bytes.toBytes(peerId));
|
||||
Result r = get(getMetaHTable(connection), get);
|
||||
if (r.isEmpty()) {
|
||||
return -1;
|
||||
}
|
||||
Cell cell = r.rawCells()[0];
|
||||
return Bytes.toLong(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get replication positions for all peers in a region.
|
||||
* @param connection connection we're using
|
||||
* @param encodedRegionName region's encoded name
|
||||
* @return the map of positions for each peer
|
||||
*/
|
||||
public static Map<String, Long> getReplicationPositionForAllPeer(Connection connection,
|
||||
byte[] encodedRegionName) throws IOException {
|
||||
Get get = new Get(encodedRegionName);
|
||||
get.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
|
||||
Result r = get(getMetaHTable(connection), get);
|
||||
Map<String, Long> map = new HashMap<>((int) (r.size() / 0.75 + 1));
|
||||
for (Cell c : r.listCells()) {
|
||||
map.put(
|
||||
Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()),
|
||||
Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()));
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get replication barriers for all peers in a region.
|
||||
* @param encodedRegionName region's encoded name
|
||||
* @return a list of barrier sequence numbers.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static List<Long> getReplicationBarriers(Connection connection, byte[] encodedRegionName)
|
||||
throws IOException {
|
||||
Get get = new Get(encodedRegionName);
|
||||
get.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
|
||||
Result r = get(getMetaHTable(connection), get);
|
||||
List<Long> list = new ArrayList<>();
|
||||
if (!r.isEmpty()) {
|
||||
for (Cell cell : r.rawCells()) {
|
||||
list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
|
||||
cell.getQualifierLength()));
|
||||
}
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all barriers in all regions.
|
||||
* @return a map of barrier lists in all regions
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Map<String, List<Long>> getAllBarriers(Connection connection) throws IOException {
|
||||
Map<String, List<Long>> map = new HashMap<>();
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
|
||||
try (Table t = getMetaHTable(connection);
|
||||
ResultScanner scanner = t.getScanner(scan)) {
|
||||
Result result;
|
||||
while ((result = scanner.next()) != null) {
|
||||
String key = Bytes.toString(result.getRow());
|
||||
List<Long> list = new ArrayList<>(result.rawCells().length);
|
||||
for (Cell cell : result.rawCells()) {
|
||||
list.add(Bytes.toLong(cell.getQualifierArray(), cell.getQualifierOffset(),
|
||||
cell.getQualifierLength()));
|
||||
}
|
||||
map.put(key, list);
|
||||
}
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
private static String getSerialReplicationColumnValue(Connection connection,
|
||||
byte[] encodedRegionName, byte[] columnQualifier) throws IOException {
|
||||
Get get = new Get(encodedRegionName);
|
||||
get.addColumn(HConstants.REPLICATION_META_FAMILY, columnQualifier);
|
||||
Result result = get(getMetaHTable(connection), get);
|
||||
if (!result.isEmpty()) {
|
||||
Cell c = result.rawCells()[0];
|
||||
return Bytes.toString(c.getValueArray(), c.getValueOffset(), c.getValueLength());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get daughter region(s) for a region, only used in serial replication.
|
||||
* @param connection connection we're using
|
||||
* @param encodedName region's encoded name
|
||||
*/
|
||||
public static String getSerialReplicationDaughterRegion(Connection connection, byte[] encodedName)
|
||||
throws IOException {
|
||||
return getSerialReplicationColumnValue(connection, encodedName, daughterNameCq);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get parent region(s) for a region, only used in serial replication.
|
||||
* @param connection connection we're using
|
||||
* @param encodedName region's encoded name
|
||||
*/
|
||||
public static String getSerialReplicationParentRegion(Connection connection, byte[] encodedName)
|
||||
throws IOException {
|
||||
return getSerialReplicationColumnValue(connection, encodedName, parentNameCq);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the table name for a region, only used in serial replication.
|
||||
* @param connection connection we're using
|
||||
* @param encodedName region's encoded name
|
||||
*/
|
||||
public static String getSerialReplicationTableName(Connection connection, byte[] encodedName)
|
||||
throws IOException {
|
||||
return getSerialReplicationColumnValue(connection, encodedName, tableNameCq);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -231,12 +231,6 @@ public interface TableDescriptor {
|
|||
*/
|
||||
boolean hasRegionMemStoreReplication();
|
||||
|
||||
/**
|
||||
* @return true if at least one column family has the serial
|
||||
* replication scope.
|
||||
*/
|
||||
boolean hasSerialReplicationScope();
|
||||
|
||||
/**
|
||||
* Check if the compaction enable flag of the table is true. If flag is false
|
||||
* then no minor/major compactions will be done in real.
|
||||
|
@ -285,8 +279,7 @@ public interface TableDescriptor {
|
|||
boolean hasDisabled = false;
|
||||
|
||||
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
|
||||
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
|
||||
&& cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
|
||||
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
|
||||
hasDisabled = true;
|
||||
} else {
|
||||
hasEnabled = true;
|
||||
|
|
|
@ -32,21 +32,20 @@ import java.util.TreeMap;
|
|||
import java.util.TreeSet;
|
||||
import java.util.function.Function;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Coprocessor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
|
||||
/**
|
||||
* @since 2.0.0
|
||||
*/
|
||||
|
@ -1054,16 +1053,6 @@ public class TableDescriptorBuilder {
|
|||
return families.values().toArray(new ColumnFamilyDescriptor[families.size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true if there are at least one cf whose replication scope is
|
||||
* serial.
|
||||
*/
|
||||
@Override
|
||||
public boolean hasSerialReplicationScope() {
|
||||
return Stream.of(getColumnFamilies())
|
||||
.anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured replicas per region
|
||||
*/
|
||||
|
|
|
@ -78,10 +78,8 @@ public class ReplicationAdmin implements Closeable {
|
|||
// only Global for now, can add other type
|
||||
// such as, 1) no global replication, or 2) the table is replicated to this cluster, etc.
|
||||
public static final String REPLICATIONTYPE = "replicationType";
|
||||
public static final String REPLICATIONGLOBAL =
|
||||
Integer.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||
public static final String REPLICATIONSERIAL =
|
||||
Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
|
||||
public static final String REPLICATIONGLOBAL = Integer
|
||||
.toString(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||
|
||||
private final Connection connection;
|
||||
private Admin admin;
|
||||
|
@ -356,9 +354,7 @@ public class ReplicationAdmin implements Closeable {
|
|||
HashMap<String, String> replicationEntry = new HashMap<>();
|
||||
replicationEntry.put(TNAME, table);
|
||||
replicationEntry.put(CFNAME, cf);
|
||||
replicationEntry.put(REPLICATIONTYPE,
|
||||
scope == HConstants.REPLICATION_SCOPE_GLOBAL ? REPLICATIONGLOBAL
|
||||
: REPLICATIONSERIAL);
|
||||
replicationEntry.put(REPLICATIONTYPE, REPLICATIONGLOBAL);
|
||||
replicationColFams.add(replicationEntry);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -24,8 +24,9 @@ import static org.junit.Assert.fail;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -341,26 +342,4 @@ public class TestTableDescriptorBuilder {
|
|||
.build();
|
||||
assertEquals(42, htd.getPriority());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerialReplicationScope() {
|
||||
HColumnDescriptor hcdWithScope = new HColumnDescriptor(Bytes.toBytes("cf0"));
|
||||
hcdWithScope.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
|
||||
HColumnDescriptor hcdWithoutScope = new HColumnDescriptor(Bytes.toBytes("cf1"));
|
||||
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(hcdWithoutScope)
|
||||
.build();
|
||||
assertFalse(htd.hasSerialReplicationScope());
|
||||
|
||||
htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(hcdWithScope)
|
||||
.build();
|
||||
assertTrue(htd.hasSerialReplicationScope());
|
||||
|
||||
htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
|
||||
.addColumnFamily(hcdWithScope)
|
||||
.addColumnFamily(hcdWithoutScope)
|
||||
.build();
|
||||
assertTrue(htd.hasSerialReplicationScope());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -431,27 +431,6 @@ public final class HConstants {
|
|||
/** The catalog family */
|
||||
public static final byte [] CATALOG_FAMILY = Bytes.toBytes(CATALOG_FAMILY_STR);
|
||||
|
||||
/** The replication barrier family as a string*/
|
||||
public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
|
||||
|
||||
/** The replication barrier family */
|
||||
public static final byte [] REPLICATION_BARRIER_FAMILY =
|
||||
Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
|
||||
|
||||
/** The replication position family as a string*/
|
||||
public static final String REPLICATION_POSITION_FAMILY_STR = "rep_position";
|
||||
|
||||
/** The replication position family */
|
||||
public static final byte [] REPLICATION_POSITION_FAMILY =
|
||||
Bytes.toBytes(REPLICATION_POSITION_FAMILY_STR);
|
||||
|
||||
/** The replication meta family as a string*/
|
||||
public static final String REPLICATION_META_FAMILY_STR = "rep_meta";
|
||||
|
||||
/** The replication meta family */
|
||||
public static final byte [] REPLICATION_META_FAMILY =
|
||||
Bytes.toBytes(REPLICATION_META_FAMILY_STR);
|
||||
|
||||
/** The RegionInfo qualifier as a string */
|
||||
public static final String REGIONINFO_QUALIFIER_STR = "regioninfo";
|
||||
|
||||
|
@ -661,12 +640,6 @@ public final class HConstants {
|
|||
*/
|
||||
public static final int REPLICATION_SCOPE_GLOBAL = 1;
|
||||
|
||||
/**
|
||||
* Scope tag for serially scoped data
|
||||
* This data will be replicated to all peers by the order of sequence id.
|
||||
*/
|
||||
public static final int REPLICATION_SCOPE_SERIAL = 2;
|
||||
|
||||
/**
|
||||
* Default cluster ID, cannot be used to identify a cluster so a key with
|
||||
* this value means it wasn't meant for replication.
|
||||
|
@ -917,12 +890,6 @@ public final class HConstants {
|
|||
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
|
||||
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
|
||||
public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
|
||||
|
||||
public static final String
|
||||
REPLICATION_SERIALLY_WAITING_KEY = "hbase.serial.replication.waitingMs";
|
||||
public static final long
|
||||
REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
|
||||
|
||||
/**
|
||||
* Max total size of buffered entries in all replication peers. It will prevent server getting
|
||||
* OOM if there are many peers. Default value is 256MB which is four times to default
|
||||
|
|
|
@ -1659,19 +1659,6 @@ possible configurations would overwhelm and obscure the important.
|
|||
default of 10 will rarely need to be changed.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.serial.replication.waitingMs</name>
|
||||
<value>10000</value>
|
||||
<description>
|
||||
By default, replication cannot guarantee that operations are applied in the slave cluster in
|
||||
the same order as in the master. If REPLICATION_SCOPE is set to 2, edits are pushed in the
|
||||
order in which they were written. This setting controls how long (in ms) to wait before
|
||||
re-checking whether a log still cannot be pushed because logs written before it have yet to be
|
||||
pushed. A longer wait reduces the number of queries on hbase:meta but increases the
|
||||
replication delay. This feature relies on zk-less assignment, so users must set
|
||||
hbase.assignment.usezk to false to support it.
|
||||
</description>
|
||||
</property>
|
||||
<!-- Static Web User Filter properties. -->
|
||||
<property>
|
||||
<name>hbase.http.staticuser.user</name>
|
||||
|
|
|
@ -75,7 +75,6 @@ message WALKey {
|
|||
enum ScopeType {
|
||||
REPLICATION_SCOPE_LOCAL = 0;
|
||||
REPLICATION_SCOPE_GLOBAL = 1;
|
||||
REPLICATION_SCOPE_SERIAL = 2;
|
||||
}
|
||||
|
||||
message FamilyScope {
|
||||
|
|
|
@ -109,7 +109,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
|
|||
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
|
||||
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
||||
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
|
||||
import org.apache.hadoop.hbase.master.locking.LockManager;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
|
||||
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
|
||||
|
@ -125,7 +124,6 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
|||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
|
||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
|
||||
import org.apache.hadoop.hbase.master.procedure.ModifyNamespaceProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
|
||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||
import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
|
||||
|
@ -365,7 +363,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
private ClusterStatusPublisher clusterStatusPublisherChore = null;
|
||||
|
||||
CatalogJanitor catalogJanitorChore;
|
||||
private ReplicationMetaCleaner replicationMetaCleaner;
|
||||
private LogCleaner logCleaner;
|
||||
private HFileCleaner hfileCleaner;
|
||||
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
|
||||
|
@ -1166,8 +1163,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Started service threads");
|
||||
}
|
||||
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
|
||||
getChoreService().scheduleChore(replicationMetaCleaner);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1190,7 +1185,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
// Clean up and close up shop
|
||||
if (this.logCleaner != null) this.logCleaner.cancel(true);
|
||||
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
|
||||
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
|
||||
if (this.quotaManager != null) this.quotaManager.stop();
|
||||
|
||||
if (this.activeMasterManager != null) this.activeMasterManager.stop();
|
||||
|
|
|
@ -199,15 +199,7 @@ public class RegionStateStore {
|
|||
.setValue(Bytes.toBytes(state.name()))
|
||||
.build());
|
||||
LOG.info(info.toString());
|
||||
|
||||
final boolean serialReplication = hasSerialReplicationScope(regionInfo.getTable());
|
||||
if (serialReplication && state == State.OPEN) {
|
||||
Put barrierPut = MetaTableAccessor.makeBarrierPut(regionInfo.getEncodedNameAsBytes(),
|
||||
openSeqNum, regionInfo.getTable().getName());
|
||||
updateRegionLocation(regionInfo, state, put, barrierPut);
|
||||
} else {
|
||||
updateRegionLocation(regionInfo, state, put);
|
||||
}
|
||||
updateRegionLocation(regionInfo, state, put);
|
||||
}
|
||||
|
||||
protected void updateRegionLocation(final RegionInfo regionInfo, final State state,
|
||||
|
@ -238,7 +230,7 @@ public class RegionStateStore {
|
|||
final RegionInfo hriB, final ServerName serverName) throws IOException {
|
||||
final TableDescriptor htd = getTableDescriptor(parent.getTable());
|
||||
MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName,
|
||||
getRegionReplication(htd), hasSerialReplicationScope(htd));
|
||||
getRegionReplication(htd));
|
||||
}
|
||||
|
||||
// ============================================================================================
|
||||
|
@ -248,8 +240,7 @@ public class RegionStateStore {
|
|||
final RegionInfo hriB, final ServerName serverName) throws IOException {
|
||||
final TableDescriptor htd = getTableDescriptor(parent.getTable());
|
||||
MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName,
|
||||
getRegionReplication(htd), EnvironmentEdgeManager.currentTime(),
|
||||
hasSerialReplicationScope(htd));
|
||||
getRegionReplication(htd), EnvironmentEdgeManager.currentTime());
|
||||
}
|
||||
|
||||
// ============================================================================================
|
||||
|
@ -266,14 +257,6 @@ public class RegionStateStore {
|
|||
// ==========================================================================
|
||||
// Table Descriptors helpers
|
||||
// ==========================================================================
|
||||
private boolean hasSerialReplicationScope(final TableName tableName) throws IOException {
|
||||
return hasSerialReplicationScope(getTableDescriptor(tableName));
|
||||
}
|
||||
|
||||
private boolean hasSerialReplicationScope(final TableDescriptor htd) {
|
||||
return (htd != null)? htd.hasSerialReplicationScope(): false;
|
||||
}
|
||||
|
||||
private int getRegionReplication(final TableDescriptor htd) {
|
||||
return (htd != null) ? htd.getRegionReplication() : 1;
|
||||
}
|
||||
|
|
|
@ -1,191 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.cleaner;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* This chore is to clean up the useless data in hbase:meta which is used by serial replication.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationMetaCleaner extends ScheduledChore {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationMetaCleaner.class);
|
||||
|
||||
private final Admin admin;
|
||||
private final MasterServices master;
|
||||
|
||||
public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
|
||||
throws IOException {
|
||||
super("ReplicationMetaCleaner", stoppable, period);
|
||||
this.master = master;
|
||||
admin = master.getConnection().getAdmin();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void chore() {
|
||||
try {
|
||||
Map<String, TableDescriptor> tables = master.getTableDescriptors().getAllDescriptors();
|
||||
Map<String, Set<String>> serialTables = new HashMap<>();
|
||||
for (Map.Entry<String, TableDescriptor> entry : tables.entrySet()) {
|
||||
boolean hasSerialScope = false;
|
||||
for (ColumnFamilyDescriptor column : entry.getValue().getColumnFamilies()) {
|
||||
if (column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL) {
|
||||
hasSerialScope = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (hasSerialScope) {
|
||||
serialTables.put(entry.getValue().getTableName().getNameAsString(), new HashSet<>());
|
||||
}
|
||||
}
|
||||
if (serialTables.isEmpty()){
|
||||
return;
|
||||
}
|
||||
|
||||
List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
|
||||
for (ReplicationPeerDescription peerDesc : peers) {
|
||||
Map<TableName, List<String>> tableCFsMap = peerDesc.getPeerConfig().getTableCFsMap();
|
||||
if (tableCFsMap ==null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (Map.Entry<TableName, List<String>> map : tableCFsMap.entrySet()) {
|
||||
if (serialTables.containsKey(map.getKey().getNameAsString())) {
|
||||
serialTables.get(map.getKey().getNameAsString()).add(peerDesc.getPeerId());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, List<Long>> barrierMap = MetaTableAccessor.getAllBarriers(master.getConnection());
|
||||
for (Map.Entry<String, List<Long>> entry : barrierMap.entrySet()) {
|
||||
String encodedName = entry.getKey();
|
||||
byte[] encodedBytes = Bytes.toBytes(encodedName);
|
||||
boolean canClearRegion = false;
|
||||
Map<String, Long> posMap = MetaTableAccessor.getReplicationPositionForAllPeer(
|
||||
master.getConnection(), encodedBytes);
|
||||
if (posMap.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String tableName = MetaTableAccessor.getSerialReplicationTableName(
|
||||
master.getConnection(), encodedBytes);
|
||||
Set<String> confPeers = serialTables.get(tableName);
|
||||
if (confPeers == null) {
|
||||
// This table doesn't exist or all cf's scope is not serial any more, we can clear meta.
|
||||
canClearRegion = true;
|
||||
} else {
|
||||
if (!allPeersHavePosition(confPeers, posMap)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String daughterValue = MetaTableAccessor
|
||||
.getSerialReplicationDaughterRegion(master.getConnection(), encodedBytes);
|
||||
if (daughterValue != null) {
|
||||
//this region is merged or split
|
||||
boolean allDaughterStart = true;
|
||||
String[] daughterRegions = daughterValue.split(",");
|
||||
for (String daughter : daughterRegions) {
|
||||
byte[] region = Bytes.toBytes(daughter);
|
||||
if (!MetaTableAccessor.getReplicationBarriers(
|
||||
master.getConnection(), region).isEmpty() &&
|
||||
!allPeersHavePosition(confPeers,
|
||||
MetaTableAccessor
|
||||
.getReplicationPositionForAllPeer(master.getConnection(), region))) {
|
||||
allDaughterStart = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (allDaughterStart) {
|
||||
canClearRegion = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (canClearRegion) {
|
||||
Delete delete = new Delete(encodedBytes);
|
||||
delete.addFamily(HConstants.REPLICATION_POSITION_FAMILY);
|
||||
delete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
|
||||
delete.addFamily(HConstants.REPLICATION_META_FAMILY);
|
||||
try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
|
||||
metaTable.delete(delete);
|
||||
}
|
||||
} else {
|
||||
|
||||
// Barriers whose seq is larger than min pos of all peers, and the last barrier whose seq
|
||||
// is smaller than min pos should be kept. All other barriers can be deleted.
|
||||
|
||||
long minPos = Long.MAX_VALUE;
|
||||
for (Map.Entry<String, Long> pos : posMap.entrySet()) {
|
||||
minPos = Math.min(minPos, pos.getValue());
|
||||
}
|
||||
List<Long> barriers = entry.getValue();
|
||||
int index = Collections.binarySearch(barriers, minPos);
|
||||
if (index < 0) {
|
||||
index = -index - 1;
|
||||
}
|
||||
Delete delete = new Delete(encodedBytes);
|
||||
for (int i = 0; i < index - 1; i++) {
|
||||
delete.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, Bytes.toBytes(barriers.get(i)));
|
||||
}
|
||||
try (Table metaTable = master.getConnection().getTable(TableName.META_TABLE_NAME)) {
|
||||
metaTable.delete(delete);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception during cleaning up.", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean allPeersHavePosition(Set<String> peers, Map<String, Long> posMap)
|
||||
throws IOException {
|
||||
for(String peer:peers){
|
||||
if (!posMap.containsKey(peer)){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.ZNodeClearer;
|
|||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionUtils;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||
|
@ -3169,54 +3168,34 @@ public class HRegionServer extends HasThread implements
|
|||
* @return true if closed the region successfully.
|
||||
* @throws IOException
|
||||
*/
|
||||
protected boolean closeAndOfflineRegionForSplitOrMerge(
|
||||
final List<String> regionEncodedName) throws IOException {
|
||||
for (int i = 0; i < regionEncodedName.size(); ++i) {
|
||||
HRegion regionToClose = this.getRegion(regionEncodedName.get(i));
|
||||
if (regionToClose != null) {
|
||||
Map<byte[], List<HStoreFile>> hstoreFiles = null;
|
||||
Exception exceptionToThrow = null;
|
||||
try{
|
||||
hstoreFiles = regionToClose.close(false);
|
||||
} catch (Exception e) {
|
||||
exceptionToThrow = e;
|
||||
}
|
||||
if (exceptionToThrow == null && hstoreFiles == null) {
|
||||
// The region was closed by someone else
|
||||
exceptionToThrow =
|
||||
new IOException("Failed to close region: already closed by another thread");
|
||||
}
|
||||
|
||||
if (exceptionToThrow != null) {
|
||||
if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
|
||||
throw new IOException(exceptionToThrow);
|
||||
}
|
||||
if (regionToClose.getTableDescriptor().hasSerialReplicationScope()) {
|
||||
// For serial replication, we need add a final barrier on this region. But the splitting
|
||||
// or merging may be reverted, so we should make sure if we reopen this region, the open
|
||||
// barrier is same as this final barrier
|
||||
long seq = regionToClose.getMaxFlushedSeqId();
|
||||
if (seq == HConstants.NO_SEQNUM) {
|
||||
// No edits in WAL for this region; get the sequence number when the region was opened.
|
||||
seq = regionToClose.getOpenSeqNum();
|
||||
if (seq == HConstants.NO_SEQNUM) {
|
||||
// This region has no data
|
||||
seq = 0;
|
||||
}
|
||||
} else {
|
||||
seq++;
|
||||
}
|
||||
Put finalBarrier = MetaTableAccessor.makeBarrierPut(
|
||||
Bytes.toBytes(regionEncodedName.get(i)),
|
||||
seq,
|
||||
regionToClose.getTableDescriptor().getTableName().getName());
|
||||
MetaTableAccessor.putToMetaTable(getConnection(), finalBarrier);
|
||||
}
|
||||
// Offline the region
|
||||
this.removeRegion(regionToClose, null);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
protected boolean closeAndOfflineRegionForSplitOrMerge(final List<String> regionEncodedName)
|
||||
throws IOException {
|
||||
for (int i = 0; i < regionEncodedName.size(); ++i) {
|
||||
HRegion regionToClose = this.getRegion(regionEncodedName.get(i));
|
||||
if (regionToClose != null) {
|
||||
Map<byte[], List<HStoreFile>> hstoreFiles = null;
|
||||
Exception exceptionToThrow = null;
|
||||
try {
|
||||
hstoreFiles = regionToClose.close(false);
|
||||
} catch (Exception e) {
|
||||
exceptionToThrow = e;
|
||||
}
|
||||
if (exceptionToThrow == null && hstoreFiles == null) {
|
||||
// The region was closed by someone else
|
||||
exceptionToThrow =
|
||||
new IOException("Failed to close region: already closed by another thread");
|
||||
}
|
||||
if (exceptionToThrow != null) {
|
||||
if (exceptionToThrow instanceof IOException) {
|
||||
throw (IOException) exceptionToThrow;
|
||||
}
|
||||
throw new IOException(exceptionToThrow);
|
||||
}
|
||||
// Offline the region
|
||||
this.removeRegion(regionToClose, null);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -117,6 +117,7 @@ class FSWALEntry extends Entry {
|
|||
PrivateCellUtil.setSequenceId(c, regionSequenceId);
|
||||
}
|
||||
}
|
||||
|
||||
getKey().setWriteEntry(we);
|
||||
return regionSequenceId;
|
||||
}
|
||||
|
|
|
@ -74,8 +74,7 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
|
|||
try {
|
||||
WALEntryBatch entryBatch = entryReader.take();
|
||||
shipEdits(entryBatch);
|
||||
if (entryBatch.getWalEntries().isEmpty()
|
||||
&& entryBatch.getLastSeqIds().isEmpty()) {
|
||||
if (entryBatch.getWalEntries().isEmpty()) {
|
||||
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
|
||||
+ source.getQueueId());
|
||||
source.getSourceMetrics().incrCompletedRecoveryQueue();
|
||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -44,12 +43,9 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationListener;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
||||
|
@ -58,7 +54,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationTracker;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -152,8 +147,6 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
private final boolean replicationForBulkLoadDataEnabled;
|
||||
|
||||
private Connection connection;
|
||||
private long replicationWaitTime;
|
||||
|
||||
private AtomicLong totalBufferUsed = new AtomicLong();
|
||||
|
||||
|
@ -206,9 +199,6 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
this.latestPaths = new HashSet<Path>();
|
||||
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
|
||||
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
|
||||
this.replicationWaitTime = conf.getLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY,
|
||||
HConstants.REPLICATION_SERIALLY_WAITING_DEFAULT);
|
||||
connection = ConnectionFactory.createConnection(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -845,10 +835,6 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
return this.fs;
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the ReplicationPeers used by this ReplicationSourceManager
|
||||
* @return the ReplicationPeers used by this ReplicationSourceManager
|
||||
|
@ -887,103 +873,4 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
int activeFailoverTaskCount() {
|
||||
return executor.getActiveCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether an entry can be pushed to the peer or not right now. If we enable serial replication,
|
||||
* we can not push the entry until all entries in its region whose sequence numbers are smaller
|
||||
* than this entry have been pushed. For each ReplicationSource, we need only check the first
|
||||
* entry in each region, as long as it can be pushed, we can push all in this ReplicationSource.
|
||||
* This method will be blocked until we can push.
|
||||
* @return the first barrier of entry's region, or -1 if there is no barrier. It is used to
|
||||
* prevent saving positions in the region of no barrier.
|
||||
*/
|
||||
void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId)
|
||||
throws IOException, InterruptedException {
|
||||
/**
|
||||
* There are barriers for this region and position for this peer. N barriers form N intervals,
|
||||
* (b1,b2) (b2,b3) ... (bn,max). Generally, there is no logs whose seq id is not greater than
|
||||
* the first barrier and the last interval is start from the last barrier. There are several
|
||||
* conditions that we can push now, otherwise we should block: 1) "Serial replication" is not
|
||||
* enabled, we can push all logs just like before. This case should not call this method. 2)
|
||||
* There is no barriers for this region, or the seq id is smaller than the first barrier. It is
|
||||
* mainly because we alter REPLICATION_SCOPE = 2. We can not guarantee the order of logs that is
|
||||
* written before altering. 3) This entry is in the first interval of barriers. We can push them
|
||||
* because it is the start of a region. But if the region is created by region split, we should
|
||||
* check if the parent regions are fully pushed. 4) If the entry's seq id and the position are
|
||||
* in same section, or the pos is the last number of previous section. Because when open a
|
||||
* region we put a barrier the number is the last log's id + 1. 5) Log's seq is smaller than pos
|
||||
* in meta, we are retrying. It may happen when a RS crashes after save replication meta and
|
||||
* before save zk offset.
|
||||
*/
|
||||
List<Long> barriers = MetaTableAccessor.getReplicationBarriers(connection, encodedName);
|
||||
if (barriers.isEmpty() || seq <= barriers.get(0)) {
|
||||
// Case 2
|
||||
return;
|
||||
}
|
||||
int interval = Collections.binarySearch(barriers, seq);
|
||||
if (interval < 0) {
|
||||
interval = -interval - 1;// get the insert position if negative
|
||||
}
|
||||
if (interval == 1) {
|
||||
// Case 3
|
||||
// Check if there are parent regions
|
||||
String parentValue =
|
||||
MetaTableAccessor.getSerialReplicationParentRegion(connection, encodedName);
|
||||
if (parentValue == null) {
|
||||
// This region has no parent or the parent's log entries are fully pushed.
|
||||
return;
|
||||
}
|
||||
while (true) {
|
||||
boolean allParentDone = true;
|
||||
String[] parentRegions = parentValue.split(",");
|
||||
for (String parent : parentRegions) {
|
||||
byte[] region = Bytes.toBytes(parent);
|
||||
long pos = MetaTableAccessor.getReplicationPositionForOnePeer(connection, region, peerId);
|
||||
List<Long> parentBarriers = MetaTableAccessor.getReplicationBarriers(connection, region);
|
||||
if (parentBarriers.size() > 0 &&
|
||||
parentBarriers.get(parentBarriers.size() - 1) - 1 > pos) {
|
||||
allParentDone = false;
|
||||
// For a closed region, we will write a close event marker to WAL whose sequence id is
|
||||
// larger than final barrier but still smaller than next region's openSeqNum.
|
||||
// So if the pos is larger than last barrier, we can say we have read the event marker
|
||||
// which means the parent region has been fully pushed.
|
||||
LOG.info(
|
||||
Bytes.toString(encodedName) + " can not start pushing because parent region's" +
|
||||
" log has not been fully pushed: parent=" + Bytes.toString(region) + " pos=" + pos +
|
||||
" barriers=" + Arrays.toString(barriers.toArray()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (allParentDone) {
|
||||
return;
|
||||
} else {
|
||||
Thread.sleep(replicationWaitTime);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
while (true) {
|
||||
long pos =
|
||||
MetaTableAccessor.getReplicationPositionForOnePeer(connection, encodedName, peerId);
|
||||
if (seq <= pos) {
|
||||
// Case 5
|
||||
}
|
||||
if (pos >= 0) {
|
||||
// Case 4
|
||||
int posInterval = Collections.binarySearch(barriers, pos);
|
||||
if (posInterval < 0) {
|
||||
posInterval = -posInterval - 1;// get the insert position if negative
|
||||
}
|
||||
if (posInterval == interval || pos == barriers.get(interval - 1) - 1) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info(Bytes.toString(encodedName) + " can not start pushing to peer " + peerId +
|
||||
" because previous log has not been pushed: sequence=" + seq + " pos=" + pos +
|
||||
" barriers=" + Arrays.toString(barriers.toArray()));
|
||||
Thread.sleep(replicationWaitTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,31 +20,23 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
|
||||
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
|
||||
|
||||
/**
|
||||
* This thread reads entries from a queue and ships them. Entries are placed onto the queue by
|
||||
|
@ -79,17 +71,6 @@ public class ReplicationSourceShipper extends Thread {
|
|||
// Maximum number of retries before taking bold actions
|
||||
protected final int maxRetriesMultiplier;
|
||||
|
||||
// Use guava cache to set ttl for each key
|
||||
private final LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(1, TimeUnit.DAYS).build(
|
||||
new CacheLoader<String, Boolean>() {
|
||||
@Override
|
||||
public Boolean load(String key) throws Exception {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
public ReplicationSourceShipper(Configuration conf, String walGroupId,
|
||||
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
|
||||
this.conf = conf;
|
||||
|
@ -125,9 +106,6 @@ public class ReplicationSourceShipper extends Thread {
|
|||
|
||||
try {
|
||||
WALEntryBatch entryBatch = entryReader.take();
|
||||
for (Map.Entry<String, Long> entry : entryBatch.getLastSeqIds().entrySet()) {
|
||||
waitingUntilCanPush(entry);
|
||||
}
|
||||
shipEdits(entryBatch);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.trace("Interrupted while waiting for next replication entry batch", e);
|
||||
|
@ -150,8 +128,6 @@ public class ReplicationSourceShipper extends Thread {
|
|||
int sleepMultiplier = 0;
|
||||
if (entries.isEmpty()) {
|
||||
if (lastLoggedPosition != lastReadPosition) {
|
||||
// Save positions to meta table before zk.
|
||||
updateSerialRepPositions(entryBatch.getLastSeqIds());
|
||||
updateLogPosition(lastReadPosition);
|
||||
// if there was nothing to ship and it's not an error
|
||||
// set "ageOfLastShippedOp" to <now> to indicate that we're current
|
||||
|
@ -197,9 +173,6 @@ public class ReplicationSourceShipper extends Thread {
|
|||
for (int i = 0; i < size; i++) {
|
||||
cleanUpHFileRefs(entries.get(i).getEdit());
|
||||
}
|
||||
|
||||
// Save positions to meta table before zk.
|
||||
updateSerialRepPositions(entryBatch.getLastSeqIds());
|
||||
//Log and clean up WAL logs
|
||||
updateLogPosition(lastReadPosition);
|
||||
}
|
||||
|
@ -225,33 +198,6 @@ public class ReplicationSourceShipper extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
|
||||
String key = entry.getKey();
|
||||
long seq = entry.getValue();
|
||||
boolean deleteKey = false;
|
||||
if (seq <= 0) {
|
||||
// There is a REGION_CLOSE marker, we can not continue skipping after this entry.
|
||||
deleteKey = true;
|
||||
seq = -seq;
|
||||
}
|
||||
|
||||
if (!canSkipWaitingSet.getUnchecked(key)) {
|
||||
try {
|
||||
source.getSourceManager().waitUntilCanBePushed(Bytes.toBytes(key), seq, source.getPeerId());
|
||||
} catch (IOException e) {
|
||||
LOG.error("waitUntilCanBePushed fail", e);
|
||||
throw new RuntimeException("waitUntilCanBePushed fail");
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("waitUntilCanBePushed interrupted", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
canSkipWaitingSet.put(key, true);
|
||||
}
|
||||
if (deleteKey) {
|
||||
canSkipWaitingSet.invalidate(key);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
|
||||
String peerId = source.getPeerId();
|
||||
if (peerId.contains("-")) {
|
||||
|
@ -282,16 +228,6 @@ public class ReplicationSourceShipper extends Thread {
|
|||
lastLoggedPosition = lastReadPosition;
|
||||
}
|
||||
|
||||
private void updateSerialRepPositions(Map<String, Long> lastPositionsForSerialScope) {
|
||||
try {
|
||||
MetaTableAccessor.updateReplicationPositions(source.getSourceManager().getConnection(),
|
||||
source.getPeerId(), lastPositionsForSerialScope);
|
||||
} catch (IOException e) {
|
||||
LOG.error("updateReplicationPositions fail", e);
|
||||
throw new RuntimeException("updateReplicationPositions fail");
|
||||
}
|
||||
}
|
||||
|
||||
public void startup(UncaughtExceptionHandler handler) {
|
||||
String name = Thread.currentThread().getName();
|
||||
Threads.setDaemonThreadRunning(this, name + ".replicationSource." + walGroupId + ","
|
||||
|
|
|
@ -21,14 +21,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
|
|||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -36,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
@ -45,7 +41,7 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
|
||||
|
||||
|
@ -135,7 +131,7 @@ public class ReplicationSourceWALReader extends Thread {
|
|||
continue;
|
||||
}
|
||||
WALEntryBatch batch = readWALEntries(entryStream);
|
||||
if (batch != null && (!batch.getLastSeqIds().isEmpty() || batch.getNbEntries() > 0)) {
|
||||
if (batch != null && batch.getNbEntries() > 0) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(String.format("Read %s WAL entries eligible for replication",
|
||||
batch.getNbEntries()));
|
||||
|
@ -171,10 +167,6 @@ public class ReplicationSourceWALReader extends Thread {
|
|||
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
|
||||
}
|
||||
Entry entry = entryStream.next();
|
||||
if (updateSerialReplPos(batch, entry)) {
|
||||
batch.lastWalPosition = entryStream.getPosition();
|
||||
break;
|
||||
}
|
||||
entry = filterEntry(entry);
|
||||
if (entry != null) {
|
||||
WALEdit edit = entry.getEdit();
|
||||
|
@ -246,33 +238,6 @@ public class ReplicationSourceWALReader extends Thread {
|
|||
return filtered;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we should stop reading because we're at REGION_CLOSE
|
||||
*/
|
||||
private boolean updateSerialReplPos(WALEntryBatch batch, Entry entry) throws IOException {
|
||||
if (entry.hasSerialReplicationScope()) {
|
||||
String key = Bytes.toString(entry.getKey().getEncodedRegionName());
|
||||
batch.setLastPosition(key, entry.getKey().getSequenceId());
|
||||
if (!entry.getEdit().getCells().isEmpty()) {
|
||||
WALProtos.RegionEventDescriptor maybeEvent =
|
||||
WALEdit.getRegionEventDescriptor(entry.getEdit().getCells().get(0));
|
||||
if (maybeEvent != null && maybeEvent
|
||||
.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
|
||||
// In serially replication, if we move a region to another RS and move it back, we may
|
||||
// read logs crossing two sections. We should break at REGION_CLOSE and push the first
|
||||
// section first in case of missing the middle section belonging to the other RS.
|
||||
// In a worker thread, if we can push the first log of a region, we can push all logs
|
||||
// in the same region without waiting until we read a close marker because next time
|
||||
// we read logs in this region, it must be a new section and not adjacent with this
|
||||
// region. Mark it negative.
|
||||
batch.setLastPosition(key, -entry.getKey().getSequenceId());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
|
||||
* batch to become available
|
||||
|
@ -407,8 +372,6 @@ public class ReplicationSourceWALReader extends Thread {
|
|||
private int nbHFiles = 0;
|
||||
// heap size of data we need to replicate
|
||||
private long heapSize = 0;
|
||||
// save the last sequenceid for each region if the table has serial-replication scope
|
||||
private Map<String, Long> lastSeqIds = new HashMap<>();
|
||||
|
||||
/**
|
||||
* @param walEntries
|
||||
|
@ -477,13 +440,6 @@ public class ReplicationSourceWALReader extends Thread {
|
|||
return heapSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the last sequenceid for each region if the table has serial-replication scope
|
||||
*/
|
||||
public Map<String, Long> getLastSeqIds() {
|
||||
return lastSeqIds;
|
||||
}
|
||||
|
||||
private void incrementNbRowKeys(int increment) {
|
||||
nbRowKeys += increment;
|
||||
}
|
||||
|
@ -495,9 +451,5 @@ public class ReplicationSourceWALReader extends Thread {
|
|||
private void incrementHeapSize(long increment) {
|
||||
heapSize += increment;
|
||||
}
|
||||
|
||||
private void setLastPosition(String region, Long sequenceId) {
|
||||
getLastSeqIds().put(region, sequenceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -405,7 +405,7 @@ public class RestoreSnapshotHelper {
|
|||
}
|
||||
|
||||
LOG.debug("Update splits parent " + regionInfo.getEncodedName() + " -> " + daughters);
|
||||
MetaTableAccessor.addSpiltsToParent(connection, regionInfo,
|
||||
MetaTableAccessor.addSplitsToParent(connection, regionInfo,
|
||||
regionsByName.get(daughters.getFirst()),
|
||||
regionsByName.get(daughters.getSecond()));
|
||||
}
|
||||
|
|
|
@ -159,36 +159,6 @@ public class FSTableDescriptors implements TableDescriptors {
|
|||
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
|
||||
.setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
|
||||
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
|
||||
HConstants.DEFAULT_HBASE_META_VERSIONS))
|
||||
.setInMemory(true)
|
||||
.setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
|
||||
HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
|
||||
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
|
||||
.setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_POSITION_FAMILY)
|
||||
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
|
||||
HConstants.DEFAULT_HBASE_META_VERSIONS))
|
||||
.setInMemory(true)
|
||||
.setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
|
||||
HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
|
||||
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
|
||||
.setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.REPLICATION_META_FAMILY)
|
||||
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
|
||||
HConstants.DEFAULT_HBASE_META_VERSIONS))
|
||||
.setInMemory(true)
|
||||
.setBlocksize(conf.getInt(HConstants.HBASE_META_BLOCK_SIZE,
|
||||
HConstants.DEFAULT_HBASE_META_BLOCK_SIZE))
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
|
||||
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
|
||||
.setBloomFilterType(BloomType.NONE)
|
||||
.build())
|
||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(HConstants.TABLE_FAMILY)
|
||||
.setMaxVersions(conf.getInt(HConstants.HBASE_META_VERSIONS,
|
||||
HConstants.DEFAULT_HBASE_META_VERSIONS))
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -16,14 +15,12 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
|
||||
|
@ -36,8 +33,6 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
// imports we use from yet-to-be-moved regionsever.wal
|
||||
|
||||
/**
|
||||
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
|
||||
* APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
|
||||
|
@ -276,18 +271,6 @@ public interface WAL extends Closeable, WALFileLengthProvider {
|
|||
key.setCompressionContext(compressionContext);
|
||||
}
|
||||
|
||||
public boolean hasSerialReplicationScope () {
|
||||
if (getKey().getReplicationScopes() == null || getKey().getReplicationScopes().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
for (Map.Entry<byte[], Integer> e:getKey().getReplicationScopes().entrySet()) {
|
||||
if (e.getValue() == HConstants.REPLICATION_SCOPE_SERIAL){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.key + "=" + this.edit;
|
||||
|
|
|
@ -496,7 +496,7 @@ public class TestMetaTableAccessor {
|
|||
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
|
||||
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
|
||||
|
||||
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
|
||||
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
|
||||
|
||||
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
|
||||
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
|
||||
|
@ -541,7 +541,7 @@ public class TestMetaTableAccessor {
|
|||
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
|
||||
|
||||
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3,
|
||||
HConstants.LATEST_TIMESTAMP, false);
|
||||
HConstants.LATEST_TIMESTAMP);
|
||||
|
||||
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
|
||||
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
|
||||
|
@ -691,7 +691,7 @@ public class TestMetaTableAccessor {
|
|||
|
||||
// now merge the regions, effectively deleting the rows for region a and b.
|
||||
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo,
|
||||
regionInfoA, regionInfoB, sn, 1, masterSystemTime, false);
|
||||
regionInfoA, regionInfoB, sn, 1, masterSystemTime);
|
||||
|
||||
result = meta.get(get);
|
||||
serverCell = result.getColumnLatestCell(HConstants.CATALOG_FAMILY,
|
||||
|
@ -782,7 +782,7 @@ public class TestMetaTableAccessor {
|
|||
}
|
||||
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
|
||||
long prevCalls = scheduler.numPriorityCalls;
|
||||
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB,loc.getServerName(),1,false);
|
||||
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
|
||||
|
||||
assertTrue(prevCalls < scheduler.numPriorityCalls);
|
||||
}
|
||||
|
@ -819,7 +819,7 @@ public class TestMetaTableAccessor {
|
|||
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
|
||||
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
|
||||
|
||||
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3, false);
|
||||
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
|
||||
Get get1 = new Get(splitA.getRegionName());
|
||||
Result resultA = meta.get(get1);
|
||||
Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,
|
||||
|
|
|
@ -348,7 +348,7 @@ public class TestRegionServerMetrics {
|
|||
TEST_UTIL.getAdmin().flush(tableName);
|
||||
|
||||
metricsRegionServer.getRegionServerWrapper().forceRecompute();
|
||||
assertGauge("storeCount", TABLES_ON_MASTER? 1: 7);
|
||||
assertGauge("storeCount", TABLES_ON_MASTER? 1: 4);
|
||||
assertGauge("storeFileCount", 1);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,400 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.HTestConst;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({ ReplicationTests.class, LargeTests.class })
|
||||
public class TestSerialReplication {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSerialReplication.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestSerialReplication.class);
|
||||
|
||||
private static Configuration conf1;
|
||||
private static Configuration conf2;
|
||||
|
||||
private static HBaseTestingUtility utility1;
|
||||
private static HBaseTestingUtility utility2;
|
||||
|
||||
private static final byte[] famName = Bytes.toBytes("f");
|
||||
private static final byte[] VALUE = Bytes.toBytes("v");
|
||||
private static final byte[] ROW = Bytes.toBytes("r");
|
||||
private static final byte[][] ROWS = HTestConst.makeNAscii(ROW, 100);
|
||||
|
||||
@Rule
|
||||
public TestName name = new TestName();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
conf1 = HBaseConfiguration.create();
|
||||
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
|
||||
// smaller block size and capacity to trigger more operations
|
||||
// and test them
|
||||
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
|
||||
conf1.setInt("replication.source.size.capacity", 1024);
|
||||
conf1.setLong("replication.source.sleepforretries", 100);
|
||||
conf1.setInt("hbase.regionserver.maxlogs", 10);
|
||||
conf1.setLong("hbase.master.logcleaner.ttl", 10);
|
||||
conf1.setBoolean("dfs.support.append", true);
|
||||
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
|
||||
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
|
||||
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
|
||||
conf1.setLong("replication.source.per.peer.node.bandwidth", 100L);// Each WAL is 120 bytes
|
||||
conf1.setLong("replication.source.size.capacity", 1L);
|
||||
conf1.setLong(HConstants.REPLICATION_SERIALLY_WAITING_KEY, 1000L);
|
||||
|
||||
utility1 = new HBaseTestingUtility(conf1);
|
||||
utility1.startMiniZKCluster();
|
||||
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
|
||||
new ZKWatcher(conf1, "cluster1", null, true);
|
||||
|
||||
conf2 = new Configuration(conf1);
|
||||
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||
|
||||
utility2 = new HBaseTestingUtility(conf2);
|
||||
utility2.setZkCluster(miniZK);
|
||||
new ZKWatcher(conf2, "cluster2", null, true);
|
||||
|
||||
utility1.startMiniCluster(1, 10);
|
||||
utility2.startMiniCluster(1, 1);
|
||||
|
||||
ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
|
||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||
rpc.setClusterKey(utility2.getClusterKey());
|
||||
admin1.addPeer("1", rpc, null);
|
||||
|
||||
utility1.getAdmin().setBalancerRunning(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionMoveAndFailover() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
HTableDescriptor table = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
||||
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
|
||||
table.addFamily(fam);
|
||||
utility1.getAdmin().createTable(table);
|
||||
utility2.getAdmin().createTable(table);
|
||||
try(Table t1 = utility1.getConnection().getTable(tableName);
|
||||
Table t2 = utility2.getConnection().getTable(tableName)) {
|
||||
LOG.info("move to 1");
|
||||
moveRegion(t1, 1);
|
||||
LOG.info("move to 0");
|
||||
moveRegion(t1, 0);
|
||||
for (int i = 10; i < 20; i++) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
put.addColumn(famName, VALUE, VALUE);
|
||||
t1.put(put);
|
||||
}
|
||||
LOG.info("move to 2");
|
||||
moveRegion(t1, 2);
|
||||
for (int i = 20; i < 30; i++) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
put.addColumn(famName, VALUE, VALUE);
|
||||
t1.put(put);
|
||||
}
|
||||
utility1.getHBaseCluster().abortRegionServer(2);
|
||||
for (int i = 30; i < 40; i++) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
put.addColumn(famName, VALUE, VALUE);
|
||||
t1.put(put);
|
||||
}
|
||||
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
|
||||
Scan scan = new Scan();
|
||||
scan.setCaching(100);
|
||||
List<Cell> list = new ArrayList<>();
|
||||
try (ResultScanner results = t2.getScanner(scan)) {
|
||||
for (Result result : results) {
|
||||
assertEquals(1, result.rawCells().length);
|
||||
list.add(result.rawCells()[0]);
|
||||
}
|
||||
}
|
||||
List<Integer> listOfNumbers = getRowNumbers(list);
|
||||
LOG.info(Arrays.toString(listOfNumbers.toArray()));
|
||||
assertIntegerList(listOfNumbers, 10, 1);
|
||||
if (listOfNumbers.size() != 30) {
|
||||
LOG.info("Waiting all logs pushed to slave. Expected 30 , actual " + list.size());
|
||||
Thread.sleep(200);
|
||||
continue;
|
||||
}
|
||||
return;
|
||||
}
|
||||
throw new Exception("Not all logs have been pushed");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionSplit() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
HTableDescriptor table = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
||||
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
|
||||
table.addFamily(fam);
|
||||
utility1.getAdmin().createTable(table);
|
||||
utility2.getAdmin().createTable(table);
|
||||
try(Table t1 = utility1.getConnection().getTable(tableName);
|
||||
Table t2 = utility2.getConnection().getTable(tableName)) {
|
||||
|
||||
for (int i = 10; i < 100; i += 10) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
put.addColumn(famName, VALUE, VALUE);
|
||||
t1.put(put);
|
||||
}
|
||||
utility1.getAdmin().split(tableName, ROWS[50]);
|
||||
waitTableHasRightNumberOfRegions(tableName, 2);
|
||||
for (int i = 11; i < 100; i += 10) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
put.addColumn(famName, VALUE, VALUE);
|
||||
t1.put(put);
|
||||
}
|
||||
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
|
||||
Scan scan = new Scan();
|
||||
scan.setCaching(100);
|
||||
List<Cell> list = new ArrayList<>();
|
||||
try (ResultScanner results = t2.getScanner(scan)) {
|
||||
for (Result result : results) {
|
||||
assertEquals(1, result.rawCells().length);
|
||||
list.add(result.rawCells()[0]);
|
||||
}
|
||||
}
|
||||
List<Integer> listOfNumbers = getRowNumbers(list);
|
||||
List<Integer> list1 = new ArrayList<>();
|
||||
List<Integer> list21 = new ArrayList<>();
|
||||
List<Integer> list22 = new ArrayList<>();
|
||||
for (int num : listOfNumbers) {
|
||||
if (num % 10 == 0) {
|
||||
list1.add(num);
|
||||
}else if (num < 50) { //num%10==1
|
||||
list21.add(num);
|
||||
} else { // num%10==1&&num>50
|
||||
list22.add(num);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.info(Arrays.toString(list1.toArray()));
|
||||
LOG.info(Arrays.toString(list21.toArray()));
|
||||
LOG.info(Arrays.toString(list22.toArray()));
|
||||
assertIntegerList(list1, 10, 10);
|
||||
assertIntegerList(list21, 11, 10);
|
||||
assertIntegerList(list22, 51, 10);
|
||||
if (!list21.isEmpty() || !list22.isEmpty()) {
|
||||
assertEquals(9, list1.size());
|
||||
}
|
||||
|
||||
if (list.size() == 18) {
|
||||
return;
|
||||
}
|
||||
LOG.info("Waiting all logs pushed to slave. Expected 27 , actual " + list.size());
|
||||
Thread.sleep(200);
|
||||
}
|
||||
throw new Exception("Not all logs have been pushed");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionMerge() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
HTableDescriptor table = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
||||
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
|
||||
table.addFamily(fam);
|
||||
utility1.getAdmin().createTable(table);
|
||||
utility2.getAdmin().createTable(table);
|
||||
Threads.sleep(5000);
|
||||
utility1.getAdmin().split(tableName, ROWS[50]);
|
||||
waitTableHasRightNumberOfRegions(tableName, 2);
|
||||
|
||||
try(Table t1 = utility1.getConnection().getTable(tableName);
|
||||
Table t2 = utility2.getConnection().getTable(tableName)) {
|
||||
for (int i = 10; i < 100; i += 10) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
put.addColumn(famName, VALUE, VALUE);
|
||||
t1.put(put);
|
||||
}
|
||||
List<Pair<RegionInfo, ServerName>> regions =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName);
|
||||
utility1.getAdmin().mergeRegionsAsync(regions.get(0).getFirst().getRegionName(),
|
||||
regions.get(1).getFirst().getRegionName(), true);
|
||||
waitTableHasRightNumberOfRegions(tableName, 1);
|
||||
for (int i = 11; i < 100; i += 10) {
|
||||
Put put = new Put(ROWS[i]);
|
||||
put.addColumn(famName, VALUE, VALUE);
|
||||
t1.put(put);
|
||||
}
|
||||
|
||||
long start = EnvironmentEdgeManager.currentTime();
|
||||
while (EnvironmentEdgeManager.currentTime() - start < 180000) {
|
||||
Scan scan = new Scan();
|
||||
scan.setCaching(100);
|
||||
List<Cell> list = new ArrayList<>();
|
||||
try (ResultScanner results = t2.getScanner(scan)) {
|
||||
for (Result result : results) {
|
||||
assertEquals(1, result.rawCells().length);
|
||||
list.add(result.rawCells()[0]);
|
||||
}
|
||||
}
|
||||
List<Integer> listOfNumbers = getRowNumbers(list);
|
||||
List<Integer> list0 = new ArrayList<>();
|
||||
List<Integer> list1 = new ArrayList<>();
|
||||
for (int num : listOfNumbers) {
|
||||
if (num % 10 == 0) {
|
||||
list0.add(num);
|
||||
} else {
|
||||
list1.add(num);
|
||||
}
|
||||
}
|
||||
LOG.info(Arrays.toString(list0.toArray()));
|
||||
LOG.info(Arrays.toString(list1.toArray()));
|
||||
assertIntegerList(list1, 11, 10);
|
||||
if (!list1.isEmpty()) {
|
||||
assertEquals(9, list0.size());
|
||||
}
|
||||
if (list.size() == 18) {
|
||||
return;
|
||||
}
|
||||
LOG.info("Waiting all logs pushed to slave. Expected 18 , actual " + list.size());
|
||||
Thread.sleep(200);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private List<Integer> getRowNumbers(List<Cell> cells) {
|
||||
List<Integer> listOfRowNumbers = new ArrayList<>(cells.size());
|
||||
for (Cell c : cells) {
|
||||
listOfRowNumbers.add(Integer.parseInt(Bytes
|
||||
.toString(c.getRowArray(), c.getRowOffset() + ROW.length,
|
||||
c.getRowLength() - ROW.length)));
|
||||
}
|
||||
return listOfRowNumbers;
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void setUpAfterClass() throws Exception {
|
||||
utility2.shutdownMiniCluster();
|
||||
utility1.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
private void moveRegion(Table table, int index) throws IOException {
|
||||
List<Pair<RegionInfo, ServerName>> regions =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
|
||||
assertEquals(1, regions.size());
|
||||
RegionInfo regionInfo = regions.get(0).getFirst();
|
||||
ServerName name = utility1.getHBaseCluster().getRegionServer(index).getServerName();
|
||||
utility1.getAdmin()
|
||||
.move(regionInfo.getEncodedNameAsBytes(), Bytes.toBytes(name.getServerName()));
|
||||
while (true) {
|
||||
regions =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
|
||||
if (regions.get(0).getSecond().equals(name)) {
|
||||
break;
|
||||
}
|
||||
Threads.sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
private void balanceTwoRegions(Table table) throws Exception {
|
||||
List<Pair<RegionInfo, ServerName>> regions =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
|
||||
assertEquals(2, regions.size());
|
||||
RegionInfo regionInfo1 = regions.get(0).getFirst();
|
||||
ServerName name1 = utility1.getHBaseCluster().getRegionServer(0).getServerName();
|
||||
RegionInfo regionInfo2 = regions.get(1).getFirst();
|
||||
ServerName name2 = utility1.getHBaseCluster().getRegionServer(1).getServerName();
|
||||
utility1.getAdmin()
|
||||
.move(regionInfo1.getEncodedNameAsBytes(), Bytes.toBytes(name1.getServerName()));
|
||||
utility1.getAdmin()
|
||||
.move(regionInfo2.getEncodedNameAsBytes(), Bytes.toBytes(name2.getServerName()));
|
||||
while (true) {
|
||||
regions =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), table.getName());
|
||||
if (regions.get(0).getSecond().equals(name1) && regions.get(1).getSecond().equals(name2)) {
|
||||
break;
|
||||
}
|
||||
Threads.sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
private void waitTableHasRightNumberOfRegions(TableName tableName, int num) throws IOException {
|
||||
while (true) {
|
||||
List<Pair<RegionInfo, ServerName>> regions =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(utility1.getConnection(), tableName);
|
||||
if (regions.size() == num) {
|
||||
return;
|
||||
}
|
||||
Threads.sleep(100);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void assertIntegerList(List<Integer> list, int start, int step) {
|
||||
int size = list.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
assertEquals(start + step * i, list.get(i).intValue());
|
||||
}
|
||||
}
|
||||
}
|
|
@ -125,7 +125,7 @@ public class TestGlobalThrottler {
|
|||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
HTableDescriptor table = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor fam = new HColumnDescriptor(famName);
|
||||
fam.setScope(HConstants.REPLICATION_SCOPE_SERIAL);
|
||||
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||
table.addFamily(fam);
|
||||
utility1.getAdmin().createTable(table);
|
||||
utility2.getAdmin().createTable(table);
|
||||
|
|
|
@ -1367,11 +1367,9 @@ If a slave cluster does run out of room, or is inaccessible for other reasons, i
|
|||
.Consistency Across Replicated Clusters
|
||||
[WARNING]
|
||||
====
|
||||
How your application builds on top of the HBase API matters when replication is in play. HBase's replication system provides at-least-once delivery of client edits for an enabled column family to each configured destination cluster. In the event of failure to reach a given destination, the replication system will retry sending edits in a way that might repeat a given message. HBase provides two ways of replication, one is the original replication and the other is serial replication. In the previous way of replication, there is not a guaranteed order of delivery for client edits. In the event of a RegionServer failing, recovery of the replication queue happens independent of recovery of the individual regions that server was previously handling. This means that it is possible for the not-yet-replicated edits to be serviced by a RegionServer that is currently slower to replicate than the one that handles edits from after the failure.
|
||||
How your application builds on top of the HBase API matters when replication is in play. HBase's replication system provides at-least-once delivery of client edits for an enabled column family to each configured destination cluster. In the event of failure to reach a given destination, the replication system will retry sending edits in a way that might repeat a given message. Furthermore, there is no guaranteed order of delivery for client edits. In the event of a RegionServer failing, recovery of the replication queue happens independently of recovery of the individual regions that server was previously handling. This means that it is possible for the not-yet-replicated edits to be serviced by a RegionServer that is currently slower to replicate than the one that handles edits from after the failure.
|
||||
|
||||
The combination of these two properties (at-least-once delivery and the lack of message ordering) means that some destination clusters may end up in a different state if your application makes use of operations that are not idempotent, e.g. Increments.
|
||||
|
||||
To solve this problem, HBase now supports serial replication, which sends edits to the destination cluster in the same order as the client issued them.
|
||||
====
|
||||
|
||||
.Terminology Changes
|
||||
|
@ -1412,9 +1410,6 @@ Instead of SQL statements, entire WALEdits (consisting of multiple cell inserts
|
|||
LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
|
||||
----
|
||||
|
||||
.Serial Replication Configuration
|
||||
See <<Serial Replication,Serial Replication>>
|
||||
|
||||
.Cluster Management Commands
|
||||
add_peer <ID> <CLUSTER_KEY>::
|
||||
Adds a replication relationship between two clusters. +
|
||||
|
@ -1436,40 +1431,6 @@ enable_table_replication <TABLE_NAME>::
|
|||
disable_table_replication <TABLE_NAME>::
|
||||
Disable the table replication switch for all its column families.
|
||||
|
||||
=== Serial Replication
|
||||
|
||||
Note: this feature is introduced in HBase 1.5
|
||||
|
||||
.Function of serial replication
|
||||
|
||||
Serial replication pushes logs to the destination cluster in the same order in which they reach the source cluster.
|
||||
|
||||
.Why is serial replication needed?
|
||||
In HBase replication, we push mutations to the destination cluster by reading the WAL on each region server. We have a queue for WAL files so we can read them in order of creation time. However, when a region move or RS failure occurs in the source cluster, the hlog entries that were not pushed before the region move or RS failure will be pushed by the original RS (for a region move) or by another RS which takes over the remaining hlogs of the dead RS (for an RS failure), and the new entries for the same region(s) will be pushed by the RS which now serves the region(s). These RSs push the hlog entries of the same region concurrently, without coordination.
|
||||
|
||||
This treatment can possibly lead to data inconsistency between source and destination clusters:
|
||||
|
||||
1. a put and then a delete are written to the source cluster.
|
||||
|
||||
2. due to a region move or RS failure, they are pushed by different replication-source threads to the peer cluster.
|
||||
|
||||
3. if the delete is pushed to the peer cluster before the put, and a flush and major compaction occur in the peer cluster before the put is pushed to the peer cluster, the delete is collected and the put remains in the peer cluster. In the source cluster, however, the put is masked by the delete, hence the data inconsistency between the source and destination clusters.
|
||||
|
||||
|
||||
.Serial replication configuration
|
||||
|
||||
. Set REPLICATION_SCOPE=>2 on the column family which is to be replicated serially when creating tables.
|
||||
|
||||
REPLICATION_SCOPE is a column family level attribute. Its value can be 0, 1 or 2. Value 0 means replication is disabled, 1 means replication is enabled but log order is not guaranteed, and 2 means serial replication is enabled.
|
||||
|
||||
. This feature relies on zk-less assignment and conflicts with distributed log replay, so users must set hbase.assignment.usezk=false and hbase.master.distributed.log.replay=false to use this feature. (Note that distributed log replay is deprecated and has already been purged from 2.0.)
|
||||
|
||||
.Limitations in serial replication
|
||||
|
||||
Currently, we read and push logs from one RS to one peer in a single thread, so if one log has not been pushed, all logs after it will be blocked. One WAL file may contain WAL edits from different tables; if one of the tables (or one of its CFs) has REPLICATION_SCOPE set to 2 and is blocked, then all edits will be blocked, even though the other tables do not need serial replication. If you want to prevent this, you need to split these tables/CFs into different peers.
|
||||
|
||||
More details about serial replication can be found in link:https://issues.apache.org/jira/browse/HBASE-9465[HBASE-9465].
|
||||
|
||||
=== Verifying Replicated Data
|
||||
|
||||
The `VerifyReplication` MapReduce job, which is included in HBase, performs a systematic comparison of replicated data between two different clusters. Run the VerifyReplication job on the master cluster, supplying it with the peer ID and table name to use for validation. You can limit the verification further by specifying a time range or specific families. The job's short name is `verifyrep`. To run the job, use a command like the following:
|
||||
|
|
Loading…
Reference in New Issue