HBASE-20115 Reimplement serial replication based on the new replication storage layer

This commit is contained in:
zhangduo 2018-03-05 16:47:03 +08:00
parent 485af49e53
commit b7b8683925
36 changed files with 1282 additions and 341 deletions

View File

@ -25,9 +25,9 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder.ModifyableTableDesc
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor;
/** /**
* HTableDescriptor contains the details about an HBase table such as the descriptors of * HTableDescriptor contains the details about an HBase table such as the descriptors of
@ -537,6 +536,14 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
/**
* Return true if there are at least one cf whose replication scope is serial.
*/
@Override
public boolean hasSerialReplicationScope() {
return delegatee.hasSerialReplicationScope();
}
/** /**
* Returns the configured replicas per region * Returns the configured replicas per region
*/ */

View File

@ -34,6 +34,8 @@ import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell.Type; import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@ -137,7 +140,7 @@ public class MetaTableAccessor {
private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class); private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class);
private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META"); private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META");
static final byte [] META_REGION_PREFIX; private static final byte[] META_REGION_PREFIX;
static { static {
// Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX. // Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
// FIRST_META_REGIONINFO == 'hbase:meta,,1'. META_REGION_PREFIX == 'hbase:meta,' // FIRST_META_REGIONINFO == 'hbase:meta,,1'. META_REGION_PREFIX == 'hbase:meta,'
@ -147,6 +150,11 @@ public class MetaTableAccessor {
META_REGION_PREFIX, 0, len); META_REGION_PREFIX, 0, len);
} }
private static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
private static final String REPLICATION_PARENT_SEPARATOR = "|";
private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
/** /**
* Lists all of the table regions currently in META. * Lists all of the table regions currently in META.
* Deprecated, keep there until some test use this. * Deprecated, keep there until some test use this.
@ -838,7 +846,7 @@ public class MetaTableAccessor {
/** /**
* Returns the column qualifier for serialized region state * Returns the column qualifier for serialized region state
* @return HConstants.TABLE_STATE_QUALIFIER * @return HConstants.STATE_QUALIFIER
*/ */
private static byte[] getRegionStateColumn() { private static byte[] getRegionStateColumn() {
return HConstants.STATE_QUALIFIER; return HConstants.STATE_QUALIFIER;
@ -1269,7 +1277,6 @@ public class MetaTableAccessor {
//////////////////////// ////////////////////////
// Editing operations // // Editing operations //
//////////////////////// ////////////////////////
/** /**
* Generates and returns a Put containing the region into for the catalog table * Generates and returns a Put containing the region into for the catalog table
*/ */
@ -1441,7 +1448,7 @@ public class MetaTableAccessor {
* Adds daughter region infos to hbase:meta row for the specified region. Note that this does not * Adds daughter region infos to hbase:meta row for the specified region. Note that this does not
* add its daughter's as different rows, but adds information about the daughters in the same row * add its daughter's as different rows, but adds information about the daughters in the same row
* as the parent. Use * as the parent. Use
* {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int)} * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
* if you want to do that. * if you want to do that.
* @param connection connection we're using * @param connection connection we're using
* @param regionInfo RegionInfo of parent region * @param regionInfo RegionInfo of parent region
@ -1467,7 +1474,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this * Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters * does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use * in the same row as the parent. Use
* {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName, int)} * {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
* if you want to do that. * if you want to do that.
* @param connection connection we're using * @param connection connection we're using
* @param regionInfo region information * @param regionInfo region information
@ -1522,20 +1529,37 @@ public class MetaTableAccessor {
} }
/** /**
* Merge the two regions into one in an atomic operation. Deletes the two * Merge the two regions into one in an atomic operation. Deletes the two merging regions in
* merging regions in hbase:meta and adds the merged region with the information of * hbase:meta and adds the merged region with the information of two merging regions.
* two merging regions.
* @param connection connection we're using * @param connection connection we're using
* @param mergedRegion the merged region * @param mergedRegion the merged region
* @param regionA merge parent region A * @param regionA merge parent region A
* @param regionAOpenSeqNum the next open sequence id for region A, used by serial replication. -1
* if not necessary.
* @param regionB merge parent region B * @param regionB merge parent region B
* @param regionBOpenSeqNum the next open sequence id for region B, used by serial replication. -1
* if not necessary.
* @param sn the location of the region * @param sn the location of the region
*/ */
public static void mergeRegions(final Connection connection, RegionInfo mergedRegion, public static void mergeRegions(Connection connection, RegionInfo mergedRegion,
RegionInfo regionA, RegionInfo regionB, ServerName sn, int regionReplication) RegionInfo regionA, long regionAOpenSeqNum, RegionInfo regionB, long regionBOpenSeqNum,
throws IOException { ServerName sn, int regionReplication) throws IOException {
try (Table meta = getMetaHTable(connection)) { try (Table meta = getMetaHTable(connection)) {
long time = EnvironmentEdgeManager.currentTime(); long time = EnvironmentEdgeManager.currentTime();
List<Mutation> mutations = new ArrayList<>();
List<RegionInfo> replicationParents = new ArrayList<>(2);
// Deletes for merging regions
mutations.add(makeDeleteFromRegionInfo(regionA, time));
if (regionAOpenSeqNum > 0) {
mutations.add(makePutForReplicationBarrier(regionA, regionAOpenSeqNum, time));
replicationParents.add(regionA);
}
mutations.add(makeDeleteFromRegionInfo(regionB, time));
if (regionBOpenSeqNum > 0) {
mutations.add(makePutForReplicationBarrier(regionB, regionBOpenSeqNum, time));
replicationParents.add(regionB);
}
// Put for parent // Put for parent
Put putOfMerged = makePutFromRegionInfo(mergedRegion, time); Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
@ -1555,18 +1579,13 @@ public class MetaTableAccessor {
.setType(Type.Put) .setType(Type.Put)
.setValue(RegionInfo.toByteArray(regionB)) .setValue(RegionInfo.toByteArray(regionB))
.build()); .build());
// Set initial state to CLOSED // Set initial state to CLOSED
// NOTE: If initial state is not set to CLOSED then merged region gets added with the // NOTE: If initial state is not set to CLOSED then merged region gets added with the
// default OFFLINE state. If Master gets restarted after this step, start up sequence of // default OFFLINE state. If Master gets restarted after this step, start up sequence of
// master tries to assign this offline region. This is followed by re-assignments of the // master tries to assign this offline region. This is followed by re-assignments of the
// merged region from resumed {@link MergeTableRegionsProcedure} // merged region from resumed {@link MergeTableRegionsProcedure}
addRegionStateToPut(putOfMerged, RegionState.State.CLOSED); addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
mutations.add(putOfMerged);
// Deletes for merging regions
Delete deleteA = makeDeleteFromRegionInfo(regionA, time);
Delete deleteB = makeDeleteFromRegionInfo(regionB, time);
// The merged is a new region, openSeqNum = 1 is fine. ServerName may be null // The merged is a new region, openSeqNum = 1 is fine. ServerName may be null
// if crash after merge happened but before we got to here.. means in-memory // if crash after merge happened but before we got to here.. means in-memory
// locations of offlined merged, now-closed, regions is lost. Should be ok. We // locations of offlined merged, now-closed, regions is lost. Should be ok. We
@ -1580,26 +1599,30 @@ public class MetaTableAccessor {
for (int i = 1; i < regionReplication; i++) { for (int i = 1; i < regionReplication; i++) {
addEmptyLocation(putOfMerged, i); addEmptyLocation(putOfMerged, i);
} }
// add parent reference for serial replication
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() if (!replicationParents.isEmpty()) {
+ HConstants.DELIMITER); addReplicationParent(putOfMerged, replicationParents);
multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB); }
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
multiMutate(connection, meta, tableRow, mutations);
} }
} }
/** /**
* Splits the region into two in an atomic operation. Offlines the parent * Splits the region into two in an atomic operation. Offlines the parent region with the
* region with the information that it is split into two, and also adds * information that it is split into two, and also adds the daughter regions. Does not add the
* the daughter regions. Does not add the location information to the daughter * location information to the daughter regions since they are not open yet.
* regions since they are not open yet.
* @param connection connection we're using * @param connection connection we're using
* @param parent the parent region which is split * @param parent the parent region which is split
* @param parentOpenSeqNum the next open sequence id for parent region, used by serial
* replication. -1 if not necessary.
* @param splitA Split daughter region A * @param splitA Split daughter region A
* @param splitB Split daughter region B * @param splitB Split daughter region B
* @param sn the location of the region * @param sn the location of the region
*/ */
public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA, public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
RegionInfo splitB, ServerName sn, int regionReplication) throws IOException { RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication)
throws IOException {
try (Table meta = getMetaHTable(connection)) { try (Table meta = getMetaHTable(connection)) {
long time = EnvironmentEdgeManager.currentTime(); long time = EnvironmentEdgeManager.currentTime();
// Put for parent // Put for parent
@ -1611,7 +1634,11 @@ public class MetaTableAccessor {
// Puts for daughters // Puts for daughters
Put putA = makePutFromRegionInfo(splitA, time); Put putA = makePutFromRegionInfo(splitA, time);
Put putB = makePutFromRegionInfo(splitB, time); Put putB = makePutFromRegionInfo(splitB, time);
if (parentOpenSeqNum > 0) {
addReplicationBarrier(putParent, parentOpenSeqNum);
addReplicationParent(putA, Collections.singletonList(parent));
addReplicationParent(putB, Collections.singletonList(parent));
}
// Set initial state to CLOSED // Set initial state to CLOSED
// NOTE: If initial state is not set to CLOSED then daughter regions get added with the // NOTE: If initial state is not set to CLOSED then daughter regions get added with the
// default OFFLINE state. If Master gets restarted after this step, start up sequence of // default OFFLINE state. If Master gets restarted after this step, start up sequence of
@ -1671,20 +1698,15 @@ public class MetaTableAccessor {
} }
private static void multiMutate(Connection connection, Table table, byte[] row, private static void multiMutate(Connection connection, Table table, byte[] row,
Mutation... mutations) Mutation... mutations) throws IOException {
throws IOException {
multiMutate(connection, table, row, Arrays.asList(mutations)); multiMutate(connection, table, row, Arrays.asList(mutations));
} }
/** /**
* Performs an atomic multi-mutate operation against the given table. * Performs an atomic multi-mutate operation against the given table.
*/ */
// Used by the RSGroup Coprocessor Endpoint. It had a copy/paste of the below. Need to reveal private static void multiMutate(Connection connection, final Table table, byte[] row,
// this facility for CPEP use or at least those CPEPs that are on their way to becoming part of final List<Mutation> mutations) throws IOException {
// core as is the intent for RSGroup eventually.
public static void multiMutate(Connection connection, final Table table, byte[] row,
final List<Mutation> mutations)
throws IOException {
debugLogMutations(mutations); debugLogMutations(mutations);
// TODO: Need rollback!!!! // TODO: Need rollback!!!!
// TODO: Need Retry!!! // TODO: Need Retry!!!
@ -1785,9 +1807,7 @@ public class MetaTableAccessor {
* @param regionInfo region to be deleted from META * @param regionInfo region to be deleted from META
* @throws IOException * @throws IOException
*/ */
public static void deleteRegion(Connection connection, public static void deleteRegion(Connection connection, RegionInfo regionInfo) throws IOException {
RegionInfo regionInfo)
throws IOException {
long time = EnvironmentEdgeManager.currentTime(); long time = EnvironmentEdgeManager.currentTime();
Delete delete = new Delete(regionInfo.getRegionName()); Delete delete = new Delete(regionInfo.getRegionName());
delete.addFamily(getCatalogFamily(), time); delete.addFamily(getCatalogFamily(), time);
@ -1904,6 +1924,33 @@ public class MetaTableAccessor {
.build()); .build());
} }
private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
.map(RegionInfo::getRegionNameAsString).collect(Collectors
.collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
}
private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
throws IOException {
Put put = new Put(regionInfo.getRegionName(), ts);
addReplicationBarrier(put, openSeqNum);
return put;
}
public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
.setQualifier(HConstants.SEQNUM_QUALIFIER)
.setTimestamp(put.getTimeStamp())
.setType(Type.Put)
.setValue(Bytes.toBytes(openSeqNum))
.build());
}
private static Put addEmptyLocation(Put p, int replicaId) throws IOException { private static Put addEmptyLocation(Put p, int replicaId) throws IOException {
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
return p.add(builder.clear() return p.add(builder.clear()
@ -1929,6 +1976,92 @@ public class MetaTableAccessor {
.build()); .build());
} }
public static final class ReplicationBarrierResult {
private final long[] barriers;
private final RegionState.State state;
private final List<byte[]> parentRegionNames;
public ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
this.barriers = barriers;
this.state = state;
this.parentRegionNames = parentRegionNames;
}
public long[] getBarriers() {
return barriers;
}
public RegionState.State getState() {
return state;
}
public List<byte[]> getParentRegionNames() {
return parentRegionNames;
}
}
private static long getReplicationBarrier(Cell c) {
return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
private static long[] getReplicationBarriers(Result result) {
return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
.stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
}
private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
long[] barriers = getReplicationBarriers(result);
byte[] stateBytes = result.getValue(getCatalogFamily(), getRegionStateColumn());
RegionState.State state =
stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
byte[] parentRegionsBytes =
result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
List<byte[]> parentRegionNames =
parentRegionsBytes != null
? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
.map(Bytes::toBytes).collect(Collectors.toList())
: Collections.emptyList();
return new ReplicationBarrierResult(barriers, state, parentRegionNames);
}
public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
byte[] metaStopKey =
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
.addColumn(getCatalogFamily(), getRegionStateColumn())
.addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
.setCaching(10);
try (Table table = getMetaHTable(conn); ResultScanner scanner = table.getScanner(scan)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
}
byte[] regionName = result.getRow();
// TODO: we may look up a region which has already been split or merged so we need to check
// whether the encoded name matches. Need to find a way to quit earlier when there is no
// record for the given region, for now it will scan to the end of the table.
if (!Bytes.equals(encodedRegionName,
Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) {
continue;
}
return getReplicationBarrierResult(result);
}
}
}
public static long[] getReplicationBarrier(Connection conn, byte[] regionName)
throws IOException {
try (Table table = getMetaHTable(conn)) {
Result result = table.get(new Get(regionName)
.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
.readAllVersions());
return getReplicationBarriers(result);
}
}
private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException { private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
if (!METALOG.isDebugEnabled()) { if (!METALOG.isDebugEnabled()) {
return; return;

View File

@ -231,6 +231,11 @@ public interface TableDescriptor {
*/ */
boolean hasRegionMemStoreReplication(); boolean hasRegionMemStoreReplication();
/**
* @return true if there are at least one cf whose replication scope is serial.
*/
boolean hasSerialReplicationScope();
/** /**
* Check if the compaction enable flag of the table is true. If flag is false * Check if the compaction enable flag of the table is true. If flag is false
* then no minor/major compactions will be done in real. * then no minor/major compactions will be done in real.
@ -279,7 +284,8 @@ public interface TableDescriptor {
boolean hasDisabled = false; boolean hasDisabled = false;
for (ColumnFamilyDescriptor cf : getColumnFamilies()) { for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL &&
cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
hasDisabled = true; hasDisabled = true;
} else { } else {
hasEnabled = true; hasEnabled = true;

View File

@ -1053,6 +1053,15 @@ public class TableDescriptorBuilder {
return families.values().toArray(new ColumnFamilyDescriptor[families.size()]); return families.values().toArray(new ColumnFamilyDescriptor[families.size()]);
} }
/**
* Return true if there are at least one cf whose replication scope is serial.
*/
@Override
public boolean hasSerialReplicationScope() {
return families.values().stream()
.anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
}
/** /**
* Returns the configured replicas per region * Returns the configured replicas per region
*/ */

View File

@ -487,6 +487,12 @@ public final class HConstants {
/** The serialized table state qualifier */ /** The serialized table state qualifier */
public static final byte[] TABLE_STATE_QUALIFIER = Bytes.toBytes("state"); public static final byte[] TABLE_STATE_QUALIFIER = Bytes.toBytes("state");
/** The replication barrier family as a string*/
public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
/** The replication barrier family */
public static final byte[] REPLICATION_BARRIER_FAMILY =
Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
/** /**
* The meta table version column qualifier. * The meta table version column qualifier.
@ -640,6 +646,12 @@ public final class HConstants {
*/ */
public static final int REPLICATION_SCOPE_GLOBAL = 1; public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
* Scope tag for serially scoped data
* This data will be replicated to all peers by the order of sequence id.
*/
public static final int REPLICATION_SCOPE_SERIAL = 2;
/** /**
* Default cluster ID, cannot be used to identify a cluster so a key with * Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication. * this value means it wasn't meant for replication.

View File

@ -208,7 +208,16 @@ public class MasterFileSystem {
/** /**
* @return HBase root log dir. * @return HBase root log dir.
*/ */
public Path getWALRootDir() { return this.walRootDir; } public Path getWALRootDir() {
return this.walRootDir;
}
/**
* @return the directory for a give {@code region}.
*/
public Path getRegionDir(RegionInfo region) {
return FSUtils.getRegionDir(FSUtils.getTableDir(getRootDir(), region.getTable()), region);
}
/** /**
* @return HBase temp dir. * @return HBase temp dir.

View File

@ -1569,8 +1569,7 @@ public class AssignmentManager implements ServerListener {
} }
public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
final RegionInfo daughterA, final RegionInfo daughterB) final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
throws IOException {
// Update hbase:meta. Parent will be marked offline and split up in hbase:meta. // Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
// The parent stays in regionStates until cleared when removed by CatalogJanitor. // The parent stays in regionStates until cleared when removed by CatalogJanitor.
// Update its state in regionStates to it shows as offline and split when read // Update its state in regionStates to it shows as offline and split when read

View File

@ -1,5 +1,4 @@
/** /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.master.assignment; package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException; import java.io.IOException;
@ -36,11 +34,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
@ -163,6 +163,11 @@ public class RegionStateStore {
Preconditions.checkArgument(state == State.OPEN && regionLocation != null, Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
"Open region should be on a server"); "Open region should be on a server");
MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId); MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
// only update replication barrier for default replica
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
hasSerialReplicationScope(regionInfo.getTable())) {
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
}
info.append(", openSeqNum=").append(openSeqNum); info.append(", openSeqNum=").append(openSeqNum);
info.append(", regionLocation=").append(regionLocation); info.append(", regionLocation=").append(regionLocation);
} else if (regionLocation != null && !regionLocation.equals(lastHost)) { } else if (regionLocation != null && !regionLocation.equals(lastHost)) {
@ -205,24 +210,41 @@ public class RegionStateStore {
} }
} }
private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
MasterFileSystem mfs = master.getMasterFileSystem();
long maxSeqId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
}
// ============================================================================================ // ============================================================================================
// Update Region Splitting State helpers // Update Region Splitting State helpers
// ============================================================================================ // ============================================================================================
public void splitRegion(final RegionInfo parent, final RegionInfo hriA, public void splitRegion(RegionInfo parent, RegionInfo hriA, RegionInfo hriB,
final RegionInfo hriB, final ServerName serverName) throws IOException { ServerName serverName) throws IOException {
final TableDescriptor htd = getTableDescriptor(parent.getTable()); TableDescriptor htd = getTableDescriptor(parent.getTable());
MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName, long parentOpenSeqNum = HConstants.NO_SEQNUM;
getRegionReplication(htd)); if (htd.hasSerialReplicationScope()) {
parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
}
MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
serverName, getRegionReplication(htd));
} }
// ============================================================================================ // ============================================================================================
// Update Region Merging State helpers // Update Region Merging State helpers
// ============================================================================================ // ============================================================================================
public void mergeRegions(final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB, public void mergeRegions(RegionInfo child, RegionInfo hriA, RegionInfo hriB,
final ServerName serverName) throws IOException { ServerName serverName) throws IOException {
final TableDescriptor htd = getTableDescriptor(parent.getTable()); TableDescriptor htd = getTableDescriptor(child.getTable());
MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName, long regionAOpenSeqNum = -1L;
getRegionReplication(htd)); long regionBOpenSeqNum = -1L;
if (htd.hasSerialReplicationScope()) {
regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
}
MetaTableAccessor.mergeRegions(master.getConnection(), child, hriA, regionAOpenSeqNum, hriB,
regionBOpenSeqNum, serverName, getRegionReplication(htd));
} }
// ============================================================================================ // ============================================================================================
@ -239,11 +261,19 @@ public class RegionStateStore {
// ========================================================================== // ==========================================================================
// Table Descriptors helpers // Table Descriptors helpers
// ========================================================================== // ==========================================================================
private int getRegionReplication(final TableDescriptor htd) { private boolean hasSerialReplicationScope(TableName tableName) throws IOException {
return (htd != null) ? htd.getRegionReplication() : 1; return hasSerialReplicationScope(getTableDescriptor(tableName));
} }
private TableDescriptor getTableDescriptor(final TableName tableName) throws IOException { private boolean hasSerialReplicationScope(TableDescriptor htd) {
return htd != null ? htd.hasSerialReplicationScope() : false;
}
private int getRegionReplication(TableDescriptor htd) {
return htd != null ? htd.getRegionReplication() : 1;
}
private TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
return master.getTableDescriptors().get(tableName); return master.getTableDescriptors().get(tableName);
} }

View File

@ -247,7 +247,7 @@ public class SplitTableRegionProcedure
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META); setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
break; break;
case SPLIT_TABLE_REGION_UPDATE_META: case SPLIT_TABLE_REGION_UPDATE_META:
updateMetaForDaughterRegions(env); updateMeta(env);
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META); setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META);
break; break;
case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META:
@ -756,7 +756,7 @@ public class SplitTableRegionProcedure
* Add daughter regions to META * Add daughter regions to META
* @param env MasterProcedureEnv * @param env MasterProcedureEnv
*/ */
private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException { private void updateMeta(final MasterProcedureEnv env) throws IOException {
env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env), env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env),
daughter_1_RI, daughter_2_RI); daughter_1_RI, daughter_2_RI);
} }

View File

@ -24,11 +24,8 @@ import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -120,8 +117,6 @@ public abstract class AbstractStateMachineTableProcedure<TState>
} }
protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException { protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException {
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); return env.getMasterServices().getMasterFileSystem().getRegionDir(region);
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), getTableName());
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
} }
} }

