HBASE-20115 Reimplement serial replication based on the new replication storage layer

This commit is contained in:
zhangduo 2018-03-05 16:47:03 +08:00
parent 1d11cdb26c
commit f29bf1d778
36 changed files with 1279 additions and 338 deletions

View File

@ -538,6 +538,14 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
.collect(Collectors.toList());
}
/**
* Return true if there are at least one cf whose replication scope is serial.
*/
@Override
public boolean hasSerialReplicationScope() {
return delegatee.hasSerialReplicationScope();
}
/**
* Returns the configured replicas per region
*/

View File

@ -34,6 +34,8 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.client.Connection;
@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@ -137,7 +140,7 @@ public class MetaTableAccessor {
private static final Logger LOG = LoggerFactory.getLogger(MetaTableAccessor.class);
private static final Logger METALOG = LoggerFactory.getLogger("org.apache.hadoop.hbase.META");
static final byte [] META_REGION_PREFIX;
private static final byte[] META_REGION_PREFIX;
static {
// Copy the prefix from FIRST_META_REGIONINFO into META_REGION_PREFIX.
// FIRST_META_REGIONINFO == 'hbase:meta,,1'. META_REGION_PREFIX == 'hbase:meta,'
@ -147,6 +150,11 @@ public class MetaTableAccessor {
META_REGION_PREFIX, 0, len);
}
private static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
private static final String REPLICATION_PARENT_SEPARATOR = "|";
private static final String REPLICATION_PARENT_SEPARATOR_REGEX = "\\|";
/**
* Lists all of the table regions currently in META.
* Deprecated, keep there until some test use this.
@ -838,7 +846,7 @@ public class MetaTableAccessor {
/**
* Returns the column qualifier for serialized region state
* @return HConstants.TABLE_STATE_QUALIFIER
* @return HConstants.STATE_QUALIFIER
*/
private static byte[] getRegionStateColumn() {
return HConstants.STATE_QUALIFIER;
@ -1266,7 +1274,6 @@ public class MetaTableAccessor {
////////////////////////
// Editing operations //
////////////////////////
/**
* Generates and returns a Put containing the region into for the catalog table
*/
@ -1438,7 +1445,7 @@ public class MetaTableAccessor {
* Adds daughter region infos to hbase:meta row for the specified region. Note that this does not
* add its daughter's as different rows, but adds information about the daughters in the same row
* as the parent. Use
* {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName,int)}
* {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
* if you want to do that.
* @param connection connection we're using
* @param regionInfo RegionInfo of parent region
@ -1464,7 +1471,7 @@ public class MetaTableAccessor {
* Adds a (single) hbase:meta row for the specified new region and its daughters. Note that this
* does not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
* {@link #splitRegion(Connection, RegionInfo, RegionInfo, RegionInfo, ServerName, int)}
* {@link #splitRegion(Connection, RegionInfo, long, RegionInfo, RegionInfo, ServerName, int)}
* if you want to do that.
* @param connection connection we're using
* @param regionInfo region information
@ -1519,20 +1526,37 @@ public class MetaTableAccessor {
}
/**
* Merge the two regions into one in an atomic operation. Deletes the two
* merging regions in hbase:meta and adds the merged region with the information of
* two merging regions.
* Merge the two regions into one in an atomic operation. Deletes the two merging regions in
* hbase:meta and adds the merged region with the information of two merging regions.
* @param connection connection we're using
* @param mergedRegion the merged region
* @param regionA merge parent region A
* @param regionAOpenSeqNum the next open sequence id for region A, used by serial replication. -1
* if not necessary.
* @param regionB merge parent region B
* @param regionBOpenSeqNum the next open sequence id for region B, used by serial replication. -1
* if not necessary.
* @param sn the location of the region
*/
public static void mergeRegions(final Connection connection, RegionInfo mergedRegion,
RegionInfo regionA, RegionInfo regionB, ServerName sn, int regionReplication)
throws IOException {
public static void mergeRegions(Connection connection, RegionInfo mergedRegion,
RegionInfo regionA, long regionAOpenSeqNum, RegionInfo regionB, long regionBOpenSeqNum,
ServerName sn, int regionReplication) throws IOException {
try (Table meta = getMetaHTable(connection)) {
long time = EnvironmentEdgeManager.currentTime();
List<Mutation> mutations = new ArrayList<>();
List<RegionInfo> replicationParents = new ArrayList<>(2);
// Deletes for merging regions
mutations.add(makeDeleteFromRegionInfo(regionA, time));
if (regionAOpenSeqNum > 0) {
mutations.add(makePutForReplicationBarrier(regionA, regionAOpenSeqNum, time));
replicationParents.add(regionA);
}
mutations.add(makeDeleteFromRegionInfo(regionB, time));
if (regionBOpenSeqNum > 0) {
mutations.add(makePutForReplicationBarrier(regionB, regionBOpenSeqNum, time));
replicationParents.add(regionB);
}
// Put for parent
Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
@ -1552,18 +1576,13 @@ public class MetaTableAccessor {
.setType(Type.Put)
.setValue(RegionInfo.toByteArray(regionB))
.build());
// Set initial state to CLOSED
// NOTE: If initial state is not set to CLOSED then merged region gets added with the
// default OFFLINE state. If Master gets restarted after this step, start up sequence of
// master tries to assign this offline region. This is followed by re-assignments of the
// merged region from resumed {@link MergeTableRegionsProcedure}
addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
// Deletes for merging regions
Delete deleteA = makeDeleteFromRegionInfo(regionA, time);
Delete deleteB = makeDeleteFromRegionInfo(regionB, time);
mutations.add(putOfMerged);
// The merged is a new region, openSeqNum = 1 is fine. ServerName may be null
// if crash after merge happened but before we got to here.. means in-memory
// locations of offlined merged, now-closed, regions is lost. Should be ok. We
@ -1577,26 +1596,30 @@ public class MetaTableAccessor {
for (int i = 1; i < regionReplication; i++) {
addEmptyLocation(putOfMerged, i);
}
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
+ HConstants.DELIMITER);
multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB);
// add parent reference for serial replication
if (!replicationParents.isEmpty()) {
addReplicationParent(putOfMerged, replicationParents);
}
byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
multiMutate(connection, meta, tableRow, mutations);
}
}
/**
* Splits the region into two in an atomic operation. Offlines the parent
* region with the information that it is split into two, and also adds
* the daughter regions. Does not add the location information to the daughter
* regions since they are not open yet.
* Splits the region into two in an atomic operation. Offlines the parent region with the
* information that it is split into two, and also adds the daughter regions. Does not add the
* location information to the daughter regions since they are not open yet.
* @param connection connection we're using
* @param parent the parent region which is split
* @param parentOpenSeqNum the next open sequence id for parent region, used by serial
* replication. -1 if not necessary.
* @param splitA Split daughter region A
* @param splitB Split daughter region B
* @param sn the location of the region
*/
public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA,
RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication)
throws IOException {
try (Table meta = getMetaHTable(connection)) {
long time = EnvironmentEdgeManager.currentTime();
// Put for parent
@ -1608,7 +1631,11 @@ public class MetaTableAccessor {
// Puts for daughters
Put putA = makePutFromRegionInfo(splitA, time);
Put putB = makePutFromRegionInfo(splitB, time);
if (parentOpenSeqNum > 0) {
addReplicationBarrier(putParent, parentOpenSeqNum);
addReplicationParent(putA, Collections.singletonList(parent));
addReplicationParent(putB, Collections.singletonList(parent));
}
// Set initial state to CLOSED
// NOTE: If initial state is not set to CLOSED then daughter regions get added with the
// default OFFLINE state. If Master gets restarted after this step, start up sequence of
@ -1668,20 +1695,15 @@ public class MetaTableAccessor {
}
private static void multiMutate(Connection connection, Table table, byte[] row,
Mutation... mutations)
throws IOException {
Mutation... mutations) throws IOException {
multiMutate(connection, table, row, Arrays.asList(mutations));
}
/**
* Performs an atomic multi-mutate operation against the given table.
*/
// Used by the RSGroup Coprocessor Endpoint. It had a copy/paste of the below. Need to reveal
// this facility for CPEP use or at least those CPEPs that are on their way to becoming part of
// core as is the intent for RSGroup eventually.
public static void multiMutate(Connection connection, final Table table, byte[] row,
final List<Mutation> mutations)
throws IOException {
private static void multiMutate(Connection connection, final Table table, byte[] row,
final List<Mutation> mutations) throws IOException {
debugLogMutations(mutations);
// TODO: Need rollback!!!!
// TODO: Need Retry!!!
@ -1782,9 +1804,7 @@ public class MetaTableAccessor {
* @param regionInfo region to be deleted from META
* @throws IOException
*/
public static void deleteRegion(Connection connection,
RegionInfo regionInfo)
throws IOException {
public static void deleteRegion(Connection connection, RegionInfo regionInfo) throws IOException {
long time = EnvironmentEdgeManager.currentTime();
Delete delete = new Delete(regionInfo.getRegionName());
delete.addFamily(getCatalogFamily(), time);
@ -1901,6 +1921,33 @@ public class MetaTableAccessor {
.build());
}
private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
.map(RegionInfo::getRegionNameAsString).collect(Collectors
.collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
.setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
}
private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
throws IOException {
Put put = new Put(regionInfo.getRegionName(), ts);
addReplicationBarrier(put, openSeqNum);
return put;
}
public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
.setRow(put.getRow())
.setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
.setQualifier(HConstants.SEQNUM_QUALIFIER)
.setTimestamp(put.getTimeStamp())
.setType(Type.Put)
.setValue(Bytes.toBytes(openSeqNum))
.build());
}
private static Put addEmptyLocation(Put p, int replicaId) throws IOException {
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
return p.add(builder.clear()
@ -1926,6 +1973,92 @@ public class MetaTableAccessor {
.build());
}
public static final class ReplicationBarrierResult {
private final long[] barriers;
private final RegionState.State state;
private final List<byte[]> parentRegionNames;
public ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
this.barriers = barriers;
this.state = state;
this.parentRegionNames = parentRegionNames;
}
public long[] getBarriers() {
return barriers;
}
public RegionState.State getState() {
return state;
}
public List<byte[]> getParentRegionNames() {
return parentRegionNames;
}
}
private static long getReplicationBarrier(Cell c) {
return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
}
private static long[] getReplicationBarriers(Result result) {
return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
.stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
}
private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
long[] barriers = getReplicationBarriers(result);
byte[] stateBytes = result.getValue(getCatalogFamily(), getRegionStateColumn());
RegionState.State state =
stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
byte[] parentRegionsBytes =
result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
List<byte[]> parentRegionNames =
parentRegionsBytes != null
? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
.map(Bytes::toBytes).collect(Collectors.toList())
: Collections.emptyList();
return new ReplicationBarrierResult(barriers, state, parentRegionNames);
}
public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
byte[] metaStopKey =
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
.addColumn(getCatalogFamily(), getRegionStateColumn())
.addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
.setCaching(10);
try (Table table = getMetaHTable(conn); ResultScanner scanner = table.getScanner(scan)) {
for (Result result;;) {
result = scanner.next();
if (result == null) {
return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
}
byte[] regionName = result.getRow();
// TODO: we may look up a region which has already been split or merged so we need to check
// whether the encoded name matches. Need to find a way to quit earlier when there is no
// record for the given region, for now it will scan to the end of the table.
if (!Bytes.equals(encodedRegionName,
Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) {
continue;
}
return getReplicationBarrierResult(result);
}
}
}
public static long[] getReplicationBarrier(Connection conn, byte[] regionName)
throws IOException {
try (Table table = getMetaHTable(conn)) {
Result result = table.get(new Get(regionName)
.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
.readAllVersions());
return getReplicationBarriers(result);
}
}
private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
if (!METALOG.isDebugEnabled()) {
return;

View File

@ -244,6 +244,11 @@ public interface TableDescriptor {
*/
boolean hasRegionMemStoreReplication();
/**
* @return true if there are at least one cf whose replication scope is serial.
*/
boolean hasSerialReplicationScope();
/**
* Check if the compaction enable flag of the table is true. If flag is false
* then no minor/major compactions will be done in real.
@ -292,7 +297,8 @@ public interface TableDescriptor {
boolean hasDisabled = false;
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL &&
cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
hasDisabled = true;
} else {
hasEnabled = true;

View File

@ -1127,6 +1127,15 @@ public class TableDescriptorBuilder {
return families.values().toArray(new ColumnFamilyDescriptor[families.size()]);
}
/**
* Return true if there are at least one cf whose replication scope is serial.
*/
@Override
public boolean hasSerialReplicationScope() {
return families.values().stream()
.anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
}
/**
* Returns the configured replicas per region
*/

View File

@ -535,6 +535,12 @@ public final class HConstants {
/** The serialized table state qualifier */
public static final byte[] TABLE_STATE_QUALIFIER = Bytes.toBytes("state");
/** The replication barrier family as a string*/
public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
/** The replication barrier family */
public static final byte[] REPLICATION_BARRIER_FAMILY =
Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
/**
* The meta table version column qualifier.
@ -675,6 +681,12 @@ public final class HConstants {
*/
public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
* Scope tag for serially scoped data
* This data will be replicated to all peers by the order of sequence id.
*/
public static final int REPLICATION_SCOPE_SERIAL = 2;
/**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.

View File

@ -208,7 +208,16 @@ public class MasterFileSystem {
/**
* @return HBase root log dir.
*/
public Path getWALRootDir() { return this.walRootDir; }
public Path getWALRootDir() {
return this.walRootDir;
}
/**
* @return the directory for a give {@code region}.
*/
public Path getRegionDir(RegionInfo region) {
return FSUtils.getRegionDir(FSUtils.getTableDir(getRootDir(), region.getTable()), region);
}
/**
* @return HBase temp dir.

View File

@ -1571,8 +1571,7 @@ public class AssignmentManager implements ServerListener {
}
public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
final RegionInfo daughterA, final RegionInfo daughterB)
throws IOException {
final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
// Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
// The parent stays in regionStates until cleared when removed by CatalogJanitor.
// Update its state in regionStates to it shows as offline and split when read

View File

@ -1,5 +1,4 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
@ -36,11 +34,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@ -163,6 +163,11 @@ public class RegionStateStore {
Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
"Open region should be on a server");
MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
// only update replication barrier for default replica
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
hasSerialReplicationScope(regionInfo.getTable())) {
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
}
info.append(", openSeqNum=").append(openSeqNum);
info.append(", regionLocation=").append(regionLocation);
} else if (regionLocation != null && !regionLocation.equals(lastHost)) {
@ -205,24 +210,41 @@ public class RegionStateStore {
}
}
private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
MasterFileSystem mfs = master.getMasterFileSystem();
long maxSeqId =
WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
}
// ============================================================================================
// Update Region Splitting State helpers
// ============================================================================================
public void splitRegion(final RegionInfo parent, final RegionInfo hriA,
final RegionInfo hriB, final ServerName serverName) throws IOException {
final TableDescriptor htd = getTableDescriptor(parent.getTable());
MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName,
getRegionReplication(htd));
public void splitRegion(RegionInfo parent, RegionInfo hriA, RegionInfo hriB,
ServerName serverName) throws IOException {
TableDescriptor htd = getTableDescriptor(parent.getTable());
long parentOpenSeqNum = HConstants.NO_SEQNUM;
if (htd.hasSerialReplicationScope()) {
parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
}
MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
serverName, getRegionReplication(htd));
}
// ============================================================================================
// Update Region Merging State helpers
// ============================================================================================
public void mergeRegions(final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB,
final ServerName serverName) throws IOException {
final TableDescriptor htd = getTableDescriptor(parent.getTable());
MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName,
getRegionReplication(htd));
public void mergeRegions(RegionInfo child, RegionInfo hriA, RegionInfo hriB,
ServerName serverName) throws IOException {
TableDescriptor htd = getTableDescriptor(child.getTable());
long regionAOpenSeqNum = -1L;
long regionBOpenSeqNum = -1L;
if (htd.hasSerialReplicationScope()) {
regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
}
MetaTableAccessor.mergeRegions(master.getConnection(), child, hriA, regionAOpenSeqNum, hriB,
regionBOpenSeqNum, serverName, getRegionReplication(htd));
}
// ============================================================================================
@ -239,11 +261,19 @@ public class RegionStateStore {
// ==========================================================================
// Table Descriptors helpers
// ==========================================================================
private int getRegionReplication(final TableDescriptor htd) {
return (htd != null) ? htd.getRegionReplication() : 1;
private boolean hasSerialReplicationScope(TableName tableName) throws IOException {
return hasSerialReplicationScope(getTableDescriptor(tableName));
}
private TableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
private boolean hasSerialReplicationScope(TableDescriptor htd) {
return htd != null ? htd.hasSerialReplicationScope() : false;
}
private int getRegionReplication(TableDescriptor htd) {
return htd != null ? htd.getRegionReplication() : 1;
}
private TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
return master.getTableDescriptors().get(tableName);
}

View File

@ -253,7 +253,7 @@ public class SplitTableRegionProcedure
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
break;
case SPLIT_TABLE_REGION_UPDATE_META:
updateMetaForDaughterRegions(env);
updateMeta(env);
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META);
break;
case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META:
@ -762,7 +762,7 @@ public class SplitTableRegionProcedure
* Add daughter regions to META
* @param env MasterProcedureEnv
*/
private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException {
private void updateMeta(final MasterProcedureEnv env) throws IOException {
env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env),
daughter_1_RI, daughter_2_RI);
}

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
@ -31,15 +30,12 @@ import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionOfflineException;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
/**
@ -131,9 +127,7 @@ public abstract class AbstractStateMachineTableProcedure<TState>
}
protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException {
MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), getTableName());
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
return env.getMasterServices().getMasterFileSystem().getRegionDir(region);
}
/**

View File

@ -1,5 +1,4 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
@ -25,6 +23,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@ -84,6 +83,7 @@ public class HRegionFileSystem {
private final Configuration conf;
private final Path tableDir;
private final FileSystem fs;
private final Path regionDir;
/**
* In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the
@ -105,9 +105,10 @@ public class HRegionFileSystem {
final RegionInfo regionInfo) {
this.fs = fs;
this.conf = conf;
this.tableDir = tableDir;
this.regionInfo = regionInfo;
this.tableDir = Objects.requireNonNull(tableDir, "tableDir is null");
this.regionInfo = Objects.requireNonNull(regionInfo, "regionInfo is null");
this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo);
this.regionDir = FSUtils.getRegionDir(tableDir, regionInfo);
this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
@ -135,7 +136,7 @@ public class HRegionFileSystem {
/** @return {@link Path} to the region directory. */
public Path getRegionDir() {
return new Path(this.tableDir, this.regionInfoForFs.getEncodedName());
return regionDir;
}
// ===========================================================================

View File

@ -21,16 +21,13 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
@ -47,7 +44,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
@InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
private static final Logger LOG = LoggerFactory.getLogger(NamespaceTableCfWALEntryFilter.class);
private final ReplicationPeer peer;
private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();

View File

@ -15,17 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.NavigableMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
@ -35,7 +33,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
@InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
@Override
public Entry filter(Entry entry) {
@ -49,21 +47,21 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
@Override
public Cell filterCell(Entry entry, Cell cell) {
final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
byte[] fam = CellUtil.cloneFamily(cell);
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
@Override
public boolean apply(byte[] fam) {
return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
}
});
} else {
if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
return null;
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
byte[] fam = CellUtil.cloneFamily(cell);
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
@Override
public boolean apply(byte[] fam) {
return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
}
});
} else {
if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
return null;
}
}
return cell;
}
}

View File

@ -194,4 +194,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
public ServerName getServerWALsBelongTo() {
return this.replicationQueueInfo.getDeadRegionServers().get(0);
}
@Override
public boolean isRecovered() {
return true;
}
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,12 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -127,13 +124,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
return startPosition;
}
@Override
protected void updateLogPosition(long lastReadPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
lastReadPosition, true);
lastLoggedPosition = lastReadPosition;
}
private void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);

View File

@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
private static final Logger LOG =
LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
@ -45,13 +46,11 @@ public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALRea
}
@Override
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
throws InterruptedException {
protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
// we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(batch != null ? batch
: new WALEntryBatch(replicationBatchCountCapacity, currentPath));
entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath));
}
}

