HBASE-20148 Make serial replication as a option for a peer instead of a table
This commit is contained in:
parent
15da74ccee
commit
dd6f4525e7
|
@ -536,14 +536,6 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return true if there are at least one cf whose replication scope is serial.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public boolean hasSerialReplicationScope() {
|
|
||||||
return delegatee.hasSerialReplicationScope();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the configured replicas per region
|
* Returns the configured replicas per region
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -24,7 +24,7 @@ import java.util.Comparator;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -231,11 +231,6 @@ public interface TableDescriptor {
|
||||||
*/
|
*/
|
||||||
boolean hasRegionMemStoreReplication();
|
boolean hasRegionMemStoreReplication();
|
||||||
|
|
||||||
/**
|
|
||||||
* @return true if there are at least one cf whose replication scope is serial.
|
|
||||||
*/
|
|
||||||
boolean hasSerialReplicationScope();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the compaction enable flag of the table is true. If flag is false
|
* Check if the compaction enable flag of the table is true. If flag is false
|
||||||
* then no minor/major compactions will be done in real.
|
* then no minor/major compactions will be done in real.
|
||||||
|
@ -274,6 +269,16 @@ public interface TableDescriptor {
|
||||||
*/
|
*/
|
||||||
boolean isReadOnly();
|
boolean isReadOnly();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if any of the table's cfs' replication scope are set to
|
||||||
|
* {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
|
||||||
|
* @return {@code true} if we have, otherwise {@code false}.
|
||||||
|
*/
|
||||||
|
default boolean hasGlobalReplicationScope() {
|
||||||
|
return Stream.of(getColumnFamilies())
|
||||||
|
.anyMatch(cf -> cf.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the table's cfs' replication scope matched with the replication state
|
* Check if the table's cfs' replication scope matched with the replication state
|
||||||
* @param enabled replication state
|
* @param enabled replication state
|
||||||
|
@ -284,8 +289,7 @@ public interface TableDescriptor {
|
||||||
boolean hasDisabled = false;
|
boolean hasDisabled = false;
|
||||||
|
|
||||||
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
|
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
|
||||||
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL &&
|
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
|
||||||
cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
|
|
||||||
hasDisabled = true;
|
hasDisabled = true;
|
||||||
} else {
|
} else {
|
||||||
hasEnabled = true;
|
hasEnabled = true;
|
||||||
|
|
|
@ -1053,15 +1053,6 @@ public class TableDescriptorBuilder {
|
||||||
return families.values().toArray(new ColumnFamilyDescriptor[families.size()]);
|
return families.values().toArray(new ColumnFamilyDescriptor[families.size()]);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return true if there are at least one cf whose replication scope is serial.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public boolean hasSerialReplicationScope() {
|
|
||||||
return families.values().stream()
|
|
||||||
.anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the configured replicas per region
|
* Returns the configured replicas per region
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -303,6 +303,10 @@ public final class ReplicationPeerConfigUtil {
|
||||||
builder.setReplicateAllUserTables(peer.getReplicateAll());
|
builder.setReplicateAllUserTables(peer.getReplicateAll());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (peer.hasSerial()) {
|
||||||
|
builder.setSerial(peer.getSerial());
|
||||||
|
}
|
||||||
|
|
||||||
Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
|
Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
|
||||||
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
|
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
|
||||||
if (excludeTableCFsMap != null) {
|
if (excludeTableCFsMap != null) {
|
||||||
|
@ -357,6 +361,7 @@ public final class ReplicationPeerConfigUtil {
|
||||||
|
|
||||||
builder.setBandwidth(peerConfig.getBandwidth());
|
builder.setBandwidth(peerConfig.getBandwidth());
|
||||||
builder.setReplicateAll(peerConfig.replicateAllUserTables());
|
builder.setReplicateAll(peerConfig.replicateAllUserTables());
|
||||||
|
builder.setSerial(peerConfig.isSerial());
|
||||||
|
|
||||||
ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
|
ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
|
||||||
if (excludeTableCFs != null) {
|
if (excludeTableCFs != null) {
|
||||||
|
|
|
@ -46,6 +46,7 @@ public class ReplicationPeerConfig {
|
||||||
private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
|
private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
|
||||||
private Set<String> excludeNamespaces = null;
|
private Set<String> excludeNamespaces = null;
|
||||||
private long bandwidth = 0;
|
private long bandwidth = 0;
|
||||||
|
private final boolean serial;
|
||||||
|
|
||||||
private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
|
private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
|
||||||
this.clusterKey = builder.clusterKey;
|
this.clusterKey = builder.clusterKey;
|
||||||
|
@ -64,6 +65,7 @@ public class ReplicationPeerConfig {
|
||||||
builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces)
|
builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces)
|
||||||
: null;
|
: null;
|
||||||
this.bandwidth = builder.bandwidth;
|
this.bandwidth = builder.bandwidth;
|
||||||
|
this.serial = builder.serial;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<TableName, List<String>>
|
private Map<TableName, List<String>>
|
||||||
|
@ -82,6 +84,7 @@ public class ReplicationPeerConfig {
|
||||||
public ReplicationPeerConfig() {
|
public ReplicationPeerConfig() {
|
||||||
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
this.configuration = new HashMap<>(0);
|
this.configuration = new HashMap<>(0);
|
||||||
|
this.serial = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -214,16 +217,20 @@ public class ReplicationPeerConfig {
|
||||||
return new ReplicationPeerConfigBuilderImpl();
|
return new ReplicationPeerConfigBuilderImpl();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isSerial() {
|
||||||
|
return serial;
|
||||||
|
}
|
||||||
|
|
||||||
public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
|
public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
|
||||||
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
|
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
|
||||||
builder.setClusterKey(peerConfig.getClusterKey())
|
builder.setClusterKey(peerConfig.getClusterKey())
|
||||||
.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
|
.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
|
||||||
.putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
|
.putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
|
||||||
.setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
|
.setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
|
||||||
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
|
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
|
||||||
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
|
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
|
||||||
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
|
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
|
||||||
.setBandwidth(peerConfig.getBandwidth());
|
.setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial());
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,6 +257,8 @@ public class ReplicationPeerConfig {
|
||||||
|
|
||||||
private long bandwidth = 0;
|
private long bandwidth = 0;
|
||||||
|
|
||||||
|
private boolean serial = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
|
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
|
||||||
this.clusterKey = clusterKey;
|
this.clusterKey = clusterKey;
|
||||||
|
@ -312,6 +321,12 @@ public class ReplicationPeerConfig {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReplicationPeerConfigBuilder setSerial(boolean serial) {
|
||||||
|
this.serial = serial;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ReplicationPeerConfig build() {
|
public ReplicationPeerConfig build() {
|
||||||
// It would be nice to validate the configuration, but we have to work with "old" data
|
// It would be nice to validate the configuration, but we have to work with "old" data
|
||||||
|
@ -340,7 +355,8 @@ public class ReplicationPeerConfig {
|
||||||
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
|
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
builder.append("bandwidth=").append(bandwidth);
|
builder.append("bandwidth=").append(bandwidth).append(",");
|
||||||
|
builder.append("serial=").append(serial);
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,6 +137,18 @@ public interface ReplicationPeerConfigBuilder {
|
||||||
*/
|
*/
|
||||||
ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces);
|
ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Sets whether we should preserve order when replicating, i.e, serial replication.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Default {@code false}.
|
||||||
|
* </p>
|
||||||
|
* @param serial {@code true} means preserve order, otherwise {@code false}.
|
||||||
|
* @return {@code this}
|
||||||
|
*/
|
||||||
|
ReplicationPeerConfigBuilder setSerial(boolean serial);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builds the configuration object from the current state of {@code this}.
|
* Builds the configuration object from the current state of {@code this}.
|
||||||
* @return A {@link ReplicationPeerConfig} instance.
|
* @return A {@link ReplicationPeerConfig} instance.
|
||||||
|
|
|
@ -646,12 +646,6 @@ public final class HConstants {
|
||||||
*/
|
*/
|
||||||
public static final int REPLICATION_SCOPE_GLOBAL = 1;
|
public static final int REPLICATION_SCOPE_GLOBAL = 1;
|
||||||
|
|
||||||
/**
|
|
||||||
* Scope tag for serially scoped data
|
|
||||||
* This data will be replicated to all peers by the order of sequence id.
|
|
||||||
*/
|
|
||||||
public static final int REPLICATION_SCOPE_SERIAL = 2;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default cluster ID, cannot be used to identify a cluster so a key with
|
* Default cluster ID, cannot be used to identify a cluster so a key with
|
||||||
* this value means it wasn't meant for replication.
|
* this value means it wasn't meant for replication.
|
||||||
|
|
|
@ -48,6 +48,7 @@ message ReplicationPeer {
|
||||||
optional bool replicate_all = 8;
|
optional bool replicate_all = 8;
|
||||||
repeated TableCF exclude_table_cfs = 9;
|
repeated TableCF exclude_table_cfs = 9;
|
||||||
repeated bytes exclude_namespaces = 10;
|
repeated bytes exclude_namespaces = 10;
|
||||||
|
optional bool serial = 11;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -115,6 +115,9 @@ public final class ReplicationUtils {
|
||||||
if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
|
if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (rpc1.isSerial() != rpc2.isSerial()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if (rpc1.replicateAllUserTables()) {
|
if (rpc1.replicateAllUserTables()) {
|
||||||
return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
|
return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
|
||||||
isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
|
isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
|
||||||
|
|
|
@ -165,7 +165,7 @@ public class RegionStateStore {
|
||||||
MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
|
MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
|
||||||
// only update replication barrier for default replica
|
// only update replication barrier for default replica
|
||||||
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
|
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
|
||||||
hasSerialReplicationScope(regionInfo.getTable())) {
|
hasGlobalReplicationScope(regionInfo.getTable())) {
|
||||||
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
|
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
|
||||||
}
|
}
|
||||||
info.append(", openSeqNum=").append(openSeqNum);
|
info.append(", openSeqNum=").append(openSeqNum);
|
||||||
|
@ -224,7 +224,7 @@ public class RegionStateStore {
|
||||||
ServerName serverName) throws IOException {
|
ServerName serverName) throws IOException {
|
||||||
TableDescriptor htd = getTableDescriptor(parent.getTable());
|
TableDescriptor htd = getTableDescriptor(parent.getTable());
|
||||||
long parentOpenSeqNum = HConstants.NO_SEQNUM;
|
long parentOpenSeqNum = HConstants.NO_SEQNUM;
|
||||||
if (htd.hasSerialReplicationScope()) {
|
if (htd.hasGlobalReplicationScope()) {
|
||||||
parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
|
parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
|
||||||
}
|
}
|
||||||
MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
|
MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
|
||||||
|
@ -239,7 +239,7 @@ public class RegionStateStore {
|
||||||
TableDescriptor htd = getTableDescriptor(child.getTable());
|
TableDescriptor htd = getTableDescriptor(child.getTable());
|
||||||
long regionAOpenSeqNum = -1L;
|
long regionAOpenSeqNum = -1L;
|
||||||
long regionBOpenSeqNum = -1L;
|
long regionBOpenSeqNum = -1L;
|
||||||
if (htd.hasSerialReplicationScope()) {
|
if (htd.hasGlobalReplicationScope()) {
|
||||||
regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
|
regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
|
||||||
regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
|
regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
|
||||||
}
|
}
|
||||||
|
@ -261,12 +261,12 @@ public class RegionStateStore {
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
// Table Descriptors helpers
|
// Table Descriptors helpers
|
||||||
// ==========================================================================
|
// ==========================================================================
|
||||||
private boolean hasSerialReplicationScope(TableName tableName) throws IOException {
|
private boolean hasGlobalReplicationScope(TableName tableName) throws IOException {
|
||||||
return hasSerialReplicationScope(getTableDescriptor(tableName));
|
return hasGlobalReplicationScope(getTableDescriptor(tableName));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasSerialReplicationScope(TableDescriptor htd) {
|
private boolean hasGlobalReplicationScope(TableDescriptor htd) {
|
||||||
return htd != null ? htd.hasSerialReplicationScope() : false;
|
return htd != null ? htd.hasGlobalReplicationScope() : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getRegionReplication(TableDescriptor htd) {
|
private int getRegionReplication(TableDescriptor htd) {
|
||||||
|
|
|
@ -37,31 +37,31 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Entry filter(Entry entry) {
|
public Entry filter(Entry entry) {
|
||||||
|
// Do not filter out an entire entry by replication scopes. As now we support serial
|
||||||
|
// replication, the sequence id of a marker is also needed by upper layer. We will filter out
|
||||||
|
// all the cells in the filterCell method below if the replication scopes is null or empty.
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
|
||||||
|
Integer scope = scopes.get(family);
|
||||||
|
return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public Cell filterCell(Entry entry, Cell cell) {
|
||||||
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
|
NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
|
||||||
if (scopes == null || scopes.isEmpty()) {
|
if (scopes == null || scopes.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return entry;
|
byte[] family = CellUtil.cloneFamily(cell);
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Cell filterCell(Entry entry, Cell cell) {
|
|
||||||
final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
|
|
||||||
// The scope will be null or empty if
|
|
||||||
// there's nothing to replicate in that WALEdit
|
|
||||||
byte[] fam = CellUtil.cloneFamily(cell);
|
|
||||||
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
|
||||||
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
|
return bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(byte[] fam) {
|
public boolean apply(byte[] family) {
|
||||||
return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
|
return !hasGlobalScope(scopes, family);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else {
|
|
||||||
if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return cell;
|
return hasGlobalScope(scopes, family) ? cell : null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -382,6 +382,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
return replicationPeer.isPeerEnabled();
|
return replicationPeer.isPeerEnabled();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isSerial() {
|
||||||
|
return replicationPeer.getPeerConfig().isSerial();
|
||||||
|
}
|
||||||
|
|
||||||
private void initialize() {
|
private void initialize() {
|
||||||
int sleepMultiplier = 1;
|
int sleepMultiplier = 1;
|
||||||
while (this.isSourceActive()) {
|
while (this.isSourceActive()) {
|
||||||
|
|
|
@ -72,18 +72,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
|
||||||
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
|
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
WALKeyImpl keyImpl = (WALKeyImpl) logKey;
|
|
||||||
// For serial replication we need to count all the sequence ids even for markers, so here we
|
|
||||||
// always need to retain the replication scopes to let the replication wal reader to know that
|
|
||||||
// we need serial replication. The ScopeWALEntryFilter will help filtering out the cell for
|
|
||||||
// WALEdit.METAFAMILY.
|
|
||||||
if (keyImpl.hasSerialReplicationScope()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// For replay, or if all the cells are markers, do not need to store replication scope.
|
// For replay, or if all the cells are markers, do not need to store replication scope.
|
||||||
if (logEdit.isReplay() ||
|
if (logEdit.isReplay() ||
|
||||||
logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
|
logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
|
||||||
keyImpl.clearReplicationScope();
|
((WALKeyImpl) logKey).clearReplicationScope();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,9 +186,9 @@ public class ReplicationSourceWALReader extends Thread {
|
||||||
new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
|
new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
|
||||||
do {
|
do {
|
||||||
Entry entry = entryStream.peek();
|
Entry entry = entryStream.peek();
|
||||||
boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
|
boolean isSerial = source.isSerial();
|
||||||
boolean doFiltering = true;
|
boolean doFiltering = true;
|
||||||
if (hasSerialReplicationScope) {
|
if (isSerial) {
|
||||||
if (firstCellInEntryBeforeFiltering == null) {
|
if (firstCellInEntryBeforeFiltering == null) {
|
||||||
assert !entry.getEdit().isEmpty() : "should not write empty edits";
|
assert !entry.getEdit().isEmpty() : "should not write empty edits";
|
||||||
// Used to locate the region record in meta table. In WAL we only have the table name and
|
// Used to locate the region record in meta table. In WAL we only have the table name and
|
||||||
|
@ -208,7 +208,7 @@ public class ReplicationSourceWALReader extends Thread {
|
||||||
entry = filterEntry(entry);
|
entry = filterEntry(entry);
|
||||||
}
|
}
|
||||||
if (entry != null) {
|
if (entry != null) {
|
||||||
if (hasSerialReplicationScope) {
|
if (isSerial) {
|
||||||
if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
|
if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
|
||||||
if (batch.getLastWalPosition() > positionBefore) {
|
if (batch.getLastWalPosition() > positionBefore) {
|
||||||
// we have something that can push, break
|
// we have something that can push, break
|
||||||
|
|
|
@ -266,7 +266,7 @@ class SerialReplicationChecker {
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
byte[] row = CellUtil.cloneRow(firstCellInEdit);
|
byte[] row = CellUtil.cloneRow(firstCellInEdit);
|
||||||
while (!canPush(entry, row)) {
|
while (!canPush(entry, row)) {
|
||||||
LOG.debug("Can not push{}, wait", entry);
|
LOG.debug("Can not push {}, wait", entry);
|
||||||
Thread.sleep(waitTimeMs);
|
Thread.sleep(waitTimeMs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -419,14 +419,6 @@ public class WALKeyImpl implements WALKey {
|
||||||
setReplicationScope(null);
|
setReplicationScope(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean hasSerialReplicationScope() {
|
|
||||||
if (replicationScope == null || replicationScope.isEmpty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return replicationScope.values().stream()
|
|
||||||
.anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks that the cluster with the given clusterId has consumed the change
|
* Marks that the cluster with the given clusterId has consumed the change
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -32,9 +33,9 @@ import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -48,7 +49,7 @@ import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
|
||||||
@Category({ReplicationTests.class, SmallTests.class})
|
@Category({ ReplicationTests.class, SmallTests.class })
|
||||||
public class TestReplicationWALEntryFilters {
|
public class TestReplicationWALEntryFilters {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
|
@ -65,7 +66,8 @@ public class TestReplicationWALEntryFilters {
|
||||||
SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
|
SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
|
||||||
|
|
||||||
// meta
|
// meta
|
||||||
WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
|
WALKeyImpl key1 =
|
||||||
|
new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
|
||||||
TableName.META_TABLE_NAME, System.currentTimeMillis());
|
TableName.META_TABLE_NAME, System.currentTimeMillis());
|
||||||
Entry metaEntry = new Entry(key1, null);
|
Entry metaEntry = new Entry(key1, null);
|
||||||
|
|
||||||
|
@ -96,12 +98,15 @@ public class TestReplicationWALEntryFilters {
|
||||||
Entry userEntryEmpty = createEntry(null);
|
Entry userEntryEmpty = createEntry(null);
|
||||||
|
|
||||||
// no scopes
|
// no scopes
|
||||||
assertEquals(null, filter.filter(userEntry));
|
// now we will not filter out entries without a replication scope since serial replication still
|
||||||
|
// need the sequence id, but the cells will all be filtered out.
|
||||||
|
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
|
||||||
|
|
||||||
// empty scopes
|
// empty scopes
|
||||||
|
// ditto
|
||||||
TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
userEntry = createEntry(scopes, a, b);
|
userEntry = createEntry(scopes, a, b);
|
||||||
assertEquals(null, filter.filter(userEntry));
|
assertTrue(filter.filter(userEntry).getEdit().isEmpty());
|
||||||
|
|
||||||
// different scope
|
// different scope
|
||||||
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
|
|
@ -156,7 +156,8 @@ public class TestSerialReplication {
|
||||||
// add in disable state, so later when enabling it all sources will start push together.
|
// add in disable state, so later when enabling it all sources will start push together.
|
||||||
UTIL.getAdmin().addReplicationPeer(PEER_ID,
|
UTIL.getAdmin().addReplicationPeer(PEER_ID,
|
||||||
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
|
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
|
||||||
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
|
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
|
||||||
|
.build(),
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,7 +235,7 @@ public class TestSerialReplication {
|
||||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
UTIL.getAdmin().createTable(
|
UTIL.getAdmin().createTable(
|
||||||
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
|
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
|
||||||
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
|
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
|
||||||
UTIL.waitTableAvailable(tableName);
|
UTIL.waitTableAvailable(tableName);
|
||||||
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
|
@ -273,7 +274,7 @@ public class TestSerialReplication {
|
||||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
UTIL.getAdmin().createTable(
|
UTIL.getAdmin().createTable(
|
||||||
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
|
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
|
||||||
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
|
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
|
||||||
UTIL.waitTableAvailable(tableName);
|
UTIL.waitTableAvailable(tableName);
|
||||||
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
try (Table table = UTIL.getConnection().getTable(tableName)) {
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
|
@ -330,7 +331,7 @@ public class TestSerialReplication {
|
||||||
UTIL.getAdmin().createTable(
|
UTIL.getAdmin().createTable(
|
||||||
TableDescriptorBuilder.newBuilder(tableName)
|
TableDescriptorBuilder.newBuilder(tableName)
|
||||||
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
|
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
|
||||||
.setScope(HConstants.REPLICATION_SCOPE_SERIAL).build())
|
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
|
||||||
.build(),
|
.build(),
|
||||||
new byte[][] { splitKey });
|
new byte[][] { splitKey });
|
||||||
UTIL.waitTableAvailable(tableName);
|
UTIL.waitTableAvailable(tableName);
|
||||||
|
|
Loading…
Reference in New Issue