View File

@ -1,5 +1,4 @@
/** /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -25,6 +23,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -84,6 +83,7 @@ public class HRegionFileSystem {
private final Configuration conf; private final Configuration conf;
private final Path tableDir; private final Path tableDir;
private final FileSystem fs; private final FileSystem fs;
private final Path regionDir;
/** /**
* In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the * In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the
@ -105,9 +105,10 @@ public class HRegionFileSystem {
final RegionInfo regionInfo) { final RegionInfo regionInfo) {
this.fs = fs; this.fs = fs;
this.conf = conf; this.conf = conf;
this.tableDir = tableDir; this.tableDir = Objects.requireNonNull(tableDir, "tableDir is null");
this.regionInfo = regionInfo; this.regionInfo = Objects.requireNonNull(regionInfo, "regionInfo is null");
this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo); this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo);
this.regionDir = FSUtils.getRegionDir(tableDir, regionInfo);
this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
@ -135,7 +136,7 @@ public class HRegionFileSystem {
/** @return {@link Path} to the region directory. */ /** @return {@link Path} to the region directory. */
public Path getRegionDir() { public Path getRegionDir() {
return new Path(this.tableDir, this.regionInfoForFs.getEncodedName()); return regionDir;
} }
// =========================================================================== // ===========================================================================