View File

@ -607,4 +607,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
public ServerName getServerWALsBelongTo() {
return server.getServerName();
}
Server getServer() {
return server;
}
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
}

View File

@ -166,4 +166,11 @@ public interface ReplicationSourceInterface {
* @return the server name which all WALs belong to
*/
ServerName getServerWALsBelongTo();
/**
* @return whether this is a replication source for recovery.
*/
default boolean isRecovered() {
return false;
}
}

View File

@ -480,10 +480,10 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region server
*/
public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
boolean queueRecovered) {
Map<String, Long> lastSeqIds, boolean queueRecovered) {
String fileName = log.getName();
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
position, null));
position, lastSeqIds));
cleanOldLogs(fileName, queueId, queueRecovered);
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -20,13 +19,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -128,7 +127,7 @@ public class ReplicationSourceShipper extends Thread {
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
updateLogPosition(lastReadPosition);
updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@ -168,13 +167,13 @@ public class ReplicationSourceShipper extends Thread {
}
if (this.lastLoggedPosition != lastReadPosition) {
//Clean up hfile references
// Clean up hfile references
int size = entries.size();
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
//Log and clean up WAL logs
updateLogPosition(lastReadPosition);
// Log and clean up WAL logs
updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
}
source.postShipEdits(entries, currentSize);
@ -222,9 +221,9 @@ public class ReplicationSourceShipper extends Thread {
}
}
protected void updateLogPosition(long lastReadPosition) {
private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
lastReadPosition, false);
lastReadPosition, lastSeqIds, source.isRecovered());
lastLoggedPosition = lastReadPosition;
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@ -31,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
* Used to receive new wals.
*/
@ -68,31 +65,25 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
* compaction WAL edits and if the scope is local.
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @throws IOException If failed to parse the WALEdit
*/
@VisibleForTesting
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
boolean replicationForBulkLoadEnabled =
ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
break;
}
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) {
// For bulk load replication we need meta family to know the file we want to replicate.
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
return;
}
if (!foundOtherEdits && logEdit.getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
if (maybeEvent != null &&
(maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serially replication, we use scopes when reading close marker.
foundOtherEdits = true;
}
WALKeyImpl keyImpl = (WALKeyImpl) logKey;
// For serial replication we need to count all the sequence ids even for markers, so here we
// always need to retain the replication scopes to let the replication wal reader to know that
// we need serial replication. The ScopeWALEntryFilter will help filtering out the cell for
// WALEdit.METAFAMILY.
if (keyImpl.hasSerialReplicationScope()) {
return;
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
((WALKeyImpl) logKey).serializeReplicationScope(false);
// For replay, or if all the cells are markers, do not need to store replication scope.
if (logEdit.isReplay() ||
logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
keyImpl.clearReplicationScope();
}
}
}

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
*
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
* onto a queue
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -77,6 +77,8 @@ public class ReplicationSourceWALReader extends Thread {
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private final SerialReplicationChecker serialReplicationChecker;
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@ -111,6 +113,7 @@ public class ReplicationSourceWALReader extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.serialReplicationChecker = new SerialReplicationChecker(conf, source);
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@ -131,15 +134,14 @@ public class ReplicationSourceWALReader extends Thread {
continue;
}
WALEntryBatch batch = readWALEntries(entryStream);
if (batch != null && batch.getNbEntries() > 0) {
if (LOG.isTraceEnabled()) {
LOG.trace(String.format("Read %s WAL entries eligible for replication",
batch.getNbEntries()));
}
if (batch != null) {
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
handleEmptyWALEntryBatch(entryStream.getCurrentPath());
}
currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream
@ -160,34 +162,66 @@ public class ReplicationSourceWALReader extends Thread {
}
}
private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
WALEntryBatch batch = null;
while (entryStream.hasNext()) {
if (batch == null) {
batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
private WALEntryBatch readWALEntries(WALEntryStream entryStream)
throws IOException, InterruptedException {
if (!entryStream.hasNext()) {
return null;
}
WALEntryBatch batch =
new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
do {
Entry entry = entryStream.peek();
batch.setLastWalPosition(entryStream.getPosition());
boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
// Used to locate the region record in meta table. In WAL we only have the table name and
// encoded region name which can not be mapping to region name without scanning all the
// records for a table, so we need a start key, just like what we have done at client side
// when locating a region. For the markers, we will use the start key of the region as the row
// key for the edit. And we need to do this before filtering since all the cells may be
// filtered out, especially that for the markers.
Cell firstCellInEdit = null;
if (hasSerialReplicationScope) {
assert !entry.getEdit().isEmpty() : "should not write empty edits";
firstCellInEdit = entry.getEdit().getCells().get(0);
}
Entry entry = entryStream.next();
entry = filterEntry(entry);
if (entry != null) {
if (hasSerialReplicationScope) {
if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) {
if (batch.getNbEntries() > 0) {
// we have something that can push, break
break;
} else {
serialReplicationChecker.waitUntilCanPush(entry, firstCellInEdit);
}
}
// arrive here means we can push the entry, record the last sequence id
batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
entry.getKey().getSequenceId());
}
// actually remove the entry.
entryStream.next();
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
|| batch.getNbEntries() >= replicationBatchCountCapacity) {
if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
} else {
// actually remove the entry.
entryStream.next();
}
}
} while (entryStream.hasNext());
return batch;
}
protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
throws InterruptedException {
protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
Thread.sleep(sleepForRetries);
}
@ -214,7 +248,7 @@ public class ReplicationSourceWALReader extends Thread {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
if (batchQueueHead != null) {
return batchQueueHead.lastWalPath;
return batchQueueHead.getLastWalPath();
}
// otherwise, we must be currently reading from the head of the log queue
return logQueue.peek();
@ -253,15 +287,12 @@ public class ReplicationSourceWALReader extends Thread {
return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
}
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
batch.incrementHeapSize(entrySize);
Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
}
batch.lastWalPosition = entryPosition;
batch.incrementHeapSize(entrySize);
Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
}
/**
@ -355,101 +386,4 @@ public class ReplicationSourceWALReader extends Thread {
public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning;
}
/**
* Holds a batch of WAL entries to replicate, along with some statistics
*
*/
static class WALEntryBatch {
private List<Entry> walEntries;
// last WAL that was read
private Path lastWalPath;
// position in WAL of last entry in this batch
private long lastWalPosition = 0;
// number of distinct row keys in this batch
private int nbRowKeys = 0;
// number of HFiles
private int nbHFiles = 0;
// heap size of data we need to replicate
private long heapSize = 0;
/**
* @param walEntries
* @param lastWalPath Path of the WAL the last entry in this batch was read from
* @param lastWalPosition Position in the WAL the last entry in this batch was read from
*/
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
this.walEntries = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}
public void addEntry(Entry entry) {
walEntries.add(entry);
}
/**
* @return the WAL Entries.
*/
public List<Entry> getWalEntries() {
return walEntries;
}
/**
* @return the path of the last WAL that was read.
*/
public Path getLastWalPath() {
return lastWalPath;
}
/**
* @return the position in the last WAL that was read.
*/
public long getLastWalPosition() {
return lastWalPosition;
}
public int getNbEntries() {
return walEntries.size();
}
/**
* @return the number of distinct row keys in this batch
*/
public int getNbRowKeys() {
return nbRowKeys;
}
/**
* @return the number of HFiles in this batch
*/
public int getNbHFiles() {
return nbHFiles;
}
/**
* @return total number of operations in this batch
*/
public int getNbOperations() {
return getNbRowKeys() + getNbHFiles();
}
/**
* @return the heap size of this batch
*/
public long getHeapSize() {
return heapSize;
}
private void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
private void incrementNbHFiles(int increment) {
nbHFiles += increment;
}
private void incrementHeapSize(long increment) {
heapSize += increment;
}
}
}

