From f29bf1d7786fb9fedb3b735f4a8134b61283e1d4 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Mon, 5 Mar 2018 16:47:03 +0800
Subject: [PATCH] HBASE-20115 Reimplement serial replication based on the new
replication storage layer
---
.../apache/hadoop/hbase/HTableDescriptor.java | 8 +
.../hadoop/hbase/MetaTableAccessor.java | 211 ++++++++++++---
.../hadoop/hbase/client/TableDescriptor.java | 8 +-
.../hbase/client/TableDescriptorBuilder.java | 9 +
.../org/apache/hadoop/hbase/HConstants.java | 12 +
.../hadoop/hbase/master/MasterFileSystem.java | 11 +-
.../master/assignment/AssignmentManager.java | 3 +-
.../master/assignment/RegionStateStore.java | 60 +++--
.../assignment/SplitTableRegionProcedure.java | 4 +-
.../AbstractStateMachineTableProcedure.java | 8 +-
.../hbase/regionserver/HRegionFileSystem.java | 11 +-
.../NamespaceTableCfWALEntryFilter.java | 8 +-
.../replication/ScopeWALEntryFilter.java | 34 ++-
.../RecoveredReplicationSource.java | 5 +
.../RecoveredReplicationSourceShipper.java | 12 +-
.../RecoveredReplicationSourceWALReader.java | 9 +-
.../regionserver/ReplicationSource.java | 8 +
.../ReplicationSourceInterface.java | 7 +
.../ReplicationSourceManager.java | 4 +-
.../ReplicationSourceShipper.java | 17 +-
.../ReplicationSourceWALActionListener.java | 39 ++-
.../ReplicationSourceWALReader.java | 188 +++++--------
.../SerialReplicationChecker.java | 255 ++++++++++++++++++
.../regionserver/WALEntryBatch.java | 138 ++++++++++
.../regionserver/WALEntryStream.java | 29 +-
.../hadoop/hbase/util/FSTableDescriptors.java | 8 +
.../org/apache/hadoop/hbase/util/FSUtils.java | 28 +-
.../apache/hadoop/hbase/wal/WALKeyImpl.java | 12 +-
.../hadoop/hbase/TestMetaTableAccessor.java | 14 +-
.../regionserver/TestHRegionFileSystem.java | 14 +-
.../regionserver/TestRegionServerMetrics.java | 4 +-
.../TestReplicationDroppedTables.java | 8 +-
.../replication/TestSerialReplication.java | 234 ++++++++++++++++
.../TestReplicationSourceManager.java | 2 +-
.../TestSerialReplicationChecker.java | 176 ++++++++++++
.../regionserver/TestWALEntryStream.java | 19 +-
36 files changed, 1279 insertions(+), 338 deletions(-)
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
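
Note (illustration, not part of the patch): the feature is driven entirely by the new per-column-family replication scope. A minimal, hypothetical sketch of how a table would opt in, using the 2.x client builder APIs and the REPLICATION_SCOPE_SERIAL constant added below; the table and family names are made up:

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class SerialScopeExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection();
        Admin admin = conn.getAdmin()) {
      TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("t1"))
          .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("cf"))
              // REPLICATION_SCOPE_SERIAL (= 2) is added by this patch; edits of this family
              // are shipped to peers strictly in sequence id order.
              .setScope(HConstants.REPLICATION_SCOPE_SERIAL)
              .build())
          .build();
      admin.createTable(td);
      // TableDescriptor.hasSerialReplicationScope(), also added by this patch, now returns
      // true for this table, which switches the source cluster onto the serial code paths.
    }
  }
}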
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 960b91fa158..ca0cb91a36a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -538,6 +538,14 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescriptor> {
+
+  /**
+   * Return true if there is at least one cf whose replication scope is serial.
+   */
+  @Override
+  public boolean hasSerialReplicationScope() {
+    return delegatee.hasSerialReplicationScope();
+  }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java
+    List<Mutation> mutations = new ArrayList<>();
+
+    List<RegionInfo> replicationParents = new ArrayList<>(2);
+ // Deletes for merging regions
+ mutations.add(makeDeleteFromRegionInfo(regionA, time));
+ if (regionAOpenSeqNum > 0) {
+ mutations.add(makePutForReplicationBarrier(regionA, regionAOpenSeqNum, time));
+ replicationParents.add(regionA);
+ }
+ mutations.add(makeDeleteFromRegionInfo(regionB, time));
+ if (regionBOpenSeqNum > 0) {
+ mutations.add(makePutForReplicationBarrier(regionB, regionBOpenSeqNum, time));
+ replicationParents.add(regionB);
+ }
// Put for parent
Put putOfMerged = makePutFromRegionInfo(mergedRegion, time);
@@ -1552,18 +1576,13 @@ public class MetaTableAccessor {
.setType(Type.Put)
.setValue(RegionInfo.toByteArray(regionB))
.build());
-
// Set initial state to CLOSED
// NOTE: If initial state is not set to CLOSED then merged region gets added with the
// default OFFLINE state. If Master gets restarted after this step, start up sequence of
// master tries to assign this offline region. This is followed by re-assignments of the
// merged region from resumed {@link MergeTableRegionsProcedure}
addRegionStateToPut(putOfMerged, RegionState.State.CLOSED);
-
- // Deletes for merging regions
- Delete deleteA = makeDeleteFromRegionInfo(regionA, time);
- Delete deleteB = makeDeleteFromRegionInfo(regionB, time);
-
+ mutations.add(putOfMerged);
// The merged is a new region, openSeqNum = 1 is fine. ServerName may be null
// if crash after merge happened but before we got to here.. means in-memory
// locations of offlined merged, now-closed, regions is lost. Should be ok. We
@@ -1577,26 +1596,30 @@ public class MetaTableAccessor {
for (int i = 1; i < regionReplication; i++) {
addEmptyLocation(putOfMerged, i);
}
-
- byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString()
- + HConstants.DELIMITER);
- multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB);
+ // add parent reference for serial replication
+ if (!replicationParents.isEmpty()) {
+ addReplicationParent(putOfMerged, replicationParents);
+ }
+ byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER);
+ multiMutate(connection, meta, tableRow, mutations);
}
}
/**
- * Splits the region into two in an atomic operation. Offlines the parent
- * region with the information that it is split into two, and also adds
- * the daughter regions. Does not add the location information to the daughter
- * regions since they are not open yet.
+ * Splits the region into two in an atomic operation. Offlines the parent region with the
+ * information that it is split into two, and also adds the daughter regions. Does not add the
+ * location information to the daughter regions since they are not open yet.
* @param connection connection we're using
* @param parent the parent region which is split
+   * @param parentOpenSeqNum the next open sequence id for the parent region, used by serial
+   *          replication. -1 if not necessary.
* @param splitA Split daughter region A
* @param splitB Split daughter region B
* @param sn the location of the region
*/
- public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA,
- RegionInfo splitB, ServerName sn, int regionReplication) throws IOException {
+ public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum,
+ RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication)
+ throws IOException {
try (Table meta = getMetaHTable(connection)) {
long time = EnvironmentEdgeManager.currentTime();
// Put for parent
@@ -1608,7 +1631,11 @@ public class MetaTableAccessor {
// Puts for daughters
Put putA = makePutFromRegionInfo(splitA, time);
Put putB = makePutFromRegionInfo(splitB, time);
-
+ if (parentOpenSeqNum > 0) {
+ addReplicationBarrier(putParent, parentOpenSeqNum);
+ addReplicationParent(putA, Collections.singletonList(parent));
+ addReplicationParent(putB, Collections.singletonList(parent));
+ }
// Set initial state to CLOSED
// NOTE: If initial state is not set to CLOSED then daughter regions get added with the
// default OFFLINE state. If Master gets restarted after this step, start up sequence of
@@ -1668,20 +1695,15 @@ public class MetaTableAccessor {
}
private static void multiMutate(Connection connection, Table table, byte[] row,
- Mutation... mutations)
- throws IOException {
+ Mutation... mutations) throws IOException {
multiMutate(connection, table, row, Arrays.asList(mutations));
}
/**
* Performs an atomic multi-mutate operation against the given table.
*/
- // Used by the RSGroup Coprocessor Endpoint. It had a copy/paste of the below. Need to reveal
- // this facility for CPEP use or at least those CPEPs that are on their way to becoming part of
- // core as is the intent for RSGroup eventually.
- public static void multiMutate(Connection connection, final Table table, byte[] row,
-      final List<Mutation> mutations)
- throws IOException {
+ private static void multiMutate(Connection connection, final Table table, byte[] row,
+      final List<Mutation> mutations) throws IOException {
debugLogMutations(mutations);
// TODO: Need rollback!!!!
// TODO: Need Retry!!!
@@ -1782,9 +1804,7 @@ public class MetaTableAccessor {
* @param regionInfo region to be deleted from META
* @throws IOException
*/
- public static void deleteRegion(Connection connection,
- RegionInfo regionInfo)
- throws IOException {
+ public static void deleteRegion(Connection connection, RegionInfo regionInfo) throws IOException {
long time = EnvironmentEdgeManager.currentTime();
Delete delete = new Delete(regionInfo.getRegionName());
delete.addFamily(getCatalogFamily(), time);
@@ -1901,6 +1921,33 @@ public class MetaTableAccessor {
.build());
}
+  private static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
+ byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica)
+ .map(RegionInfo::getRegionNameAsString).collect(Collectors
+ .collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes));
+ put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
+ .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
+ .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build());
+ }
+
+ private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
+ throws IOException {
+ Put put = new Put(regionInfo.getRegionName(), ts);
+ addReplicationBarrier(put, openSeqNum);
+ return put;
+ }
+
+ public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
+ put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setRow(put.getRow())
+ .setFamily(HConstants.REPLICATION_BARRIER_FAMILY)
+ .setQualifier(HConstants.SEQNUM_QUALIFIER)
+ .setTimestamp(put.getTimeStamp())
+ .setType(Type.Put)
+ .setValue(Bytes.toBytes(openSeqNum))
+ .build());
+ }
+
private static Put addEmptyLocation(Put p, int replicaId) throws IOException {
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
return p.add(builder.clear()
@@ -1926,6 +1973,92 @@ public class MetaTableAccessor {
.build());
}
+ public static final class ReplicationBarrierResult {
+ private final long[] barriers;
+ private final RegionState.State state;
+    private final List<byte[]> parentRegionNames;
+
+    public ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
+ this.barriers = barriers;
+ this.state = state;
+ this.parentRegionNames = parentRegionNames;
+ }
+
+ public long[] getBarriers() {
+ return barriers;
+ }
+
+ public RegionState.State getState() {
+ return state;
+ }
+
+    public List<byte[]> getParentRegionNames() {
+ return parentRegionNames;
+ }
+ }
+
+ private static long getReplicationBarrier(Cell c) {
+ return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+ }
+
+ private static long[] getReplicationBarriers(Result result) {
+ return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
+ .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray();
+ }
+
+ private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
+ long[] barriers = getReplicationBarriers(result);
+ byte[] stateBytes = result.getValue(getCatalogFamily(), getRegionStateColumn());
+ RegionState.State state =
+ stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
+ byte[] parentRegionsBytes =
+ result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
+    List<byte[]> parentRegionNames =
+ parentRegionsBytes != null
+ ? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX))
+ .map(Bytes::toBytes).collect(Collectors.toList())
+ : Collections.emptyList();
+ return new ReplicationBarrierResult(barriers, state, parentRegionNames);
+ }
+
+ public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
+ TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
+ byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
+ byte[] metaStopKey =
+ RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
+ Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
+ .addColumn(getCatalogFamily(), getRegionStateColumn())
+ .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
+ .setCaching(10);
+ try (Table table = getMetaHTable(conn); ResultScanner scanner = table.getScanner(scan)) {
+ for (Result result;;) {
+ result = scanner.next();
+ if (result == null) {
+ return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
+ }
+ byte[] regionName = result.getRow();
+ // TODO: we may look up a region which has already been split or merged so we need to check
+ // whether the encoded name matches. Need to find a way to quit earlier when there is no
+ // record for the given region, for now it will scan to the end of the table.
+ if (!Bytes.equals(encodedRegionName,
+ Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) {
+ continue;
+ }
+ return getReplicationBarrierResult(result);
+ }
+ }
+ }
+
+ public static long[] getReplicationBarrier(Connection conn, byte[] regionName)
+ throws IOException {
+ try (Table table = getMetaHTable(conn)) {
+ Result result = table.get(new Get(regionName)
+ .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
+ .readAllVersions());
+ return getReplicationBarriers(result);
+ }
+ }
+
private static void debugLogMutations(List<? extends Mutation> mutations) throws IOException {
if (!METALOG.isDebugEnabled()) {
return;
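
Note (illustration, not part of the patch): the barriers and parent pointers written above can be read back through the accessors this hunk adds. A rough usage sketch; the class name BarrierReadExample and method dumpBarriers are made up, and a live Connection plus a RegionInfo for a serially replicated table are assumed:

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.ReplicationBarrierResult;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;

public class BarrierReadExample {
  // Reads the replication barriers recorded in hbase:meta for one region.
  static void dumpBarriers(Connection conn, RegionInfo region) throws IOException {
    // Every openSeqNum recorded at region open/split/merge becomes a barrier; the sorted
    // barriers split the region's sequence ids into ranges that must be shipped in order.
    long[] barriers = MetaTableAccessor.getReplicationBarrier(conn, region.getRegionName());
    System.out.println("barriers=" + Arrays.toString(barriers));
    // The scan-based variant also returns the region state and, after a split or merge, the
    // parent region names stored under the parent qualifier of the rep_barrier family.
    ReplicationBarrierResult result = MetaTableAccessor.getReplicationBarrierResult(conn,
      region.getTable(), region.getStartKey(), Bytes.toBytes(region.getEncodedName()));
    System.out.println("state=" + result.getState() + ", parents="
      + result.getParentRegionNames().size());
  }
}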
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 9456fd4dc47..13ad0e21258 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -244,6 +244,11 @@ public interface TableDescriptor {
*/
boolean hasRegionMemStoreReplication();
+ /**
+   * @return true if there is at least one cf whose replication scope is serial.
+ */
+ boolean hasSerialReplicationScope();
+
/**
* Check if the compaction enable flag of the table is true. If flag is false
* then no minor/major compactions will be done in real.
@@ -292,7 +297,8 @@ public interface TableDescriptor {
boolean hasDisabled = false;
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
- if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
+ if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL &&
+ cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
hasDisabled = true;
} else {
hasEnabled = true;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 02901ac6173..2d6bfaf901b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -1127,6 +1127,15 @@ public class TableDescriptorBuilder {
return families.values().toArray(new ColumnFamilyDescriptor[families.size()]);
}
+ /**
+   * Return true if there is at least one cf whose replication scope is serial.
+ */
+ @Override
+ public boolean hasSerialReplicationScope() {
+ return families.values().stream()
+ .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
+ }
+
/**
* Returns the configured replicas per region
*/
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ac56ce57505..edf8f9c9135 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -535,6 +535,12 @@ public final class HConstants {
/** The serialized table state qualifier */
public static final byte[] TABLE_STATE_QUALIFIER = Bytes.toBytes("state");
+  /** The replication barrier family as a string */
+ public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier";
+
+ /** The replication barrier family */
+ public static final byte[] REPLICATION_BARRIER_FAMILY =
+ Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR);
/**
* The meta table version column qualifier.
@@ -675,6 +681,12 @@ public final class HConstants {
*/
public static final int REPLICATION_SCOPE_GLOBAL = 1;
+ /**
+   * Scope tag for serially scoped data.
+   * This data will be replicated to all peers in the order of sequence id.
+ */
+ public static final int REPLICATION_SCOPE_SERIAL = 2;
+
/**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index a37fd4e6506..864be029008 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -208,7 +208,16 @@ public class MasterFileSystem {
/**
* @return HBase root log dir.
*/
- public Path getWALRootDir() { return this.walRootDir; }
+ public Path getWALRootDir() {
+ return this.walRootDir;
+ }
+
+ /**
+   * @return the directory for a given {@code region}.
+ */
+ public Path getRegionDir(RegionInfo region) {
+ return FSUtils.getRegionDir(FSUtils.getTableDir(getRootDir(), region.getTable()), region);
+ }
/**
* @return HBase temp dir.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index 754731b02b0..0e47065446e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1571,8 +1571,7 @@ public class AssignmentManager implements ServerListener {
}
public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName,
- final RegionInfo daughterA, final RegionInfo daughterB)
- throws IOException {
+ final RegionInfo daughterA, final RegionInfo daughterB) throws IOException {
// Update hbase:meta. Parent will be marked offline and split up in hbase:meta.
// The parent stays in regionStates until cleared when removed by CatalogJanitor.
// Update its state in regionStates to it shows as offline and split when read
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 1eaa4c6e3d2..c98a2d1a1a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
@@ -36,11 +34,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@@ -163,6 +163,11 @@ public class RegionStateStore {
Preconditions.checkArgument(state == State.OPEN && regionLocation != null,
"Open region should be on a server");
MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
+ // only update replication barrier for default replica
+ if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
+ hasSerialReplicationScope(regionInfo.getTable())) {
+ MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
+ }
info.append(", openSeqNum=").append(openSeqNum);
info.append(", regionLocation=").append(regionLocation);
} else if (regionLocation != null && !regionLocation.equals(lastHost)) {
@@ -205,24 +210,41 @@ public class RegionStateStore {
}
}
+ private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException {
+ MasterFileSystem mfs = master.getMasterFileSystem();
+ long maxSeqId =
+ WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region));
+ return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM;
+ }
+
// ============================================================================================
// Update Region Splitting State helpers
// ============================================================================================
- public void splitRegion(final RegionInfo parent, final RegionInfo hriA,
- final RegionInfo hriB, final ServerName serverName) throws IOException {
- final TableDescriptor htd = getTableDescriptor(parent.getTable());
- MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName,
- getRegionReplication(htd));
+ public void splitRegion(RegionInfo parent, RegionInfo hriA, RegionInfo hriB,
+ ServerName serverName) throws IOException {
+ TableDescriptor htd = getTableDescriptor(parent.getTable());
+ long parentOpenSeqNum = HConstants.NO_SEQNUM;
+ if (htd.hasSerialReplicationScope()) {
+ parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
+ }
+ MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
+ serverName, getRegionReplication(htd));
}
// ============================================================================================
// Update Region Merging State helpers
// ============================================================================================
- public void mergeRegions(final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB,
- final ServerName serverName) throws IOException {
- final TableDescriptor htd = getTableDescriptor(parent.getTable());
- MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName,
- getRegionReplication(htd));
+ public void mergeRegions(RegionInfo child, RegionInfo hriA, RegionInfo hriB,
+ ServerName serverName) throws IOException {
+ TableDescriptor htd = getTableDescriptor(child.getTable());
+ long regionAOpenSeqNum = -1L;
+ long regionBOpenSeqNum = -1L;
+ if (htd.hasSerialReplicationScope()) {
+ regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
+ regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
+ }
+ MetaTableAccessor.mergeRegions(master.getConnection(), child, hriA, regionAOpenSeqNum, hriB,
+ regionBOpenSeqNum, serverName, getRegionReplication(htd));
}
// ============================================================================================
@@ -239,11 +261,19 @@ public class RegionStateStore {
// ==========================================================================
// Table Descriptors helpers
// ==========================================================================
- private int getRegionReplication(final TableDescriptor htd) {
- return (htd != null) ? htd.getRegionReplication() : 1;
+ private boolean hasSerialReplicationScope(TableName tableName) throws IOException {
+ return hasSerialReplicationScope(getTableDescriptor(tableName));
}
- private TableDescriptor getTableDescriptor(final TableName tableName) throws IOException {
+ private boolean hasSerialReplicationScope(TableDescriptor htd) {
+ return htd != null ? htd.hasSerialReplicationScope() : false;
+ }
+
+ private int getRegionReplication(TableDescriptor htd) {
+ return htd != null ? htd.getRegionReplication() : 1;
+ }
+
+ private TableDescriptor getTableDescriptor(TableName tableName) throws IOException {
return master.getTableDescriptors().get(tableName);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index 994983f4aeb..341affb10a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -253,7 +253,7 @@ public class SplitTableRegionProcedure
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META);
break;
case SPLIT_TABLE_REGION_UPDATE_META:
- updateMetaForDaughterRegions(env);
+ updateMeta(env);
setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META);
break;
case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META:
@@ -762,7 +762,7 @@ public class SplitTableRegionProcedure
* Add daughter regions to META
* @param env MasterProcedureEnv
*/
- private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException {
+ private void updateMeta(final MasterProcedureEnv env) throws IOException {
env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env),
daughter_1_RI, daughter_2_RI);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
index 60436c21cc2..d29682895f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
@@ -31,15 +30,12 @@ import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionOfflineException;
import org.apache.hadoop.hbase.client.TableState;
-import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -131,9 +127,7 @@ public abstract class AbstractStateMachineTableProcedure
}
protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException {
- MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
- Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), getTableName());
- return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+ return env.getMasterServices().getMasterFileSystem().getRegionDir(region);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 37a430991be..9666aa51b23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.regionserver;
import java.io.FileNotFoundException;
@@ -25,6 +23,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
@@ -84,6 +83,7 @@ public class HRegionFileSystem {
private final Configuration conf;
private final Path tableDir;
private final FileSystem fs;
+ private final Path regionDir;
/**
* In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the
@@ -105,9 +105,10 @@ public class HRegionFileSystem {
final RegionInfo regionInfo) {
this.fs = fs;
this.conf = conf;
- this.tableDir = tableDir;
- this.regionInfo = regionInfo;
+ this.tableDir = Objects.requireNonNull(tableDir, "tableDir is null");
+ this.regionInfo = Objects.requireNonNull(regionInfo, "regionInfo is null");
this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo);
+ this.regionDir = FSUtils.getRegionDir(tableDir, regionInfo);
this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number",
DEFAULT_HDFS_CLIENT_RETRIES_NUMBER);
this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries",
@@ -135,7 +136,7 @@ public class HRegionFileSystem {
/** @return {@link Path} to the region directory. */
public Path getRegionDir() {
- return new Path(this.tableDir, this.regionInfoForFs.getEncodedName());
+ return regionDir;
}
// ===========================================================================
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index ad6e5a64f3a..08c9f37d384 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -21,16 +21,13 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
@@ -47,7 +44,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
@InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
- private static final Logger LOG = LoggerFactory.getLogger(NamespaceTableCfWALEntryFilter.class);
private final ReplicationPeer peer;
private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 5cde40c2f1d..6a2fbcf429b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -15,17 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.replication;
import java.util.NavigableMap;
-
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
@@ -35,7 +33,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
@InterfaceAudience.Private
public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
- BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
+ private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
@Override
public Entry filter(Entry entry) {
@@ -49,21 +47,21 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
@Override
public Cell filterCell(Entry entry, Cell cell) {
final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- byte[] fam = CellUtil.cloneFamily(cell);
- if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
- @Override
- public boolean apply(byte[] fam) {
- return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
- }
- });
- } else {
- if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
- return null;
+ // The scope will be null or empty if
+ // there's nothing to replicate in that WALEdit
+ byte[] fam = CellUtil.cloneFamily(cell);
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+      cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] fam) {
+ return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
}
+ });
+ } else {
+ if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+ return null;
}
+ }
return cell;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 3cae0f2d1f9..d9506c0776e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -194,4 +194,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
public ServerName getServerWALsBelongTo() {
return this.replicationQueueInfo.getDeadRegionServers().get(0);
}
+
+ @Override
+ public boolean isRecovered() {
+ return true;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index 38bbb48030c..9c364979b57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,12 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -127,13 +124,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
return startPosition;
}
- @Override
- protected void updateLogPosition(long lastReadPosition) {
- source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
- lastReadPosition, true);
- lastLoggedPosition = lastReadPosition;
- }
-
private void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
index 0af3f5cacca..114f1390000 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java
@@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader {
+
private static final Logger LOG =
- LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
+ LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class);
public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf,
PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
@@ -45,13 +46,11 @@ public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALRea
}
@Override
- protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
- throws InterruptedException {
+ protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
// we're done with queue recovery, shut ourself down
setReaderRunning(false);
// shuts down shipper thread immediately
- entryBatchQueue.put(batch != null ? batch
- : new WALEntryBatch(replicationBatchCountCapacity, currentPath));
+ entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath));
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 73d46521562..3b65b25ea3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -607,4 +607,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
public ServerName getServerWALsBelongTo() {
return server.getServerName();
}
+
+ Server getServer() {
+ return server;
+ }
+
+ ReplicationQueueStorage getQueueStorage() {
+ return queueStorage;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index d7cf9a3e8c1..090b4651f7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -166,4 +166,11 @@ public interface ReplicationSourceInterface {
* @return the server name which all WALs belong to
*/
ServerName getServerWALsBelongTo();
+
+ /**
+ * @return whether this is a replication source for recovery.
+ */
+ default boolean isRecovered() {
+ return false;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index eb9dba2b9c9..06fe9776992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -480,10 +480,10 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region server
*/
public void logPositionAndCleanOldLogs(Path log, String queueId, long position,
- boolean queueRecovered) {
+      Map<String, Long> lastSeqIds, boolean queueRecovered) {
String fileName = log.getName();
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName,
- position, null));
+ position, lastSeqIds));
cleanOldLogs(fileName, queueId, queueRecovered);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 959f6767eaa..d207d775701 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,13 +19,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -128,7 +127,7 @@ public class ReplicationSourceShipper extends Thread {
int sleepMultiplier = 0;
if (entries.isEmpty()) {
if (lastLoggedPosition != lastReadPosition) {
- updateLogPosition(lastReadPosition);
+ updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to to indicate that we're current
source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(),
@@ -168,13 +167,13 @@ public class ReplicationSourceShipper extends Thread {
}
if (this.lastLoggedPosition != lastReadPosition) {
- //Clean up hfile references
+ // Clean up hfile references
int size = entries.size();
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
- //Log and clean up WAL logs
- updateLogPosition(lastReadPosition);
+ // Log and clean up WAL logs
+ updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds());
}
source.postShipEdits(entries, currentSize);
@@ -222,9 +221,9 @@ public class ReplicationSourceShipper extends Thread {
}
}
- protected void updateLogPosition(long lastReadPosition) {
+  private void updateLogPosition(long lastReadPosition, Map<String, Long> lastSeqIds) {
source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(),
- lastReadPosition, false);
+ lastReadPosition, lastSeqIds, source.isRecovered());
lastLoggedPosition = lastReadPosition;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
index eb126146f3f..95fc6a088a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
@@ -31,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
/**
* Used to receive new wals.
*/
@@ -68,31 +65,25 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
* compaction WAL edits and if the scope is local.
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
- * @throws IOException If failed to parse the WALEdit
*/
@VisibleForTesting
- static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
- boolean replicationForBulkLoadEnabled =
- ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
- boolean foundOtherEdits = false;
- for (Cell cell : logEdit.getCells()) {
- if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
- foundOtherEdits = true;
- break;
- }
+ static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) {
+    // For bulk load replication we need the meta family cells to know which files we want to replicate.
+ if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
+ return;
}
-
- if (!foundOtherEdits && logEdit.getCells().size() > 0) {
- WALProtos.RegionEventDescriptor maybeEvent =
- WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
- if (maybeEvent != null &&
- (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
- // In serially replication, we use scopes when reading close marker.
- foundOtherEdits = true;
- }
+ WALKeyImpl keyImpl = (WALKeyImpl) logKey;
+    // For serial replication we need to count all the sequence ids even for markers, so here we
+    // always need to retain the replication scopes to let the replication wal reader know that
+    // we need serial replication. The ScopeWALEntryFilter will help filter out the cells for
+    // WALEdit.METAFAMILY.
+ if (keyImpl.hasSerialReplicationScope()) {
+ return;
}
- if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
- ((WALKeyImpl) logKey).serializeReplicationScope(false);
+    // For replay, or if all the cells are markers, we do not need to store the replication scope.
+ if (logEdit.isReplay() ||
+ logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
+ keyImpl.clearReplicationScope();
}
}
}
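
Note (paraphrase, not part of the patch): the rewritten scopeWALEdits above boils down to a small decision rule. A hypothetical helper restating it, with simplified boolean inputs standing in for the WALKeyImpl/WALEdit state:

// Restatement of the scoping rule above, not the actual method.
static boolean keepReplicationScopes(boolean bulkLoadReplicationEnabled,
    boolean hasSerialReplicationScope, boolean isReplay, boolean allCellsAreMarkers) {
  if (bulkLoadReplicationEnabled) {
    // the meta family cells are needed to find the bulk loaded store files
    return true;
  }
  if (hasSerialReplicationScope) {
    // serial replication must observe every sequence id, marker edits included
    return true;
  }
  // otherwise replay edits and marker-only edits carry nothing worth replicating
  return !(isReplay || allCellsAreMarkers);
}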
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 579d20f19c5..fe87aec3175 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
- * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue
- *
+ * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
+ * onto a queue
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -77,6 +77,8 @@ public class ReplicationSourceWALReader extends Thread {
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
+ private final SerialReplicationChecker serialReplicationChecker;
+
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@@ -111,6 +113,7 @@ public class ReplicationSourceWALReader extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+ this.serialReplicationChecker = new SerialReplicationChecker(conf, source);
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -131,15 +134,14 @@ public class ReplicationSourceWALReader extends Thread {
continue;
}
WALEntryBatch batch = readWALEntries(entryStream);
- if (batch != null && batch.getNbEntries() > 0) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(String.format("Read %s WAL entries eligible for replication",
- batch.getNbEntries()));
- }
+ if (batch != null) {
+        // need to propagate the batch even if it has no entries since it may carry the last
+ // sequence id information for serial replication.
+ LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
- handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
+ handleEmptyWALEntryBatch(entryStream.getCurrentPath());
}
currentPosition = entryStream.getPosition();
entryStream.reset(); // reuse stream
@@ -160,34 +162,66 @@ public class ReplicationSourceWALReader extends Thread {
}
}
- private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
- WALEntryBatch batch = null;
- while (entryStream.hasNext()) {
- if (batch == null) {
- batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+ private WALEntryBatch readWALEntries(WALEntryStream entryStream)
+ throws IOException, InterruptedException {
+ if (!entryStream.hasNext()) {
+ return null;
+ }
+ WALEntryBatch batch =
+ new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
+ do {
+ Entry entry = entryStream.peek();
+ batch.setLastWalPosition(entryStream.getPosition());
+ boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
+      // Used to locate the region record in the meta table. In the WAL we only have the table
+      // name and the encoded region name, which cannot be mapped to a region name without
+      // scanning all the records for a table, so we need a start key, just like what we do at
+      // the client side when locating a region. For the markers, we will use the start key of
+      // the region as the row key for the edit. And we need to do this before filtering since
+      // all the cells may be filtered out, especially for the markers.
+ Cell firstCellInEdit = null;
+ if (hasSerialReplicationScope) {
+ assert !entry.getEdit().isEmpty() : "should not write empty edits";
+ firstCellInEdit = entry.getEdit().getCells().get(0);
}
- Entry entry = entryStream.next();
entry = filterEntry(entry);
if (entry != null) {
+ if (hasSerialReplicationScope) {
+ if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) {
+ if (batch.getNbEntries() > 0) {
+ // we have something that can push, break
+ break;
+ } else {
+ serialReplicationChecker.waitUntilCanPush(entry, firstCellInEdit);
+ }
+ }
+          // arriving here means we can push the entry; record the last sequence id
+ batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()),
+ entry.getKey().getSequenceId());
+ }
+ // actually remove the entry.
+ entryStream.next();
WALEdit edit = entry.getEdit();
if (edit != null && !edit.isEmpty()) {
long entrySize = getEntrySize(entry);
batch.addEntry(entry);
- updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
+ updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
// Stop if too many entries or too big
- if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
- || batch.getNbEntries() >= replicationBatchCountCapacity) {
+ if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
+ batch.getNbEntries() >= replicationBatchCountCapacity) {
break;
}
}
+ } else {
+ // actually remove the entry.
+ entryStream.next();
}
- }
+ } while (entryStream.hasNext());
return batch;
}
- protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
- throws InterruptedException {
+ protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
Thread.sleep(sleepForRetries);
}
@@ -214,7 +248,7 @@ public class ReplicationSourceWALReader extends Thread {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
if (batchQueueHead != null) {
- return batchQueueHead.lastWalPath;
+ return batchQueueHead.getLastWalPath();
}
// otherwise, we must be currently reading from the head of the log queue
return logQueue.peek();
@@ -253,15 +287,12 @@ public class ReplicationSourceWALReader extends Thread {
return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit);
}
- private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) {
+ private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
WALEdit edit = entry.getEdit();
- if (edit != null && !edit.isEmpty()) {
- batch.incrementHeapSize(entrySize);
-      Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
- batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
- batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
- }
- batch.lastWalPosition = entryPosition;
+ batch.incrementHeapSize(entrySize);
+    Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
+ batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
+ batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
}
/**
@@ -355,101 +386,4 @@ public class ReplicationSourceWALReader extends Thread {
public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning;
}
-
- /**
- * Holds a batch of WAL entries to replicate, along with some statistics
- *
- */
- static class WALEntryBatch {
-    private List<Entry> walEntries;
- // last WAL that was read
- private Path lastWalPath;
- // position in WAL of last entry in this batch
- private long lastWalPosition = 0;
- // number of distinct row keys in this batch
- private int nbRowKeys = 0;
- // number of HFiles
- private int nbHFiles = 0;
- // heap size of data we need to replicate
- private long heapSize = 0;
-
- /**
- * @param walEntries
- * @param lastWalPath Path of the WAL the last entry in this batch was read from
- * @param lastWalPosition Position in the WAL the last entry in this batch was read from
- */
- WALEntryBatch(int maxNbEntries, Path lastWalPath) {
- this.walEntries = new ArrayList<>(maxNbEntries);
- this.lastWalPath = lastWalPath;
- }
-
- public void addEntry(Entry entry) {
- walEntries.add(entry);
- }
-
- /**
- * @return the WAL Entries.
- */
-    public List<Entry> getWalEntries() {
- return walEntries;
- }
-
- /**
- * @return the path of the last WAL that was read.
- */
- public Path getLastWalPath() {
- return lastWalPath;
- }
-
- /**
- * @return the position in the last WAL that was read.
- */
- public long getLastWalPosition() {
- return lastWalPosition;
- }
-
- public int getNbEntries() {
- return walEntries.size();
- }
-
- /**
- * @return the number of distinct row keys in this batch
- */
- public int getNbRowKeys() {
- return nbRowKeys;
- }
-
- /**
- * @return the number of HFiles in this batch
- */
- public int getNbHFiles() {
- return nbHFiles;
- }
-
- /**
- * @return total number of operations in this batch
- */
- public int getNbOperations() {
- return getNbRowKeys() + getNbHFiles();
- }
-
- /**
- * @return the heap size of this batch
- */
- public long getHeapSize() {
- return heapSize;
- }
-
- private void incrementNbRowKeys(int increment) {
- nbRowKeys += increment;
- }
-
- private void incrementNbHFiles(int increment) {
- nbHFiles += increment;
- }
-
- private void incrementHeapSize(long increment) {
- heapSize += increment;
- }
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
new file mode 100644
index 00000000000..95f3868031d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MetaTableAccessor.ReplicationBarrierResult;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
+
+/**
+ *
+ * Helper class to determine whether we can push a given WAL entry without breaking the replication
+ * order. The class is designed to be used by a single {@link ReplicationSourceWALReader}, so it is
+ * not thread safe.
+ *
+ * We record all the open sequence numbers for a region in a special family in meta, which is
+ * called 'barrier', so there will be a sequence of open sequence numbers (b1, b2, b3, ...). We call
+ * [bn, bn+1) a range, and it is obvious that a region will always be on the same RS within a range.
+ *
+ * On split and merge, we will also record the parent(s) for the generated region(s) in the special
+ * family in meta, and we will also write an extra 'open sequence number' for the parent region(s),
+ * which is the max sequence id of the region plus one.
+ *
+ * For each peer, we record the last pushed sequence id for each region. It is managed by the
+ * replication storage.
+ *
+ * The algorithm works like this:
+ *
+ * - Locate the sequence id we want to push in the barriers.
+ * - If it is before the first barrier, we are safe to push. This is usually because serial
+ * replication was enabled for this table after the table had been created and data had already
+ * been written into it.
+ * - In general, if the previous range is finished, then we are safe to push. The way to determine
+ * whether a range is finished is straightforward: check whether the last pushed sequence id equals
+ * the end barrier of the range minus one. There are several exceptions:
+ *
+ * - If it is in the first range, we need to check whether there are parent regions. If so, we
+ * need to make sure that the data for the parent regions has all been pushed.
+ * - If it is in the last range, we need to check the region state. If the state is OPENING, then
+ * we are not safe to push. This is because, before we call reportRIT to the master (which updates
+ * the open sequence number in the meta table), we first write an open region event marker to the
+ * WAL, and its sequence id is greater than the newest open sequence number (which has not been
+ * updated to the meta table yet, so we do not know it). In this scenario, the WAL entry for this
+ * open region event marker actually belongs to the range after the 'last' range, so we are not
+ * safe to push it. Otherwise the last pushed sequence id would be updated to this value and we
+ * would think the previous range had already been finished, which is not true.
+ * - Notice that the above two exceptions do not conflict, since the first range can also be the
+ * last range if we only have one range.
+ *
+ * For performance reasons, we do not want to check meta for every WAL entry, so we introduce two
+ * in-memory maps. The idea is simple:
+ *
+ * - If a range can be pushed, then put its end barrier into the {@code canPushUnder} map.
+ * - Before accessing meta, first check the sequence id stored in the {@code canPushUnder} map. If
+ * the sequence id of the WAL entry is less than the one stored in the {@code canPushUnder} map,
+ * then we are safe to push.
+ *
+ * For the last range, we do not have an end barrier, so we use the continuity of sequence ids to
+ * determine whether we can push. The rule is:
+ *
+ * - When an entry can be pushed, put its sequence id into the {@code pushed} map.
+ * - Check whether the sequence id of the WAL entry equals the one stored in the {@code pushed} map
+ * plus one. If so, we are safe to push, and we also update the {@code pushed} map with the sequence
+ * id of the WAL entry.
+ *
+ *
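+ * A rough usage sketch from the WAL reader side (illustrative only; {@code conf}, {@code source}
+ * and {@code entries} are assumed to be supplied by the caller):
+ *
+ * <pre>
+ * SerialReplicationChecker checker = new SerialReplicationChecker(conf, source);
+ * for (Entry entry : entries) {
+ *   Cell firstCellInEdit = entry.getEdit().getCells().get(0);
+ *   // blocks, polling meta, until pushing this entry can no longer break the per-region order
+ *   checker.waitUntilCanPush(entry, firstCellInEdit);
+ * }
+ * </pre>
+ *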
+ */
+@InterfaceAudience.Private
+class SerialReplicationChecker {
+
+ public static final String REPLICATION_SERIALLY_WAITING_KEY =
+ "hbase.serial.replication.waiting.ms";
+ public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;
+
+ private final String peerId;
+
+ private final ReplicationQueueStorage storage;
+
+ private final Connection conn;
+
+ private final long waitTimeMs;
+
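+ // The sequence id of the last entry we allowed to push, per region. Used to check sequence id
+ // continuity for the last (open-ended) range without going to meta. See the class comment.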
+ private final LoadingCache<String, MutableLong> pushed = CacheBuilder.newBuilder()
+ .expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, MutableLong>() {
+
+ @Override
+ public MutableLong load(String key) throws Exception {
+ return new MutableLong(HConstants.NO_SEQNUM);
+ }
+ });
+
+ // Use guava cache to set ttl for each key
+ private final Cache<String, Long> canPushUnder =
+ CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();
+
+ public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
+ this.peerId = source.getPeerId();
+ this.storage = source.getQueueStorage();
+ this.conn = source.getServer().getConnection();
+ this.waitTimeMs =
+ conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
+ }
+
+ private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException {
+ long pushedSeqId;
+ try {
+ pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId);
+ } catch (ReplicationException e) {
+ throw new IOException(
+ "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
+ }
+ // endBarrier is the open sequence number. When opening a region, the open sequence number will
+ // be set to the old max sequence id plus one, so here we need to subtract one.
+ return pushedSeqId >= endBarrier - 1;
+ }
+
+ private boolean isParentFinished(byte[] regionName) throws IOException {
+ long[] barriers = MetaTableAccessor.getReplicationBarrier(conn, regionName);
+ if (barriers.length == 0) {
+ return true;
+ }
+ return isRangeFinished(barriers[barriers.length - 1], RegionInfo.encodeRegionName(regionName));
+ }
+
+ // We may write an open region marker to the WAL before we write the open sequence number to
+ // meta, so if a region is in OPENING state and we are in the last range, it is not safe to say
+ // we can push, even if the previous range is finished.
+ private boolean isLastRangeAndOpening(ReplicationBarrierResult barrierResult, int index) {
+ return index == barrierResult.getBarriers().length &&
+ barrierResult.getState() == RegionState.State.OPENING;
+ }
+
+ private void recordCanPush(String encodedNameAsString, long seqId, long[] barriers, int index) {
+ if (barriers.length > index) {
+ canPushUnder.put(encodedNameAsString, barriers[index]);
+ }
+ pushed.getUnchecked(encodedNameAsString).setValue(seqId);
+ }
+
+ private boolean canPush(Entry entry, byte[] row) throws IOException {
+ String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
+ long seqId = entry.getKey().getSequenceId();
+ ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn,
+ entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
+ long[] barriers = barrierResult.getBarriers();
+ int index = Arrays.binarySearch(barriers, seqId);
+ if (index == -1) {
+ // This means we are in the range before the first recorded openSeqNum, usually because the
+ // WAL entry was written before serial replication was enabled for this table. Just return true
+ // since we can not guarantee the order anyway.
+ pushed.getUnchecked(encodedNameAsString).setValue(seqId);
+ return true;
+ }
+ // The sequence id ranges are left closed and right open, so to make the miss and hit cases
+ // consistent we can either decrease the insertion point on a miss so that range indexes start
+ // from 0, or increase the returned index on a hit so that they start from 1. Here we choose the
+ // latter: after the adjustment, index n means the sequence id falls into the nth range
+ // [b(n), b(n+1)). For example, with barriers {10, 20, 30}, seqId 15 maps to index 1 and
+ // seqId 20 maps to index 2.
+ if (index < 0) {
+ index = -index - 1;
+ } else {
+ index++;
+ }
+ if (index == 1) {
+ // we are in the first range, check whether we have parents
+ for (byte[] regionName : barrierResult.getParentRegionNames()) {
+ if (!isParentFinished(regionName)) {
+ return false;
+ }
+ }
+ if (isLastRangeAndOpening(barrierResult, index)) {
+ return false;
+ }
+ recordCanPush(encodedNameAsString, seqId, barriers, 1);
+ return true;
+ }
+ // check whether the previous range is finished
+ if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) {
+ return false;
+ }
+ if (isLastRangeAndOpening(barrierResult, index)) {
+ return false;
+ }
+ recordCanPush(encodedNameAsString, seqId, barriers, index);
+ return true;
+ }
+
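+ /**
+ * Returns whether we can push the given WAL entry without breaking the replication order for
+ * its region. The row of {@code firstCellInEdit} is used to locate the region in meta when we
+ * need to read the replication barriers.
+ */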
+ public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException {
+ String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName());
+ long seqId = entry.getKey().getSequenceId();
+ Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString);
+ if (canReplicateUnderSeqId != null) {
+ if (seqId < canReplicateUnderSeqId.longValue()) {
+ return true;
+ }
+ // we are already beyond the last safe point, remove
+ canPushUnder.invalidate(encodedNameAsString);
+ }
+ // This is for the case where the region is currently opened on us: if the sequence id is
+ // continuous then we are safe to replicate. If there is a gap, then maybe the region has been
+ // moved to another RS and back, so we need to check the barriers.
+ MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString);
+ if (seqId == previousPushedSeqId.longValue() + 1) {
+ previousPushedSeqId.increment();
+ return true;
+ }
+ return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
+ }
+
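+ /**
+ * Block until the given WAL entry can be pushed without breaking the replication order,
+ * re-checking at the interval configured by {@link #REPLICATION_SERIALLY_WAITING_KEY}.
+ */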
+ public void waitUntilCanPush(Entry entry, Cell firstCellInEdit)
+ throws IOException, InterruptedException {
+ byte[] row = CellUtil.cloneRow(firstCellInEdit);
+ while (!canPush(entry, row)) {
+ Thread.sleep(waitTimeMs);
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
new file mode 100644
index 00000000000..31c3ac74c91
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Holds a batch of WAL entries to replicate, along with some statistics
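+ * <p>
+ * A rough sketch of how a reader might fill a batch (illustrative only; {@code entryStream},
+ * {@code entry} and {@code entrySize} are assumed to come from the caller):
+ *
+ * <pre>
+ * WALEntryBatch batch = new WALEntryBatch(maxEntries, entryStream.getCurrentPath());
+ * batch.addEntry(entry);
+ * batch.incrementHeapSize(entrySize);
+ * batch.setLastWalPosition(entryStream.getPosition());
+ * </pre>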
+ */
+@InterfaceAudience.Private
+class WALEntryBatch {
+ private List<Entry> walEntries;
+ // last WAL that was read
+ private Path lastWalPath;
+ // position in WAL of last entry in this batch
+ private long lastWalPosition = 0;
+ // number of distinct row keys in this batch
+ private int nbRowKeys = 0;
+ // number of HFiles
+ private int nbHFiles = 0;
+ // heap size of data we need to replicate
+ private long heapSize = 0;
+ // Save the last sequence id for each region if the table has serial replication scope.
+ private Map<String, Long> lastSeqIds = new HashMap<>();
+
+ /**
+ * @param lastWalPath Path of the WAL the last entry in this batch was read from
+ */
+ WALEntryBatch(int maxNbEntries, Path lastWalPath) {
+ this.walEntries = new ArrayList<>(maxNbEntries);
+ this.lastWalPath = lastWalPath;
+ }
+
+ public void addEntry(Entry entry) {
+ walEntries.add(entry);
+ }
+
+ /**
+ * @return the WAL Entries.
+ */
+ public List<Entry> getWalEntries() {
+ return walEntries;
+ }
+
+ /**
+ * @return the path of the last WAL that was read.
+ */
+ public Path getLastWalPath() {
+ return lastWalPath;
+ }
+
+ /**
+ * @return the position in the last WAL that was read.
+ */
+ public long getLastWalPosition() {
+ return lastWalPosition;
+ }
+
+ public void setLastWalPosition(long lastWalPosition) {
+ this.lastWalPosition = lastWalPosition;
+ }
+
+ public int getNbEntries() {
+ return walEntries.size();
+ }
+
+ /**
+ * @return the number of distinct row keys in this batch
+ */
+ public int getNbRowKeys() {
+ return nbRowKeys;
+ }
+
+ /**
+ * @return the number of HFiles in this batch
+ */
+ public int getNbHFiles() {
+ return nbHFiles;
+ }
+
+ /**
+ * @return total number of operations in this batch
+ */
+ public int getNbOperations() {
+ return getNbRowKeys() + getNbHFiles();
+ }
+
+ /**
+ * @return the heap size of this batch
+ */
+ public long getHeapSize() {
+ return heapSize;
+ }
+
+ /**
+ * @return the last sequence id for each region if the table has serial replication scope
+ */
+ public Map<String, Long> getLastSeqIds() {
+ return lastSeqIds;
+ }
+
+ public void incrementNbRowKeys(int increment) {
+ nbRowKeys += increment;
+ }
+
+ public void incrementNbHFiles(int increment) {
+ nbHFiles += increment;
+ }
+
+ public void incrementHeapSize(long increment) {
+ heapSize += increment;
+ }
+
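+ /**
+ * Record the last sequence id in this batch for the given encoded region name; only used when
+ * the table has serial replication scope.
+ */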
+ public void setLastSeqId(String region, long sequenceId) {
+ lastSeqIds.put(region, sequenceId);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 7c83c0c1fac..bcab9b4b339 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually
@@ -102,16 +99,18 @@ class WALEntryStream implements Closeable {
}
/**
- * @return the next WAL entry in this stream
- * @throws IOException
- * @throws NoSuchElementException if no more entries in the stream.
+ * Returns the next WAL entry in this stream but does not advance, or {@code null} if there are
+ * no more entries.
+ */
+ public Entry peek() throws IOException {
+ return hasNext() ? currentEntry : null;
+ }
+
+ /**
+ * Returns the next WAL entry in this stream and advances the stream, or {@code null} if there
+ * are no more entries.
*/
public Entry next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- Entry save = currentEntry;
- currentEntry = null; // gets reloaded by hasNext()
+ Entry save = peek();
+ currentEntry = null;
return save;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
index a67bca1bd18..85292f831a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java
@@ -170,6 +170,14 @@ public class FSTableDescriptors implements TableDescriptors {
// Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
.setBloomFilterType(BloomType.NONE)
.build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(HConstants.REPLICATION_BARRIER_FAMILY)
+ .setMaxVersions(HConstants.ALL_VERSIONS)
+ .setInMemory(true)
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+ // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore.
+ .setBloomFilterType(BloomType.NONE)
+ .build())
.setCoprocessor(CoprocessorDescriptorBuilder.newBuilder(
MultiRowMutationEndpoint.class.getName())
.setPriority(Coprocessor.PRIORITY_SYSTEM)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index c0b72aa40ff..b106a316ef8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -18,13 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
-
import edu.umd.cs.findbugs.annotations.CheckForNull;
-
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
@@ -54,7 +48,6 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -71,9 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.HFileLink;
@@ -81,8 +72,6 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
@@ -94,6 +83,17 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos;
/**
* Utility methods for interacting with the underlying file system.
@@ -1028,6 +1028,10 @@ public abstract class FSUtils extends CommonFSUtils {
return regionDirs;
}
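+ /**
+ * Returns the directory for the given region under the table directory. For secondary replicas
+ * this resolves to the directory of the primary region, since replicas share the primary
+ * region's files.
+ */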
+ public static Path getRegionDir(Path tableDir, RegionInfo region) {
+ return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName());
+ }
+
/**
* Filter for all dirs that are legal column family names. This is generally used for colfam
* dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index c1a77eec199..ac23d1d952a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -415,10 +415,16 @@ public class WALKeyImpl implements WALKey {
this.replicationScope = replicationScope;
}
- public void serializeReplicationScope(boolean serialize) {
- if (!serialize) {
- setReplicationScope(null);
+ public void clearReplicationScope() {
+ setReplicationScope(null);
+ }
+
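+ /**
+ * @return whether any of the column families of this entry has
+ * {@link HConstants#REPLICATION_SCOPE_SERIAL} replication scope
+ */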
+ public boolean hasSerialReplicationScope() {
+ if (replicationScope == null || replicationScope.isEmpty()) {
+ return false;
}
+ return replicationScope.values().stream()
+ .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index ec932076561..9161e255bbd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -494,7 +494,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+ MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 1);
assertEmptyMetaLocation(meta, splitA.getRegionName(), 2);
@@ -535,7 +535,8 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parentA, parentB);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3);
+ MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L, serverName0,
+ 3);
assertEmptyMetaLocation(meta, merged.getRegionName(), 1);
assertEmptyMetaLocation(meta, merged.getRegionName(), 2);
@@ -682,8 +683,8 @@ public class TestMetaTableAccessor {
EnvironmentEdgeManager.injectEdge(edge);
try {
// now merge the regions, effectively deleting the rows for region a and b.
- MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, regionInfoB, sn,
- 1);
+ MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, -1L, regionInfoB,
+ -1L, sn, 1);
} finally {
EnvironmentEdgeManager.reset();
}
@@ -776,7 +777,8 @@ public class TestMetaTableAccessor {
}
SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler();
long prevCalls = scheduler.numPriorityCalls;
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1);
+ MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, loc.getServerName(),
+ 1);
assertTrue(prevCalls < scheduler.numPriorityCalls);
}
@@ -813,7 +815,7 @@ public class TestMetaTableAccessor {
List<RegionInfo> regionInfos = Lists.newArrayList(parent);
MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3);
- MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3);
+ MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3);
Get get1 = new Get(splitA.getRegionName());
Result resultA = meta.get(get1);
Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
index a4e8e19f67a..e00f07286d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java
@@ -194,7 +194,7 @@ public class TestHRegionFileSystem {
@Test
public void testOnDiskRegionCreation() throws IOException {
- Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+ Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration();
@@ -226,7 +226,7 @@ public class TestHRegionFileSystem {
@Test
public void testNonIdempotentOpsWithRetries() throws IOException {
- Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation");
+ Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName());
FileSystem fs = TEST_UTIL.getTestFileSystem();
Configuration conf = TEST_UTIL.getConfiguration();
@@ -235,19 +235,15 @@ public class TestHRegionFileSystem {
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri);
assertTrue(fs.exists(regionFs.getRegionDir()));
- regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(),
- null, null);
- // HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir,
- // hri);
+ regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), rootDir, hri);
boolean result = regionFs.createDir(new Path("/foo/bar"));
assertTrue("Couldn't create the directory", result);
-
- regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+ regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2"));
assertTrue("Couldn't rename the directory", result);
- regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null);
+ regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri);
result = regionFs.deleteDir(new Path("/foo/bar"));
assertTrue("Couldn't delete the directory", result);
fs.delete(rootDir, true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
index 50dffd50bfc..fab6512602d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java
@@ -343,12 +343,12 @@ public class TestRegionServerMetrics {
@Test
public void testStoreCount() throws Exception {
- //Force a hfile.
+ // Force a hfile.
doNPuts(1, false);
TEST_UTIL.getAdmin().flush(tableName);
metricsRegionServer.getRegionServerWrapper().forceRecompute();
- assertGauge("storeCount", TABLES_ON_MASTER? 1: 4);
+ assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 5);
assertGauge("storeFileCount", 1);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
index ffa03a2d506..e9e92b8748b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.Before;
@@ -47,7 +46,7 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Category(LargeTests.class)
+@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationDroppedTables extends TestReplicationBase {
@ClassRule
@@ -56,9 +55,6 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class);
- /**
- * @throws java.lang.Exception
- */
@Before
public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
new file mode 100644
index 00000000000..1408cf06f76
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestSerialReplication {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSerialReplication.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static String PEER_ID = "1";
+
+ private static byte[] CF = Bytes.toBytes("CF");
+
+ private static byte[] CQ = Bytes.toBytes("CQ");
+
+ private static FileSystem FS;
+
+ private static Path LOG_DIR;
+
+ private static WALProvider.Writer WRITER;
+
+ public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+
+ private static final UUID PEER_UUID = UUID.randomUUID();
+
+ @Override
+ public UUID getPeerUUID() {
+ return PEER_UUID;
+ }
+
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ synchronized (WRITER) {
+ try {
+ for (Entry entry : replicateContext.getEntries()) {
+ WRITER.append(entry);
+ }
+ WRITER.sync(false);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void start() {
+ startAsync();
+ }
+
+ @Override
+ public void stop() {
+ stopAsync();
+ }
+
+ @Override
+ protected void doStart() {
+ notifyStarted();
+ }
+
+ @Override
+ protected void doStop() {
+ notifyStopped();
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10);
+ UTIL.startMiniCluster(3);
+ LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated");
+ FS = UTIL.getTestFileSystem();
+ FS.mkdirs(LOG_DIR);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Rule
+ public final TestName name = new TestName();
+
+ private Path logPath;
+
+ @Before
+ public void setUp() throws IOException, StreamLacksCapabilityException {
+ UTIL.ensureSomeRegionServersAvailable(3);
+ logPath = new Path(LOG_DIR, name.getMethodName());
+ WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration());
+ // Add the peer in disabled state, so that when we enable it later all the sources will start
+ // pushing together.
+ UTIL.getAdmin().addReplicationPeer(PEER_ID,
+ ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+ .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
+ false);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ UTIL.getAdmin().removeReplicationPeer(PEER_ID);
+ if (WRITER != null) {
+ WRITER.close();
+ WRITER = null;
+ }
+ }
+
+ @Test
+ public void testRegionMove() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+ UTIL.waitTableAvailable(tableName);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 0; i < 100; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
+ HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
+ UTIL.getAdmin().move(region.getEncodedNameAsBytes(),
+ Bytes.toBytes(rs.getServerName().getServerName()));
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return !rs.getRegions(tableName).isEmpty();
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return region + " is still not on " + rs;
+ }
+ });
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ for (int i = 100; i < 200; i++) {
+ table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+ }
+ }
+ UTIL.getAdmin().enableReplicationPeer(PEER_ID);
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) {
+ int count = 0;
+ while (reader.next() != null) {
+ count++;
+ }
+ return count >= 200;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Not enough entries replicated";
+ }
+ });
+ try (WAL.Reader reader =
+ WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
+ long seqId = -1L;
+ int count = 0;
+ for (Entry entry;;) {
+ entry = reader.next();
+ if (entry == null) {
+ break;
+ }
+ assertTrue(
+ "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(),
+ entry.getKey().getSequenceId() >= seqId);
+ count++;
+ }
+ assertEquals(200, count);
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index a53cba37080..6d75fec9fdd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -321,7 +321,7 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter();
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
- "1", 0, false);
+ "1", 0, null, false);
wal.append(hri,
new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
new file mode 100644
index 00000000000..c8387c5c41b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplicationChecker {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSerialReplicationChecker.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static String PEER_ID = "1";
+
+ private static ReplicationQueueStorage QUEUE_STORAGE;
+
+ private static String WAL_FILE_NAME = "test.wal";
+
+ private SerialReplicationChecker checker;
+
+ @Rule
+ public final TestName name = new TestName();
+
+ private TableName tableName;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniCluster(1);
+ QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(),
+ UTIL.getConfiguration());
+ QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID,
+ WAL_FILE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ ReplicationSource source = mock(ReplicationSource.class);
+ when(source.getPeerId()).thenReturn(PEER_ID);
+ when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+ Server server = mock(Server.class);
+ when(server.getConnection()).thenReturn(UTIL.getConnection());
+ when(source.getServer()).thenReturn(server);
+ checker = new SerialReplicationChecker(UTIL.getConfiguration(), source);
+ tableName = TableName.valueOf(name.getMethodName());
+ }
+
+ private Entry createEntry(RegionInfo region, long seqId) {
+ WALKeyImpl key = mock(WALKeyImpl.class);
+ when(key.getTableName()).thenReturn(tableName);
+ when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes());
+ when(key.getSequenceId()).thenReturn(seqId);
+ Entry entry = mock(Entry.class);
+ when(entry.getKey()).thenReturn(key);
+ return entry;
+ }
+
+ private Cell createCell(RegionInfo region) {
+ return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey())
+ .setType(Type.Put).build();
+ }
+
+ @Test
+ public void testNoBarrierCanPush() throws IOException {
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+ assertTrue(checker.canPush(createEntry(region, 100), createCell(region)));
+ }
+
+ private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers)
+ throws IOException {
+ Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+ put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+ Bytes.toBytes(state.name()));
+ for (int i = 0; i < barriers.length; i++) {
+ put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER,
+ put.getTimeStamp() - i, Bytes.toBytes(barriers[i]));
+ }
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ table.put(put);
+ }
+ }
+
+ private void setState(RegionInfo region, RegionState.State state) throws IOException {
+ Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime());
+ put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+ Bytes.toBytes(state.name()));
+ try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) {
+ table.put(put);
+ }
+ }
+
+ private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException {
+ QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(),
+ PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId));
+ }
+
+ @Test
+ public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException {
+ RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build();
+ addStateAndBarrier(region, RegionState.State.OPEN, 10);
+ Cell cell = createCell(region);
+ // can push since we are in the first range
+ assertTrue(checker.canPush(createEntry(region, 100), cell));
+ setState(region, RegionState.State.OPENING);
+ // can not push since we are in the last range and the state is OPENING
+ assertFalse(checker.canPush(createEntry(region, 102), cell));
+ addStateAndBarrier(region, RegionState.State.OPEN, 50);
+ // can not push since the previous range has not been finished yet
+ assertFalse(checker.canPush(createEntry(region, 102), cell));
+ updatePushedSeqId(region, 49);
+ // can push since the previous range has been finished
+ assertTrue(checker.canPush(createEntry(region, 102), cell));
+ setState(region, RegionState.State.OPENING);
+ assertFalse(checker.canPush(createEntry(region, 104), cell));
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 2146e474a21..eb7d5a07537 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -21,13 +21,13 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.NavigableMap;
-import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
@@ -40,13 +40,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -180,15 +180,12 @@ public class TestWALEntryStream {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
- WAL.Entry entry = entryStream.next();
+ WAL.Entry entry = entryStream.peek();
+ assertSame(entry, entryStream.next());
assertNotNull(entry);
assertFalse(entryStream.hasNext());
- try {
- entry = entryStream.next();
- fail();
- } catch (NoSuchElementException e) {
- // expected
- }
+ assertNull(entryStream.peek());
+ assertNull(entryStream.next());
oldPos = entryStream.getPosition();
}
@@ -346,10 +343,12 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
+ Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
+ when(source.getServer()).thenReturn(mockServer);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();