View File

@ -21,16 +21,13 @@ package org.apache.hadoop.hbase.replication;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
/** /**
* Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config, * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
@ -47,7 +44,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
@InterfaceAudience.Private @InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
private static final Logger LOG = LoggerFactory.getLogger(NamespaceTableCfWALEntryFilter.class);
private final ReplicationPeer peer; private final ReplicationPeer peer;
private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();

View File

@ -15,17 +15,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.hbase.replication; package org.apache.hadoop.hbase.replication;
import java.util.NavigableMap; import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate; import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
@ -35,7 +33,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter { public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
@Override @Override
public Entry filter(Entry entry) { public Entry filter(Entry entry) {

View File

@ -194,4 +194,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
public ServerName getServerWALsBelongTo() { public ServerName getServerWALsBelongTo() {
return this.replicationQueueInfo.getDeadRegionServers().get(0); return this.replicationQueueInfo.getDeadRegionServers().get(0);
} }
@Override
public boolean isRecovered() {
return true;
}
} }

View File

@ -1,5 +1,4 @@
/* /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -20,12 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -127,13 +124,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
return startPosition; return startPosition;
} }
@Override
protected void updateLogPosition(long lastReadPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
lastReadPosition, true);
lastLoggedPosition = lastReadPosition;
}
private void terminate(String reason, Exception cause) { private void terminate(String reason, Exception cause) {
if (cause == null) { if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason); LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader { public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class); LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
@ -45,13 +46,11 @@ public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALRea
} }
@Override @Override
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath) protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL"); LOG.trace("Didn't read any new entries from WAL");
// we're done with queue recovery, shut ourself down // we're done with queue recovery, shut ourself down
setReaderRunning(false); setReaderRunning(false);
// shuts down shipper thread immediately // shuts down shipper thread immediately
entryBatchQueue.put(batch != null ? batch entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath));
: new WALEntryBatch(replicationBatchCountCapacity, currentPath));
} }
} }

View File

@ -597,4 +597,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
public ServerName getServerWALsBelongTo() { public ServerName getServerWALsBelongTo() {
return server.getServerName(); return server.getServerName();
} }
Server getServer() {
return server;
}
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
} }

View File

@ -166,4 +166,11 @@ public interface ReplicationSourceInterface {
* @return the server name which all WALs belong to * @return the server name which all WALs belong to
*/ */
ServerName getServerWALsBelongTo(); ServerName getServerWALsBelongTo();
/**
* @return whether this is a replication source for recovery.
*/
default boolean isRecovered() {
return false;
}
} }