View File

@ -0,0 +1,255 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.ReplicationBarrierResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* <p>
* Helper class to determine whether we can push a given WAL entry without breaking the replication
* order. The class is designed to per {@link ReplicationSourceWALReader}, so not thread safe.
* </p>
* <p>
* We record all the open sequence number for a region in a special family in meta, which is called
* 'barrier', so there will be a sequence of open sequence number (b1, b2, b3, ...). We call [bn,
* bn+1) a range, and it is obvious that a region will always be on the same RS within a range.
* <p>
* When split and merge, we will also record the parent for the generated region(s) in the special
* family in meta. And also, we will write an extra 'open sequence number' for the parent region(s),
* which is the max sequence id of the region plus one.
* </p>
* </p>
* <p>
* For each peer, we record the last pushed sequence id for each region. It is managed by the
* replication storage.
* </p>
* <p>
* The algorithm works like this:
* <ol>
* <li>Locate the sequence id we want to push in the barriers</li>
* <li>If it is before the first barrier, we are safe to push. This usually because we enable serial
* replication for this table after we create the table and write data into the table.</li>
* <li>In general, if the previous range is finished, then we are safe to push. The way to determine
* whether a range is finish is straight-forward: check whether the last pushed sequence id is equal
* to the end barrier of the range minus 1. There are several exceptions:
* <ul>
* <li>If it is in the first range, we need to check whether there are parent regions. If so, we
* need to make sure that the data for parent regions have all been pushed.</li>
* <li>If it is in the last range, we need to check the region state. If state is OPENING, then we
* are not safe to push. This is because that, before we call reportRIT to master which update the
* open sequence number into meta table, we will write a open region event marker to WAL first, and
* its sequence id is greater than the newest open sequence number(which has not been updated to
* meta table yet so we do not know). For this scenario, the WAL entry for this open region event
* marker actually belongs to the range after the 'last' range, so we are not safe to push it.
* Otherwise the last pushed sequence id will be updated to this value and then we think the
* previous range has already been finished, but this is not true.</li>
* <li>Notice that the above two exceptions are not conflicts, since the first range can also be the
* last range if we only have one range.</li>
* </ul>
* </li>
* </ol>
* </p>
* <p>
* And for performance reason, we do not want to check meta for every WAL entry, so we introduce two
* in memory maps. The idea is simple:
* <ul>
* <li>If a range can be pushed, then put its end barrier into the {@code canPushUnder} map.</li>
* <li>Before accessing meta, first check the sequence id stored in the {@code canPushUnder} map. If
* the sequence id of WAL entry is less the one stored in {@code canPushUnder} map, then we are safe
* to push.</li>
* </ul>
* And for the last range, we do not have an end barrier, so we use the continuity of sequence id to
* determine whether we can push. The rule is:
* <ul>
* <li>When an entry is able to push, then put its sequence id into the {@code pushed} map.</li>
* <li>Check if the sequence id of WAL entry equals to the one stored in the {@code pushed} map plus
* one. If so, we are safe to push, and also update the {@code pushed} map with the sequence id of
* the WAL entry.</li>
* </ul>
* </p>
*/
@InterfaceAudience.Private
class SerialReplicationChecker {
public static final String REPLICATION_SERIALLY_WAITING_KEY =
"hbase.serial.replication.waiting.ms";
public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
private final String peerId;
private final ReplicationQueueStorage storage;
private final Connection conn;
private final long waitTimeMs;
private final LoadingCache<String, MutableLong> pushed = CacheBuilder.newBuilder()
.expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, MutableLong>() {
@Override
public MutableLong load(String key) throws Exception {
return new MutableLong(HConstants.NO_SEQNUM);
}
});
// Use guava cache to set ttl for each key
private final Cache<String, Long> canPushUnder =
CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
this.peerId = source.getPeerId();
this.storage = source.getQueueStorage();
this.conn = source.getServer().getConnection();
this.waitTimeMs =
conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
}
private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException {
long pushedSeqId;
try {
pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId);
} catch (ReplicationException e) {
throw new IOException(
"Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
}
// endBarrier is the open sequence number. When opening a region, the open sequence number will
// be set to the old max sequence id plus one, so here we need to minus one.
return pushedSeqId >= endBarrier - 1;
}
private boolean isParentFinished(byte[] regionName) throws IOException {
long[] barriers = MetaTableAccessor.getReplicationBarrier(conn, regionName);
if (barriers.length == 0) {
return true;
}
return isRangeFinished(barriers[barriers.length - 1], RegionInfo.encodeRegionName(regionName));
}
// We may write a open region marker to WAL before we write the open sequence number to meta, so
// if a region is in OPENING state and we are in the last range, it is not safe to say we can push
// even if the previous range is finished.
private boolean isLastRangeAndOpening(ReplicationBarrierResult barrierResult, int index) {
return index == barrierResult.getBarriers().length &&
barrierResult.getState() == RegionState.State.OPENING;
}
private void recordCanPush(String encodedNameAsString, long seqId, long[] barriers, int index) {
if (barriers.length > index) {
canPushUnder.put(encodedNameAsString, barriers[index]);
}
pushed.getUnchecked(encodedNameAsString).setValue(seqId);
}
private boolean canPush(Entry entry, byte[] row) throws IOException {
String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
long seqId = entry.getKey().getSequenceId();
ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
long[] barriers = barrierResult.getBarriers();
int index = Arrays.binarySearch(barriers, seqId);
if (index == -1) {
// This means we are in the range before the first record openSeqNum, this usually because the
// wal is written before we enable serial replication for this table, just return true since
// we can not guarantee the order.
pushed.getUnchecked(encodedNameAsString).setValue(seqId);
return true;
}
// The sequence id range is left closed and right open, so either we decrease the missed insert
// point to make the index start from 0, or increase the hit insert point to make the index
// start from 1. Here we choose the latter one.
if (index < 0) {
index = -index - 1;
} else {
index++;
}
if (index == 1) {
// we are in the first range, check whether we have parents
for (byte[] regionName : barrierResult.getParentRegionNames()) {
if (!isParentFinished(regionName)) {
return false;
}
}
if (isLastRangeAndOpening(barrierResult, index)) {
return false;
}
recordCanPush(encodedNameAsString, seqId, barriers, 1);
return true;
}
// check whether the previous range is finished
if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
return false;
}
if (isLastRangeAndOpening(barrierResult, index)) {
return false;
}
recordCanPush(encodedNameAsString, seqId, barriers, index);
return true;
}
public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException {
String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
long seqId = entry.getKey().getSequenceId();
Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
if (canReplicateUnderSeqId != null) {
if (seqId < canReplicateUnderSeqId.longValue()) {
return true;
}
// we are already beyond the last safe point, remove
canPushUnder.invalidate(encodedNameAsString);
}
// This is for the case where the region is currently opened on us, if the sequence id is
// continuous then we are safe to replicate. If there is a breakpoint, then maybe the region
// has been moved to another RS and then back, so we need to check the barrier.
MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
if (seqId == previousPushedSeqId.longValue() + 1) {
previousPushedSeqId.increment();
return true;
}
return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
}
public void waitUntilCanPush(Entry entry, Cell firstCellInEdit)
throws IOException, InterruptedException {
byte[] row = CellUtil.cloneRow(firstCellInEdit);
while (!canPush(entry, row)) {
Thread.sleep(waitTimeMs);
}
}
}

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Holds a batch of WAL entries to replicate, along with some statistics
*/
@InterfaceAudience.Private
class WALEntryBatch {
private List<Entry> walEntries;
// last WAL that was read
private Path lastWalPath;
// position in WAL of last entry in this batch
private long lastWalPosition = 0;
// number of distinct row keys in this batch
private int nbRowKeys = 0;
// number of HFiles
private int nbHFiles = 0;
// heap size of data we need to replicate
private long heapSize = 0;
// save the last sequenceid for each region if the table has serial-replication scope
private Map<String, Long> lastSeqIds = new HashMap<>();
/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
*/
WALEntryBatch(int maxNbEntries, Path lastWalPath) {
this.walEntries = new ArrayList<>(maxNbEntries);
this.lastWalPath = lastWalPath;
}
public void addEntry(Entry entry) {
walEntries.add(entry);
}
/**
* @return the WAL Entries.
*/
public List<Entry> getWalEntries() {
return walEntries;
}
/**
* @return the path of the last WAL that was read.
*/
public Path getLastWalPath() {
return lastWalPath;
}
/**
* @return the position in the last WAL that was read.
*/
public long getLastWalPosition() {
return lastWalPosition;
}
public void setLastWalPosition(long lastWalPosition) {
this.lastWalPosition = lastWalPosition;
}
public int getNbEntries() {
return walEntries.size();
}
/**
* @return the number of distinct row keys in this batch
*/
public int getNbRowKeys() {
return nbRowKeys;
}
/**
* @return the number of HFiles in this batch
*/
public int getNbHFiles() {
return nbHFiles;
}
/**
* @return total number of operations in this batch
*/
public int getNbOperations() {
return getNbRowKeys() + getNbHFiles();
}
/**
* @return the heap size of this batch
*/
public long getHeapSize() {
return heapSize;
}
/**
* @return the last sequenceid for each region if the table has serial-replication scope
*/
public Map<String, Long> getLastSeqIds() {
return lastSeqIds;
}
public void incrementNbRowKeys(int increment) {
nbRowKeys += increment;
}
public void incrementNbHFiles(int increment) {
nbHFiles += increment;
}
public void incrementHeapSize(long increment) {
heapSize += increment;
}
public void setLastSeqId(String region, long sequenceId) {
lastSeqIds.put(region, sequenceId);
}
}