View File

@ -481,10 +481,10 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region server * @param queueRecovered indicates if this queue comes from another region server
*/ */
public void logPositionAndCleanOldLogs(Path log, String queueId, long position, public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
boolean queueRecovered) { Map<String, Long> lastSeqIds, boolean queueRecovered) {
String fileName = log.getName(); String fileName = log.getName();
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
position, null)); position, lastSeqIds));
cleanOldLogs(fileName, queueId, queueRecovered); cleanOldLogs(fileName, queueId, queueRecovered);
} }

View File

@ -1,5 +1,4 @@
/* /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -20,13 +19,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -128,7 +127,7 @@ public class ReplicationSourceShipper extends Thread {
int sleepMultiplier = 0; int sleepMultiplier = 0;
if (entries.isEmpty()) { if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) { if (lastLoggedPosition != lastReadPosition) {
updateLogPosition(lastReadPosition); updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
// if there was nothing to ship and it's not an error // if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current // set "ageOfLastShippedOp" to <now> to indicate that we're current
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@ -174,7 +173,7 @@ public class ReplicationSourceShipper extends Thread {
cleanUpHFileRefs(entries.get(i).getEdit()); cleanUpHFileRefs(entries.get(i).getEdit());
} }
// Log and clean up WAL logs // Log and clean up WAL logs
updateLogPosition(lastReadPosition); updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
} }
source.postShipEdits(entries, currentSize); source.postShipEdits(entries, currentSize);
@ -222,9 +221,9 @@ public class ReplicationSourceShipper extends Thread {
} }
} }
protected void updateLogPosition(long lastReadPosition) { private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
lastReadPosition, false); lastReadPosition, lastSeqIds, source.isRecovered());
lastLoggedPosition = lastReadPosition; lastLoggedPosition = lastReadPosition;
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
@ -31,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/** /**
* Used to receive new wals. * Used to receive new wals.
*/ */
@ -68,31 +65,25 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
* compaction WAL edits and if the scope is local. * compaction WAL edits and if the scope is local.
* @param logKey Key that may get scoped according to its edits * @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes * @param logEdit Edits used to lookup the scopes
* @throws IOException If failed to parse the WALEdit
*/ */
@VisibleForTesting @VisibleForTesting
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) {
boolean replicationForBulkLoadEnabled = // For bulk load replication we need meta family to know the file we want to replicate.
ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
boolean foundOtherEdits = false; return;
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
break;
} }
WALKeyImpl keyImpl = (WALKeyImpl) logKey;
// For serial replication we need to count all the sequence ids even for markers, so here we
// always need to retain the replication scopes to let the replication wal reader to know that
// we need serial replication. The ScopeWALEntryFilter will help filtering out the cell for
// WALEdit.METAFAMILY.
if (keyImpl.hasSerialReplicationScope()) {
return;
} }
// For replay, or if all the cells are markers, do not need to store replication scope.
if (!foundOtherEdits && logEdit.getCells().size() > 0) { if (logEdit.isReplay() ||
WALProtos.RegionEventDescriptor maybeEvent = logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); keyImpl.clearReplicationScope();
if (maybeEvent != null &&
(maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serially replication, we use scopes when reading close marker.
foundOtherEdits = true;
}
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
((WALKeyImpl) logKey).serializeReplicationScope(false);
} }
} }
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/** /**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
* * onto a queue
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
@ -77,6 +77,8 @@ public class ReplicationSourceWALReader extends Thread {
private AtomicLong totalBufferUsed; private AtomicLong totalBufferUsed;
private long totalBufferQuota; private long totalBufferQuota;
private final SerialReplicationChecker serialReplicationChecker;
/** /**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue. * entries, and puts them on a batch queue.
@ -111,6 +113,7 @@ public class ReplicationSourceWALReader extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.serialReplicationChecker = new SerialReplicationChecker(conf, source);
LOG.info("peerClusterZnode=" + source.getQueueId() LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@ -131,15 +134,14 @@ public class ReplicationSourceWALReader extends Thread {
continue; continue;
} }
WALEntryBatch batch = readWALEntries(entryStream); WALEntryBatch batch = readWALEntries(entryStream);
if (batch != null && batch.getNbEntries() > 0) { if (batch != null) {
if (LOG.isTraceEnabled()) { // need to propagate the batch even it has no entries since it may carry the last
LOG.trace(String.format("Read %s WAL entries eligible for replication", // sequence id information for serial replication.
batch.getNbEntries())); LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
}
entryBatchQueue.put(batch); entryBatchQueue.put(batch);
sleepMultiplier = 1; sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL } else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath()); handleEmptyWALEntryBatch(entryStream.getCurrentPath());
} }
currentPosition = entryStream.getPosition(); currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream entryStream.reset(); // reuse stream
@ -160,34 +162,66 @@ public class ReplicationSourceWALReader extends Thread {
} }
} }
private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException { private WALEntryBatch readWALEntries(WALEntryStream entryStream)
WALEntryBatch batch = null; throws IOException, InterruptedException {
while (entryStream.hasNext()) { if (!entryStream.hasNext()) {
if (batch == null) { return null;
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); }
WALEntryBatch batch =
new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
do {
Entry entry = entryStream.peek();
batch.setLastWalPosition(entryStream.getPosition());
boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
// Used to locate the region record in meta table. In WAL we only have the table name and
// encoded region name which can not be mapping to region name without scanning all the
// records for a table, so we need a start key, just like what we have done at client side
// when locating a region. For the markers, we will use the start key of the region as the row
// key for the edit. And we need to do this before filtering since all the cells may be
// filtered out, especially that for the markers.
Cell firstCellInEdit = null;
if (hasSerialReplicationScope) {
assert !entry.getEdit().isEmpty() : "should not write empty edits";
firstCellInEdit = entry.getEdit().getCells().get(0);
} }
Entry entry = entryStream.next();
entry = filterEntry(entry); entry = filterEntry(entry);
if (entry != null) { if (entry != null) {
if (hasSerialReplicationScope) {
if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) {
if (batch.getNbEntries() > 0) {
// we have something that can push, break
break;
} else {
serialReplicationChecker.waitUntilCanPush(entry, firstCellInEdit);
}
}
// arrive here means we can push the entry, record the last sequence id
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
entry.getKey().getSequenceId());
}
// actually remove the entry.
entryStream.next();
WALEdit edit = entry.getEdit(); WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) { if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry); long entrySize = getEntrySize(entry);
batch.addEntry(entry); batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySize); boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big // Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
|| batch.getNbEntries() >= replicationBatchCountCapacity) { batch.getNbEntries() >= replicationBatchCountCapacity) {
break; break;
} }
} }
} else {
// actually remove the entry.
entryStream.next();
} }
} } while (entryStream.hasNext());
return batch; return batch;
} }
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath) protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL"); LOG.trace("Didn't read any new entries from WAL");
Thread.sleep(sleepForRetries); Thread.sleep(sleepForRetries);
} }
@ -214,7 +248,7 @@ public class ReplicationSourceWALReader extends Thread {
// if we've read some WAL entries, get the Path we read from // if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek(); WALEntryBatch batchQueueHead = entryBatchQueue.peek();
if (batchQueueHead != null) { if (batchQueueHead != null) {
return batchQueueHead.lastWalPath; return batchQueueHead.getLastWalPath();
} }
// otherwise, we must be currently reading from the head of the log queue // otherwise, we must be currently reading from the head of the log queue
return logQueue.peek(); return logQueue.peek();
@ -253,16 +287,13 @@ public class ReplicationSourceWALReader extends Thread {
return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
} }
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) { private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
WALEdit edit = entry.getEdit(); WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
batch.incrementHeapSize(entrySize); batch.incrementHeapSize(entrySize);
Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
} }
batch.lastWalPosition = entryPosition;
}
/** /**
* Count the number of different row keys in the given edit because of mini-batching. We assume * Count the number of different row keys in the given edit because of mini-batching. We assume
@ -355,101 +386,4 @@ public class ReplicationSourceWALReader extends Thread {
public void setReaderRunning(boolean readerRunning) { public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning; this.isReaderRunning = readerRunning;
} }
/**
* Holds a batch of WAL entries to replicate, along with some statistics
*
*/
static class WALEntryBatch {
private List<Entry> walEntries;
// last WAL that was read
private Path lastWalPath;
// position in WAL of last entry in this batch
private long lastWalPosition = 0;
// number of distinct row keys in this batch
private int nbRowKeys = 0;
// number of HFiles
private int nbHFiles = 0;
// heap size of data we need to replicate
private long heapSize = 0;
/**
* @param walEntries
* @param lastWalPath Path of the WAL the last entry in this batch was read from
* @param lastWalPosition Position in the WAL the last entry in this batch was read from
*/
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
this.walEntries = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}
public void addEntry(Entry entry) {
walEntries.add(entry);
}
/**
* @return the WAL Entries.
*/
public List<Entry> getWalEntries() {
return walEntries;
}
/**
* @return the path of the last WAL that was read.
*/
public Path getLastWalPath() {
return lastWalPath;
}
/**
* @return the position in the last WAL that was read.
*/
public long getLastWalPosition() {
return lastWalPosition;
}
public int getNbEntries() {
return walEntries.size();
}
/**
* @return the number of distinct row keys in this batch
*/
public int getNbRowKeys() {
return nbRowKeys;
}
/**
* @return the number of HFiles in this batch
*/
public int getNbHFiles() {
return nbHFiles;
}
/**
* @return total number of operations in this batch
*/
public int getNbOperations() {
return getNbRowKeys() + getNbHFiles();
}
/**
* @return the heap size of this batch
*/
public long getHeapSize() {
return heapSize;
}
private void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
private void incrementNbHFiles(int increment) {
nbHFiles += increment;
}
private void incrementHeapSize(long increment) {
heapSize += increment;
}
}
} }

View File

@ -0,0 +1,255 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.ReplicationBarrierResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* <p>
* Helper class to determine whether we can push a given WAL entry without breaking the replication
* order. The class is designed to per {@link ReplicationSourceWALReader}, so not thread safe.
* </p>
* <p>
* We record all the open sequence number for a region in a special family in meta, which is called
* 'barrier', so there will be a sequence of open sequence number (b1, b2, b3, ...). We call [bn,
* bn+1) a range, and it is obvious that a region will always be on the same RS within a range.
* <p>
* When split and merge, we will also record the parent for the generated region(s) in the special
* family in meta. And also, we will write an extra 'open sequence number' for the parent region(s),
* which is the max sequence id of the region plus one.
* </p>
* </p>
* <p>
* For each peer, we record the last pushed sequence id for each region. It is managed by the
* replication storage.
* </p>
* <p>
* The algorithm works like this:
* <ol>
* <li>Locate the sequence id we want to push in the barriers</li>
* <li>If it is before the first barrier, we are safe to push. This usually because we enable serial
* replication for this table after we create the table and write data into the table.</li>
* <li>In general, if the previous range is finished, then we are safe to push. The way to determine
* whether a range is finish is straight-forward: check whether the last pushed sequence id is equal
* to the end barrier of the range minus 1. There are several exceptions:
* <ul>
* <li>If it is in the first range, we need to check whether there are parent regions. If so, we
* need to make sure that the data for parent regions have all been pushed.</li>
* <li>If it is in the last range, we need to check the region state. If state is OPENING, then we
* are not safe to push. This is because that, before we call reportRIT to master which update the
* open sequence number into meta table, we will write a open region event marker to WAL first, and
* its sequence id is greater than the newest open sequence number(which has not been updated to
* meta table yet so we do not know). For this scenario, the WAL entry for this open region event
* marker actually belongs to the range after the 'last' range, so we are not safe to push it.
* Otherwise the last pushed sequence id will be updated to this value and then we think the
* previous range has already been finished, but this is not true.</li>
* <li>Notice that the above two exceptions are not conflicts, since the first range can also be the
* last range if we only have one range.</li>
* </ul>
* </li>
* </ol>
* </p>
* <p>
* And for performance reason, we do not want to check meta for every WAL entry, so we introduce two
* in memory maps. The idea is simple:
* <ul>
* <li>If a range can be pushed, then put its end barrier into the {@code canPushUnder} map.</li>
* <li>Before accessing meta, first check the sequence id stored in the {@code canPushUnder} map. If
* the sequence id of WAL entry is less the one stored in {@code canPushUnder} map, then we are safe
* to push.</li>
* </ul>
* And for the last range, we do not have an end barrier, so we use the continuity of sequence id to
* determine whether we can push. The rule is:
* <ul>
* <li>When an entry is able to push, then put its sequence id into the {@code pushed} map.</li>
* <li>Check if the sequence id of WAL entry equals to the one stored in the {@code pushed} map plus
* one. If so, we are safe to push, and also update the {@code pushed} map with the sequence id of
* the WAL entry.</li>
* </ul>
* </p>
*/
@InterfaceAudience.Private
class SerialReplicationChecker {
public static final String REPLICATION_SERIALLY_WAITING_KEY =
"hbase.serial.replication.waiting.ms";
public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
private final String peerId;
private final ReplicationQueueStorage storage;
private final Connection conn;
private final long waitTimeMs;
private final LoadingCache<String, MutableLong> pushed = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, MutableLong>() {
@Override
public MutableLong load(String key) throws Exception {
return new MutableLong(HConstants.NO_SEQNUM);
}
});
// Use guava cache to set ttl for each key
private final Cache<String, Long> canPushUnder =
CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
this.peerId = source.getPeerId();
this.storage = source.getQueueStorage();
this.conn = source.getServer().getConnection();
this.waitTimeMs =
conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
}
private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException {
long pushedSeqId;
try {
pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId);
} catch (ReplicationException e) {
throw new IOException(
"Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
}
// endBarrier is the open sequence number. When opening a region, the open sequence number will
// be set to the old max sequence id plus one, so here we need to minus one.
return pushedSeqId >= endBarrier - 1;
}
private boolean isParentFinished(byte[] regionName) throws IOException {
long[] barriers = MetaTableAccessor.getReplicationBarrier(conn, regionName);
if (barriers.length == 0) {
return true;
}
return isRangeFinished(barriers[barriers.length - 1], RegionInfo.encodeRegionName(regionName));
}
// We may write a open region marker to WAL before we write the open sequence number to meta, so
// if a region is in OPENING state and we are in the last range, it is not safe to say we can push
// even if the previous range is finished.
private boolean isLastRangeAndOpening(ReplicationBarrierResult barrierResult, int index) {
return index == barrierResult.getBarriers().length &&
barrierResult.getState() == RegionState.State.OPENING;
}
private void recordCanPush(String encodedNameAsString, long seqId, long[] barriers, int index) {
if (barriers.length > index) {
canPushUnder.put(encodedNameAsString, barriers[index]);
}
pushed.getUnchecked(encodedNameAsString).setValue(seqId);
}
private boolean canPush(Entry entry, byte[] row) throws IOException {
String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
long seqId = entry.getKey().getSequenceId();
ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
long[] barriers = barrierResult.getBarriers();
int index = Arrays.binarySearch(barriers, seqId);
if (index == -1) {
// This means we are in the range before the first record openSeqNum, this usually because the
// wal is written before we enable serial replication for this table, just return true since
// we can not guarantee the order.
pushed.getUnchecked(encodedNameAsString).setValue(seqId);
return true;
}
// The sequence id range is left closed and right open, so either we decrease the missed insert
// point to make the index start from 0, or increase the hit insert point to make the index
// start from 1. Here we choose the latter one.
if (index < 0) {
index = -index - 1;
} else {
index++;
}
if (index == 1) {
// we are in the first range, check whether we have parents
for (byte[] regionName : barrierResult.getParentRegionNames()) {
if (!isParentFinished(regionName)) {
return false;
}
}
if (isLastRangeAndOpening(barrierResult, index)) {
return false;
}
recordCanPush(encodedNameAsString, seqId, barriers, 1);
return true;
}
// check whether the previous range is finished
if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
return false;
}
if (isLastRangeAndOpening(barrierResult, index)) {
return false;
}
recordCanPush(encodedNameAsString, seqId, barriers, index);
return true;
}
public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException {
String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
long seqId = entry.getKey().getSequenceId();
Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
if (canReplicateUnderSeqId != null) {
if (seqId < canReplicateUnderSeqId.longValue()) {
return true;
}
// we are already beyond the last safe point, remove
canPushUnder.invalidate(encodedNameAsString);
}
// This is for the case where the region is currently opened on us, if the sequence id is
// continuous then we are safe to replicate. If there is a breakpoint, then maybe the region
// has been moved to another RS and then back, so we need to check the barrier.
MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
if (seqId == previousPushedSeqId.longValue() + 1) {
previousPushedSeqId.increment();
return true;
}
return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
}
public void waitUntilCanPush(Entry entry, Cell firstCellInEdit)
throws IOException, InterruptedException {
byte[] row = CellUtil.cloneRow(firstCellInEdit);
while (!canPush(entry, row)) {
Thread.sleep(waitTimeMs);
}
}
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Holds a batch of WAL entries to replicate, along with some statistics
*/
@InterfaceAudience.Private
class WALEntryBatch {
private List<Entry> walEntries;
// last WAL that was read
private Path lastWalPath;
// position in WAL of last entry in this batch
private long lastWalPosition = 0;
// number of distinct row keys in this batch
private int nbRowKeys = 0;
// number of HFiles
private int nbHFiles = 0;
// heap size of data we need to replicate
private long heapSize = 0;
// save the last sequenceid for each region if the table has serial-replication scope
private Map<String, Long> lastSeqIds = new HashMap<>();
/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
*/
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
this.walEntries = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}
public void addEntry(Entry entry) {
walEntries.add(entry);
}
/**
* @return the WAL Entries.
*/
public List<Entry> getWalEntries() {
return walEntries;
}
/**
* @return the path of the last WAL that was read.
*/
public Path getLastWalPath() {
return lastWalPath;
}
/**
* @return the position in the last WAL that was read.
*/
public long getLastWalPosition() {
return lastWalPosition;
}
public void setLastWalPosition(long lastWalPosition) {
this.lastWalPosition = lastWalPosition;
}
public int getNbEntries() {
return walEntries.size();
}
/**
* @return the number of distinct row keys in this batch
*/
public int getNbRowKeys() {
return nbRowKeys;
}
/**
* @return the number of HFiles in this batch
*/
public int getNbHFiles() {
return nbHFiles;
}
/**
* @return total number of operations in this batch
*/
public int getNbOperations() {
return getNbRowKeys() + getNbHFiles();
}
/**
* @return the heap size of this batch
*/
public long getHeapSize() {
return heapSize;
}
/**
* @return the last sequenceid for each region if the table has serial-replication scope
*/
public Map<String, Long> getLastSeqIds() {
return lastSeqIds;
}
public void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
public void incrementNbHFiles(int increment) {
nbHFiles += increment;
}
public void incrementHeapSize(long increment) {
heapSize += increment;
}
public void setLastSeqId(String region, long sequenceId) {
lastSeqIds.put(region, sequenceId);
}
}