View File

@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
@ -102,16 +99,18 @@ class WALEntryStream implements Closeable {
}
/**
* @return the next WAL entry in this stream
* @throws IOException
* @throws NoSuchElementException if no more entries in the stream.
* Returns the next WAL entry in this stream but does not advance.
*/
public Entry peek() throws IOException {
return hasNext() ? currentEntry: null;
}
/**
* Returns the next WAL entry in this stream and advance the stream.
*/
public Entry next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
Entry save = currentEntry;
currentEntry = null; // gets reloaded by hasNext()
Entry save = peek();
currentEntry = null;
return save;
}

View File

@ -170,6 +170,14 @@ public class FSTableDescriptors implements TableDescriptors {
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
.build())
.setColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
.setMaxVersions(HConstants.ALL_VERSIONS)
.setInMemory(true)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
.build())
.setCoprocessor(CoprocessorDescriptorBuilder.newBuilder(
MultiRowMutationEndpoint.class.getName())
.setPriority(Coprocessor.PRIORITY_SYSTEM)

View File

@ -18,13 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import edu.umd.cs.findbugs.annotations.CheckForNull;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
@ -54,7 +48,6 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
@ -71,9 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
@ -81,8 +72,6 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
@ -94,6 +83,17 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
/**
* Utility methods for interacting with the underlying file system.
@ -1028,6 +1028,10 @@ public abstract class FSUtils extends CommonFSUtils {
return regionDirs;
}
public static Path getRegionDir(Path tableDir, RegionInfo region) {
return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
}
/**
* Filter for all dirs that are legal column family names. This is generally used for colfam
* dirs &lt;hbase.rootdir&gt;/&lt;tabledir&gt;/&lt;regiondir&gt;/&lt;colfamdir&gt;.

View File

@ -415,10 +415,16 @@ public class WALKeyImpl implements WALKey {
this.replicationScope = replicationScope;
}
public void serializeReplicationScope(boolean serialize) {
if (!serialize) {
setReplicationScope(null);
public void clearReplicationScope() {
setReplicationScope(null);
}
public boolean hasSerialReplicationScope() {
if (replicationScope == null || replicationScope.isEmpty()) {
return false;
}
return replicationScope.values().stream()
.anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
}
/**

View File

@ -494,7 +494,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@ -535,7 +535,8 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parentA, parentB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3);
MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L, serverName0,
3);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@ -682,8 +683,8 @@ public class TestMetaTableAccessor {
EnvironmentEdgeManager.injectEdge(edge);
try {
// now merge the regions, effectively deleting the rows for region a and b.
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, regionInfoB, sn,
1);
MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, -1L, regionInfoB,
-1L, sn, 1);
} finally {
EnvironmentEdgeManager.reset();
}
@ -776,7 +777,8 @@ public class TestMetaTableAccessor {
}
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, loc.getServerName(),
1);
assertTrue(prevCalls < scheduler.numPriorityCalls);
}
@ -813,7 +815,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
Get get1 = new Get(splitA.getRegionName());
Result resultA = meta.get(get1);
Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,

View File

@ -194,7 +194,7 @@ public class TestHRegionFileSystem {
@Test
public void testOnDiskRegionCreation() throws IOException {
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration();
@ -226,7 +226,7 @@ public class TestHRegionFileSystem {
@Test
public void testNonIdempotentOpsWithRetries() throws IOException {
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration();
@ -235,19 +235,15 @@ public class TestHRegionFileSystem {
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
assertTrue(fs.exists(regionFs.getRegionDir()));
regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(),
null, null);
// HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir,
// hri);
regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), rootDir, hri);
boolean result = regionFs.createDir(new Path("/foo/bar"));
assertTrue("Couldn't create the directory", result);
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
assertTrue("Couldn't rename the directory", result);
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
result = regionFs.deleteDir(new Path("/foo/bar"));
assertTrue("Couldn't delete the directory", result);
fs.delete(rootDir, true);

View File

@ -343,12 +343,12 @@ public class TestRegionServerMetrics {
@Test
public void testStoreCount() throws Exception {
//Force a hfile.
// Force a hfile.
doNPuts(1, false);
TEST_UTIL.getAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute();
assertGauge("storeCount", TABLES_ON_MASTER? 1: 4);
assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 5);
assertGauge("storeFileCount", 1);
}

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
@ -47,7 +46,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category(LargeTests.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {
@ClassRule
@ -56,9 +55,6 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
/**
* @throws java.lang.Exception
*/
@Before
public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs,

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSerialReplication {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSerialReplication.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID = "1";
private static byte[] CF = Bytes.toBytes("CF");
private static byte[] CQ = Bytes.toBytes("CQ");
private static FileSystem FS;
private static Path LOG_DIR;
private static WALProvider.Writer WRITER;
public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
private static final UUID PEER_UUID = UUID.randomUUID();
@Override
public UUID getPeerUUID() {
return PEER_UUID;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
synchronized (WRITER) {
try {
for (Entry entry : replicateContext.getEntries()) {
WRITER.append(entry);
}
WRITER.sync(false);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
return true;
}
@Override
public void start() {
startAsync();
}
@Override
public void stop() {
stopAsync();
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
notifyStopped();
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
UTIL.startMiniCluster(3);
LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
FS = UTIL.getTestFileSystem();
FS.mkdirs(LOG_DIR);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Rule
public final TestName name = new TestName();
private Path logPath;
@Before
public void setUp() throws IOException, StreamLacksCapabilityException {
UTIL.ensureSomeRegionServersAvailable(3);
logPath = new Path(LOG_DIR, name.getMethodName());
WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
// add in disable state, so later when enabling it all sources will start push together.
UTIL.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
.setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
false);
}
@After
public void tearDown() throws IOException {
UTIL.getAdmin().removeReplicationPeer(PEER_ID);
if (WRITER != null) {
WRITER.close();
WRITER = null;
}
}
@Test
public void testRegionMove() throws Exception {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
.newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
Bytes.toBytes(rs.getServerName().getServerName()));
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !rs.getRegions(tableName).isEmpty();
}
@Override
public String explainFailure() throws Exception {
return region + " is still not on " + rs;
}
});
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 100; i < 200; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
}
UTIL.getAdmin().enableReplicationPeer(PEER_ID);
UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
int count = 0;
while (reader.next() != null) {
count++;
}
return count >= 200;
} catch (IOException e) {
return false;
}
}
@Override
public String explainFailure() throws Exception {
return "Not enough entries replicated";
}
});
try (WAL.Reader reader =
WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
long seqId = -1L;
int count = 0;
for (Entry entry;;) {
entry = reader.next();
if (entry == null) {
break;
}
assertTrue(
"Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
entry.getKey().getSequenceId() >= seqId);
count++;
}
assertEquals(200, count);
}
}
}