View File

@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable; import java.io.Closeable;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
@ -102,16 +99,18 @@ class WALEntryStream implements Closeable {
} }
/** /**
* @return the next WAL entry in this stream * Returns the next WAL entry in this stream but does not advance.
* @throws IOException */
* @throws NoSuchElementException if no more entries in the stream. public Entry peek() throws IOException {
return hasNext() ? currentEntry: null;
}
/**
* Returns the next WAL entry in this stream and advance the stream.
*/ */
public Entry next() throws IOException { public Entry next() throws IOException {
if (!hasNext()) { Entry save = peek();
throw new NoSuchElementException(); currentEntry = null;
}
Entry save = currentEntry;
currentEntry = null; // gets reloaded by hasNext()
return save; return save;
} }

View File

@ -168,6 +168,14 @@ public class FSTableDescriptors implements TableDescriptors {
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE) .setBloomFilterType(BloomType.NONE)
.build()) .build())
.addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
.setMaxVersions(HConstants.ALL_VERSIONS)
.setInMemory(true)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
.build())
.addCoprocessor("org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint", .addCoprocessor("org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint",
null, Coprocessor.PRIORITY_SYSTEM, null); null, Coprocessor.PRIORITY_SYSTEM, null);
} }

View File

@ -18,13 +18,7 @@
*/ */
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import edu.umd.cs.findbugs.annotations.CheckForNull; import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.EOFException; import java.io.EOFException;
@ -54,7 +48,6 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -71,9 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HFileLink;
@ -81,8 +72,6 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
@ -94,6 +83,17 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
/** /**
* Utility methods for interacting with the underlying file system. * Utility methods for interacting with the underlying file system.
@ -1028,6 +1028,10 @@ public abstract class FSUtils extends CommonFSUtils {
return regionDirs; return regionDirs;
} }
public static Path getRegionDir(Path tableDir, RegionInfo region) {
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
}
/** /**
* Filter for all dirs that are legal column family names. This is generally used for colfam * Filter for all dirs that are legal column family names. This is generally used for colfam
* dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;. * dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.

View File

@ -415,10 +415,16 @@ public class WALKeyImpl implements WALKey {
this.replicationScope = replicationScope; this.replicationScope = replicationScope;
} }
public void serializeReplicationScope(boolean serialize) { public void clearReplicationScope() {
if (!serialize) {
setReplicationScope(null); setReplicationScope(null);
} }
public boolean hasSerialReplicationScope() {
if (replicationScope == null || replicationScope.isEmpty()) {
return false;
}
return replicationScope.values().stream()
.anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
} }
/** /**

View File

@ -494,7 +494,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent); List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1); assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2); assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@ -535,7 +535,8 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parentA, parentB); List<RegionInfo> regionInfos = Lists.newArrayList(parentA, parentB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3); MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L, serverName0,
3);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2); assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@ -682,8 +683,8 @@ public class TestMetaTableAccessor {
EnvironmentEdgeManager.injectEdge(edge); EnvironmentEdgeManager.injectEdge(edge);
try { try {
// now merge the regions, effectively deleting the rows for region a and b. // now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, regionInfoB, sn, MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, -1L, regionInfoB,
1); -1L, sn, 1);
} finally { } finally {
EnvironmentEdgeManager.reset(); EnvironmentEdgeManager.reset();
} }
@ -776,7 +777,8 @@ public class TestMetaTableAccessor {
} }
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls; long prevCalls = scheduler.numPriorityCalls;
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, loc.getServerName(),
1);
assertTrue(prevCalls < scheduler.numPriorityCalls); assertTrue(prevCalls < scheduler.numPriorityCalls);
} }
@ -813,7 +815,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent); List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
Get get1 = new Get(splitA.getRegionName()); Get get1 = new Get(splitA.getRegionName());
Result resultA = meta.get(get1); Result resultA = meta.get(get1);
Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY, Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,

View File

@ -194,7 +194,7 @@ public class TestHRegionFileSystem {
@Test @Test
public void testOnDiskRegionCreation() throws IOException { public void testOnDiskRegionCreation() throws IOException {
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation"); Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
@ -226,7 +226,7 @@ public class TestHRegionFileSystem {
@Test @Test
public void testNonIdempotentOpsWithRetries() throws IOException { public void testNonIdempotentOpsWithRetries() throws IOException {
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation"); Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem(); FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration(); Configuration conf = TEST_UTIL.getConfiguration();
@ -235,19 +235,15 @@ public class TestHRegionFileSystem {
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri); HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
assertTrue(fs.exists(regionFs.getRegionDir())); assertTrue(fs.exists(regionFs.getRegionDir()));
regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), rootDir, hri);
null, null);
// HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir,
// hri);
boolean result = regionFs.createDir(new Path("/foo/bar")); boolean result = regionFs.createDir(new Path("/foo/bar"));
assertTrue("Couldn't create the directory", result); assertTrue("Couldn't create the directory", result);
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2")); result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
assertTrue("Couldn't rename the directory", result); assertTrue("Couldn't rename the directory", result);
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null); regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
result = regionFs.deleteDir(new Path("/foo/bar")); result = regionFs.deleteDir(new Path("/foo/bar"));
assertTrue("Couldn't delete the directory", result); assertTrue("Couldn't delete the directory", result);
fs.delete(rootDir, true); fs.delete(rootDir, true);

View File

@ -348,7 +348,7 @@ public class TestRegionServerMetrics {
TEST_UTIL.getAdmin().flush(tableName); TEST_UTIL.getAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute(); metricsRegionServer.getRegionServerWrapper().forceRecompute();
assertGauge("storeCount", TABLES_ON_MASTER? 1: 4); assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 5);
assertGauge("storeFileCount", 1); assertGauge("storeFileCount", 1);
} }

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before; import org.junit.Before;
@ -47,7 +46,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@Category(LargeTests.class) @Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase { public class TestReplicationDroppedTables extends TestReplicationBase {
@ClassRule @ClassRule
@ -56,9 +55,6 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class); private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
/**
* @throws java.lang.Exception
*/
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs, // Starting and stopping replication can make us miss new logs,

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSerialReplication {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSerialReplication.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID = "1";
private static byte[] CF = Bytes.toBytes("CF");
private static byte[] CQ = Bytes.toBytes("CQ");
private static FileSystem FS;
private static Path LOG_DIR;
private static WALProvider.Writer WRITER;
public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
private static final UUID PEER_UUID = UUID.randomUUID();
@Override
public UUID getPeerUUID() {
return PEER_UUID;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
WRITER.append(entry);
}
WRITER.sync();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return true;
}
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
notifyStopped();
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
UTIL.startMiniCluster(3);
LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
FS = UTIL.getTestFileSystem();
FS.mkdirs(LOG_DIR);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Rule
public final TestName name = new TestName();
private Path logPath;
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
UTIL.ensureSomeRegionServersAvailable(3);
logPath = new Path(LOG_DIR, name.getMethodName());
WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
// add in disable state, so later when enabling it all sources will start push together.
UTIL.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
false);
}
@After
public void tearDown() throws IOException {
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
if (WRITER != null) {
WRITER.close();
WRITER = null;
}
}
@Test
public void testRegionMove() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(rs.getServerName().getServerName()));
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !rs.getRegions(tableName).isEmpty();
}
@Override
public String explainFailure() throws Exception {
return region + " is still not on " + rs;
}
});
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 100; i < 200; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
UTIL.getAdmin().enableReplicationPeer(PEER_ID);
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
int count = 0;
while (reader.next() != null) {
count++;
}
return count >= 200;
} catch (IOException e) {
return false;
}
}
@Override
public String explainFailure() throws Exception {
return "Not enough entries replicated";
}
});
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
long seqId = -1L;
int count = 0;
for (Entry entry;;) {
entry = reader.next();
if (entry == null) {
break;
}
assertTrue(
"Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
entry.getKey().getSequenceId() >= seqId);
count++;
}
assertEquals(200, count);
}
}
}

View File

@ -321,7 +321,7 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter(); wal.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false); "1", 0, null, false);
wal.append(hri, wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),

View File

@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestSerialReplicationChecker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID = "1";
private static ReplicationQueueStorage QUEUE_STORAGE;
private static String WAL_FILE_NAME = "test.wal";
private SerialReplicationChecker checker;
@Rule
public final TestName name = new TestName();
private TableName tableName;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(1);
QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
UTIL.getConfiguration());
QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID,
WAL_FILE_NAME);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws IOException {
ReplicationSource source = mock(ReplicationSource.class);
when(source.getPeerId()).thenReturn(PEER_ID);
when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
Server server = mock(Server.class);
when(server.getConnection()).thenReturn(UTIL.getConnection());
when(source.getServer()).thenReturn(server);
checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
tableName = TableName.valueOf(name.getMethodName());
}
private Entry createEntry(RegionInfo region, long seqId) {
WALKeyImpl key = mock(WALKeyImpl.class);
when(key.getTableName()).thenReturn(tableName);
when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
when(key.getSequenceId()).thenReturn(seqId);
Entry entry = mock(Entry.class);
when(entry.getKey()).thenReturn(key);
return entry;
}
private Cell createCell(RegionInfo region) {
return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
.setType(Type.Put).build();
}
@Test
public void testNoBarrierCanPush() throws IOException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
}
private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
Bytes.toBytes(state.name()));
for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
}
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);
}
}
private void setState(RegionInfo region, RegionState.State state) throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
Bytes.toBytes(state.name()));
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);
}
}
private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
}
@Test
public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
addStateAndBarrier(region, RegionState.State.OPEN, 10);
Cell cell = createCell(region);
// can push since we are in the first range
assertTrue(checker.canPush(createEntry(region, 100), cell));
setState(region, RegionState.State.OPENING);
// can not push since we are in the last range and the state is OPENING
assertFalse(checker.canPush(createEntry(region, 102), cell));
addStateAndBarrier(region, RegionState.State.OPEN, 50);
// can not push since the previous range has not been finished yet
assertFalse(checker.canPush(createEntry(region, 102), cell));
updatePushedSeqId(region, 49);
// can push since the previous range has been finished
assertTrue(checker.canPush(createEntry(region, 102), cell));
setState(region, RegionState.State.OPENING);
assertFalse(checker.canPush(createEntry(region, 104), cell));
}
}

View File

@ -21,13 +21,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
@ -40,13 +40,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -180,15 +180,12 @@ public class TestWALEntryStream {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception // There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext()); assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next(); WAL.Entry entry = entryStream.peek();
assertSame(entry, entryStream.next());
assertNotNull(entry); assertNotNull(entry);
assertFalse(entryStream.hasNext()); assertFalse(entryStream.hasNext());
try { assertNull(entryStream.peek());
entry = entryStream.next(); assertNull(entryStream.next());
fail();
} catch (NoSuchElementException e) {
// expected
}
oldPos = entryStream.getPosition(); oldPos = entryStream.getPosition();
} }
@ -346,10 +343,12 @@ public class TestWALEntryStream {
// start up a batcher // start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Server mockServer= Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class); ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf, ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source); walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek(); Path walPath = walQueue.peek();