View File

@ -321,7 +321,7 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false);
"1", 0, null, false);
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),

View File

@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestSerialReplicationChecker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static String PEER_ID = "1";
private static ReplicationQueueStorage QUEUE_STORAGE;
private static String WAL_FILE_NAME = "test.wal";
private SerialReplicationChecker checker;
@Rule
public final TestName name = new TestName();
private TableName tableName;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(1);
QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
UTIL.getConfiguration());
QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID,
WAL_FILE_NAME);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws IOException {
ReplicationSource source = mock(ReplicationSource.class);
when(source.getPeerId()).thenReturn(PEER_ID);
when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
Server server = mock(Server.class);
when(server.getConnection()).thenReturn(UTIL.getConnection());
when(source.getServer()).thenReturn(server);
checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
tableName = TableName.valueOf(name.getMethodName());
}
private Entry createEntry(RegionInfo region, long seqId) {
WALKeyImpl key = mock(WALKeyImpl.class);
when(key.getTableName()).thenReturn(tableName);
when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
when(key.getSequenceId()).thenReturn(seqId);
Entry entry = mock(Entry.class);
when(entry.getKey()).thenReturn(key);
return entry;
}
private Cell createCell(RegionInfo region) {
return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
.setType(Type.Put).build();
}
@Test
public void testNoBarrierCanPush() throws IOException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
}
private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
Bytes.toBytes(state.name()));
for (int i = 0; i < barriers.length; i++) {
put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
}
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);
}
}
private void setState(RegionInfo region, RegionState.State state) throws IOException {
Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
Bytes.toBytes(state.name()));
try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
table.put(put);
}
}
private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
}
@Test
public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
addStateAndBarrier(region, RegionState.State.OPEN, 10);
Cell cell = createCell(region);
// can push since we are in the first range
assertTrue(checker.canPush(createEntry(region, 100), cell));
setState(region, RegionState.State.OPENING);
// can not push since we are in the last range and the state is OPENING
assertFalse(checker.canPush(createEntry(region, 102), cell));
addStateAndBarrier(region, RegionState.State.OPEN, 50);
// can not push since the previous range has not been finished yet
assertFalse(checker.canPush(createEntry(region, 102), cell));
updatePushedSeqId(region, 49);
// can push since the previous range has been finished
assertTrue(checker.canPush(createEntry(region, 102), cell));
setState(region, RegionState.State.OPENING);
assertFalse(checker.canPush(createEntry(region, 104), cell));
}
}

View File

@ -21,13 +21,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
@ -40,13 +40,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -180,15 +180,12 @@ public class TestWALEntryStream {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next();
WAL.Entry entry = entryStream.peek();
assertSame(entry, entryStream.next());
assertNotNull(entry);
assertFalse(entryStream.hasNext());
try {
entry = entryStream.next();
fail();
} catch (NoSuchElementException e) {
// expected
}
assertNull(entryStream.peek());
assertNull(entryStream.next());
oldPos = entryStream.getPosition();
}
@ -346,10 +343,12 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Server mockServer= Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();