diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 960b91fa158..ca0cb91a36a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -538,6 +538,14 @@ public class HTableDescriptor implements TableDescriptor, Comparable mutations = new ArrayList<>(); + + List replicationParents = new ArrayList<>(2); + // Deletes for merging regions + mutations.add(makeDeleteFromRegionInfo(regionA, time)); + if (regionAOpenSeqNum > 0) { + mutations.add(makePutForReplicationBarrier(regionA, regionAOpenSeqNum, time)); + replicationParents.add(regionA); + } + mutations.add(makeDeleteFromRegionInfo(regionB, time)); + if (regionBOpenSeqNum > 0) { + mutations.add(makePutForReplicationBarrier(regionB, regionBOpenSeqNum, time)); + replicationParents.add(regionB); + } // Put for parent Put putOfMerged = makePutFromRegionInfo(mergedRegion, time); @@ -1552,18 +1576,13 @@ public class MetaTableAccessor { .setType(Type.Put) .setValue(RegionInfo.toByteArray(regionB)) .build()); - // Set initial state to CLOSED // NOTE: If initial state is not set to CLOSED then merged region gets added with the // default OFFLINE state. If Master gets restarted after this step, start up sequence of // master tries to assign this offline region. This is followed by re-assignments of the // merged region from resumed {@link MergeTableRegionsProcedure} addRegionStateToPut(putOfMerged, RegionState.State.CLOSED); - - // Deletes for merging regions - Delete deleteA = makeDeleteFromRegionInfo(regionA, time); - Delete deleteB = makeDeleteFromRegionInfo(regionB, time); - + mutations.add(putOfMerged); // The merged is a new region, openSeqNum = 1 is fine. ServerName may be null // if crash after merge happened but before we got to here.. means in-memory // locations of offlined merged, now-closed, regions is lost. Should be ok. We @@ -1577,26 +1596,30 @@ public class MetaTableAccessor { for (int i = 1; i < regionReplication; i++) { addEmptyLocation(putOfMerged, i); } - - byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() - + HConstants.DELIMITER); - multiMutate(connection, meta, tableRow, putOfMerged, deleteA, deleteB); + // add parent reference for serial replication + if (!replicationParents.isEmpty()) { + addReplicationParent(putOfMerged, replicationParents); + } + byte[] tableRow = Bytes.toBytes(mergedRegion.getRegionNameAsString() + HConstants.DELIMITER); + multiMutate(connection, meta, tableRow, mutations); } } /** - * Splits the region into two in an atomic operation. Offlines the parent - * region with the information that it is split into two, and also adds - * the daughter regions. Does not add the location information to the daughter - * regions since they are not open yet. + * Splits the region into two in an atomic operation. Offlines the parent region with the + * information that it is split into two, and also adds the daughter regions. Does not add the + * location information to the daughter regions since they are not open yet. * @param connection connection we're using * @param parent the parent region which is split + * @param parentOpenSeqNum the next open sequence id for parent region, used by serial + * replication. -1 if not necessary. * @param splitA Split daughter region A * @param splitB Split daughter region B * @param sn the location of the region */ - public static void splitRegion(final Connection connection, RegionInfo parent, RegionInfo splitA, - RegionInfo splitB, ServerName sn, int regionReplication) throws IOException { + public static void splitRegion(Connection connection, RegionInfo parent, long parentOpenSeqNum, + RegionInfo splitA, RegionInfo splitB, ServerName sn, int regionReplication) + throws IOException { try (Table meta = getMetaHTable(connection)) { long time = EnvironmentEdgeManager.currentTime(); // Put for parent @@ -1608,7 +1631,11 @@ public class MetaTableAccessor { // Puts for daughters Put putA = makePutFromRegionInfo(splitA, time); Put putB = makePutFromRegionInfo(splitB, time); - + if (parentOpenSeqNum > 0) { + addReplicationBarrier(putParent, parentOpenSeqNum); + addReplicationParent(putA, Collections.singletonList(parent)); + addReplicationParent(putB, Collections.singletonList(parent)); + } // Set initial state to CLOSED // NOTE: If initial state is not set to CLOSED then daughter regions get added with the // default OFFLINE state. If Master gets restarted after this step, start up sequence of @@ -1668,20 +1695,15 @@ public class MetaTableAccessor { } private static void multiMutate(Connection connection, Table table, byte[] row, - Mutation... mutations) - throws IOException { + Mutation... mutations) throws IOException { multiMutate(connection, table, row, Arrays.asList(mutations)); } /** * Performs an atomic multi-mutate operation against the given table. */ - // Used by the RSGroup Coprocessor Endpoint. It had a copy/paste of the below. Need to reveal - // this facility for CPEP use or at least those CPEPs that are on their way to becoming part of - // core as is the intent for RSGroup eventually. - public static void multiMutate(Connection connection, final Table table, byte[] row, - final List mutations) - throws IOException { + private static void multiMutate(Connection connection, final Table table, byte[] row, + final List mutations) throws IOException { debugLogMutations(mutations); // TODO: Need rollback!!!! // TODO: Need Retry!!! @@ -1782,9 +1804,7 @@ public class MetaTableAccessor { * @param regionInfo region to be deleted from META * @throws IOException */ - public static void deleteRegion(Connection connection, - RegionInfo regionInfo) - throws IOException { + public static void deleteRegion(Connection connection, RegionInfo regionInfo) throws IOException { long time = EnvironmentEdgeManager.currentTime(); Delete delete = new Delete(regionInfo.getRegionName()); delete.addFamily(getCatalogFamily(), time); @@ -1901,6 +1921,33 @@ public class MetaTableAccessor { .build()); } + private static void addReplicationParent(Put put, List parents) throws IOException { + byte[] value = parents.stream().map(RegionReplicaUtil::getRegionInfoForDefaultReplica) + .map(RegionInfo::getRegionNameAsString).collect(Collectors + .collectingAndThen(Collectors.joining(REPLICATION_PARENT_SEPARATOR), Bytes::toBytes)); + put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow()) + .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER) + .setTimestamp(put.getTimeStamp()).setType(Type.Put).setValue(value).build()); + } + + private static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts) + throws IOException { + Put put = new Put(regionInfo.getRegionName(), ts); + addReplicationBarrier(put, openSeqNum); + return put; + } + + public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException { + put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY) + .setRow(put.getRow()) + .setFamily(HConstants.REPLICATION_BARRIER_FAMILY) + .setQualifier(HConstants.SEQNUM_QUALIFIER) + .setTimestamp(put.getTimeStamp()) + .setType(Type.Put) + .setValue(Bytes.toBytes(openSeqNum)) + .build()); + } + private static Put addEmptyLocation(Put p, int replicaId) throws IOException { CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); return p.add(builder.clear() @@ -1926,6 +1973,92 @@ public class MetaTableAccessor { .build()); } + public static final class ReplicationBarrierResult { + private final long[] barriers; + private final RegionState.State state; + private final List parentRegionNames; + + public ReplicationBarrierResult(long[] barriers, State state, List parentRegionNames) { + this.barriers = barriers; + this.state = state; + this.parentRegionNames = parentRegionNames; + } + + public long[] getBarriers() { + return barriers; + } + + public RegionState.State getState() { + return state; + } + + public List getParentRegionNames() { + return parentRegionNames; + } + } + + private static long getReplicationBarrier(Cell c) { + return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength()); + } + + private static long[] getReplicationBarriers(Result result) { + return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER) + .stream().mapToLong(MetaTableAccessor::getReplicationBarrier).sorted().distinct().toArray(); + } + + private static ReplicationBarrierResult getReplicationBarrierResult(Result result) { + long[] barriers = getReplicationBarriers(result); + byte[] stateBytes = result.getValue(getCatalogFamily(), getRegionStateColumn()); + RegionState.State state = + stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null; + byte[] parentRegionsBytes = + result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER); + List parentRegionNames = + parentRegionsBytes != null + ? Stream.of(Bytes.toString(parentRegionsBytes).split(REPLICATION_PARENT_SEPARATOR_REGEX)) + .map(Bytes::toBytes).collect(Collectors.toList()) + : Collections.emptyList(); + return new ReplicationBarrierResult(barriers, state, parentRegionNames); + } + + public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn, + TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException { + byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false); + byte[] metaStopKey = + RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false); + Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey) + .addColumn(getCatalogFamily(), getRegionStateColumn()) + .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true) + .setCaching(10); + try (Table table = getMetaHTable(conn); ResultScanner scanner = table.getScanner(scan)) { + for (Result result;;) { + result = scanner.next(); + if (result == null) { + return new ReplicationBarrierResult(new long[0], null, Collections.emptyList()); + } + byte[] regionName = result.getRow(); + // TODO: we may look up a region which has already been split or merged so we need to check + // whether the encoded name matches. Need to find a way to quit earlier when there is no + // record for the given region, for now it will scan to the end of the table. + if (!Bytes.equals(encodedRegionName, + Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) { + continue; + } + return getReplicationBarrierResult(result); + } + } + } + + public static long[] getReplicationBarrier(Connection conn, byte[] regionName) + throws IOException { + try (Table table = getMetaHTable(conn)) { + Result result = table.get(new Get(regionName) + .addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER) + .readAllVersions()); + return getReplicationBarriers(result); + } + } + private static void debugLogMutations(List mutations) throws IOException { if (!METALOG.isDebugEnabled()) { return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 9456fd4dc47..13ad0e21258 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -244,6 +244,11 @@ public interface TableDescriptor { */ boolean hasRegionMemStoreReplication(); + /** + * @return true if there are at least one cf whose replication scope is serial. + */ + boolean hasSerialReplicationScope(); + /** * Check if the compaction enable flag of the table is true. If flag is false * then no minor/major compactions will be done in real. @@ -292,7 +297,8 @@ public interface TableDescriptor { boolean hasDisabled = false; for (ColumnFamilyDescriptor cf : getColumnFamilies()) { - if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) { + if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL && + cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { hasDisabled = true; } else { hasEnabled = true; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 02901ac6173..2d6bfaf901b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -1127,6 +1127,15 @@ public class TableDescriptorBuilder { return families.values().toArray(new ColumnFamilyDescriptor[families.size()]); } + /** + * Return true if there are at least one cf whose replication scope is serial. + */ + @Override + public boolean hasSerialReplicationScope() { + return families.values().stream() + .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL); + } + /** * Returns the configured replicas per region */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index ac56ce57505..edf8f9c9135 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -535,6 +535,12 @@ public final class HConstants { /** The serialized table state qualifier */ public static final byte[] TABLE_STATE_QUALIFIER = Bytes.toBytes("state"); + /** The replication barrier family as a string*/ + public static final String REPLICATION_BARRIER_FAMILY_STR = "rep_barrier"; + + /** The replication barrier family */ + public static final byte[] REPLICATION_BARRIER_FAMILY = + Bytes.toBytes(REPLICATION_BARRIER_FAMILY_STR); /** * The meta table version column qualifier. @@ -675,6 +681,12 @@ public final class HConstants { */ public static final int REPLICATION_SCOPE_GLOBAL = 1; + /** + * Scope tag for serially scoped data + * This data will be replicated to all peers by the order of sequence id. + */ + public static final int REPLICATION_SCOPE_SERIAL = 2; + /** * Default cluster ID, cannot be used to identify a cluster so a key with * this value means it wasn't meant for replication. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index a37fd4e6506..864be029008 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -208,7 +208,16 @@ public class MasterFileSystem { /** * @return HBase root log dir. */ - public Path getWALRootDir() { return this.walRootDir; } + public Path getWALRootDir() { + return this.walRootDir; + } + + /** + * @return the directory for a give {@code region}. + */ + public Path getRegionDir(RegionInfo region) { + return FSUtils.getRegionDir(FSUtils.getTableDir(getRootDir(), region.getTable()), region); + } /** * @return HBase temp dir. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 754731b02b0..0e47065446e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -1571,8 +1571,7 @@ public class AssignmentManager implements ServerListener { } public void markRegionAsSplit(final RegionInfo parent, final ServerName serverName, - final RegionInfo daughterA, final RegionInfo daughterB) - throws IOException { + final RegionInfo daughterA, final RegionInfo daughterB) throws IOException { // Update hbase:meta. Parent will be marked offline and split up in hbase:meta. // The parent stays in regionStates until cleared when removed by CatalogJanitor. // Update its state in regionStates to it shows as offline and split when read diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index 1eaa4c6e3d2..c98a2d1a1a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.assignment; import java.io.IOException; @@ -36,11 +34,13 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -163,6 +163,11 @@ public class RegionStateStore { Preconditions.checkArgument(state == State.OPEN && regionLocation != null, "Open region should be on a server"); MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId); + // only update replication barrier for default replica + if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID && + hasSerialReplicationScope(regionInfo.getTable())) { + MetaTableAccessor.addReplicationBarrier(put, openSeqNum); + } info.append(", openSeqNum=").append(openSeqNum); info.append(", regionLocation=").append(regionLocation); } else if (regionLocation != null && !regionLocation.equals(lastHost)) { @@ -205,24 +210,41 @@ public class RegionStateStore { } } + private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException { + MasterFileSystem mfs = master.getMasterFileSystem(); + long maxSeqId = + WALSplitter.getMaxRegionSequenceId(mfs.getFileSystem(), mfs.getRegionDir(region)); + return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM; + } + // ============================================================================================ // Update Region Splitting State helpers // ============================================================================================ - public void splitRegion(final RegionInfo parent, final RegionInfo hriA, - final RegionInfo hriB, final ServerName serverName) throws IOException { - final TableDescriptor htd = getTableDescriptor(parent.getTable()); - MetaTableAccessor.splitRegion(master.getConnection(), parent, hriA, hriB, serverName, - getRegionReplication(htd)); + public void splitRegion(RegionInfo parent, RegionInfo hriA, RegionInfo hriB, + ServerName serverName) throws IOException { + TableDescriptor htd = getTableDescriptor(parent.getTable()); + long parentOpenSeqNum = HConstants.NO_SEQNUM; + if (htd.hasSerialReplicationScope()) { + parentOpenSeqNum = getOpenSeqNumForParentRegion(parent); + } + MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB, + serverName, getRegionReplication(htd)); } // ============================================================================================ // Update Region Merging State helpers // ============================================================================================ - public void mergeRegions(final RegionInfo parent, final RegionInfo hriA, final RegionInfo hriB, - final ServerName serverName) throws IOException { - final TableDescriptor htd = getTableDescriptor(parent.getTable()); - MetaTableAccessor.mergeRegions(master.getConnection(), parent, hriA, hriB, serverName, - getRegionReplication(htd)); + public void mergeRegions(RegionInfo child, RegionInfo hriA, RegionInfo hriB, + ServerName serverName) throws IOException { + TableDescriptor htd = getTableDescriptor(child.getTable()); + long regionAOpenSeqNum = -1L; + long regionBOpenSeqNum = -1L; + if (htd.hasSerialReplicationScope()) { + regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA); + regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB); + } + MetaTableAccessor.mergeRegions(master.getConnection(), child, hriA, regionAOpenSeqNum, hriB, + regionBOpenSeqNum, serverName, getRegionReplication(htd)); } // ============================================================================================ @@ -239,11 +261,19 @@ public class RegionStateStore { // ========================================================================== // Table Descriptors helpers // ========================================================================== - private int getRegionReplication(final TableDescriptor htd) { - return (htd != null) ? htd.getRegionReplication() : 1; + private boolean hasSerialReplicationScope(TableName tableName) throws IOException { + return hasSerialReplicationScope(getTableDescriptor(tableName)); } - private TableDescriptor getTableDescriptor(final TableName tableName) throws IOException { + private boolean hasSerialReplicationScope(TableDescriptor htd) { + return htd != null ? htd.hasSerialReplicationScope() : false; + } + + private int getRegionReplication(TableDescriptor htd) { + return htd != null ? htd.getRegionReplication() : 1; + } + + private TableDescriptor getTableDescriptor(TableName tableName) throws IOException { return master.getTableDescriptors().get(tableName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index 994983f4aeb..341affb10a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -253,7 +253,7 @@ public class SplitTableRegionProcedure setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_UPDATE_META); break; case SPLIT_TABLE_REGION_UPDATE_META: - updateMetaForDaughterRegions(env); + updateMeta(env); setNextState(SplitTableRegionState.SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META); break; case SPLIT_TABLE_REGION_PRE_OPERATION_AFTER_META: @@ -762,7 +762,7 @@ public class SplitTableRegionProcedure * Add daughter regions to META * @param env MasterProcedureEnv */ - private void updateMetaForDaughterRegions(final MasterProcedureEnv env) throws IOException { + private void updateMeta(final MasterProcedureEnv env) throws IOException { env.getAssignmentManager().markRegionAsSplit(getParentRegion(), getParentRegionServerName(env), daughter_1_RI, daughter_2_RI); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java index 60436c21cc2..d29682895f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractStateMachineTableProcedure.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; @@ -31,15 +30,12 @@ import org.apache.hadoop.hbase.client.DoNotRetryRegionException; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionOfflineException; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.yetus.audience.InterfaceAudience; /** @@ -131,9 +127,7 @@ public abstract class AbstractStateMachineTableProcedure } protected final Path getRegionDir(MasterProcedureEnv env, RegionInfo region) throws IOException { - MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); - Path tableDir = FSUtils.getTableDir(mfs.getRootDir(), getTableName()); - return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); + return env.getMasterServices().getMasterFileSystem().getRegionDir(region); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 37a430991be..9666aa51b23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.regionserver; import java.io.FileNotFoundException; @@ -25,6 +23,7 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.UUID; import org.apache.hadoop.conf.Configuration; @@ -84,6 +83,7 @@ public class HRegionFileSystem { private final Configuration conf; private final Path tableDir; private final FileSystem fs; + private final Path regionDir; /** * In order to handle NN connectivity hiccups, one need to retry non-idempotent operation at the @@ -105,9 +105,10 @@ public class HRegionFileSystem { final RegionInfo regionInfo) { this.fs = fs; this.conf = conf; - this.tableDir = tableDir; - this.regionInfo = regionInfo; + this.tableDir = Objects.requireNonNull(tableDir, "tableDir is null"); + this.regionInfo = Objects.requireNonNull(regionInfo, "regionInfo is null"); this.regionInfoForFs = ServerRegionReplicaUtil.getRegionInfoForFs(regionInfo); + this.regionDir = FSUtils.getRegionDir(tableDir, regionInfo); this.hdfsClientRetriesNumber = conf.getInt("hdfs.client.retries.number", DEFAULT_HDFS_CLIENT_RETRIES_NUMBER); this.baseSleepBeforeRetries = conf.getInt("hdfs.client.sleep.before.retries", @@ -135,7 +136,7 @@ public class HRegionFileSystem { /** @return {@link Path} to the region directory. */ public Path getRegionDir() { - return new Path(this.tableDir, this.regionInfoForFs.getEncodedName()); + return regionDir; } // =========================================================================== diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java index ad6e5a64f3a..08c9f37d384 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java @@ -21,16 +21,13 @@ package org.apache.hadoop.hbase.replication; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; /** * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config, @@ -47,7 +44,6 @@ import org.apache.hadoop.hbase.wal.WAL.Entry; @InterfaceAudience.Private public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter { - private static final Logger LOG = LoggerFactory.getLogger(NamespaceTableCfWALEntryFilter.class); private final ReplicationPeer peer; private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java index 5cde40c2f1d..6a2fbcf429b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java @@ -15,17 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.replication; import java.util.NavigableMap; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Predicate; @@ -35,7 +33,7 @@ import org.apache.hbase.thirdparty.com.google.common.base.Predicate; @InterfaceAudience.Private public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter { - BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); + private final BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter(); @Override public Entry filter(Entry entry) { @@ -49,21 +47,21 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter { @Override public Cell filterCell(Entry entry, Cell cell) { final NavigableMap scopes = entry.getKey().getReplicationScopes(); - // The scope will be null or empty if - // there's nothing to replicate in that WALEdit - byte[] fam = CellUtil.cloneFamily(cell); - if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { - cell = bulkLoadFilter.filterCell(cell, new Predicate() { - @Override - public boolean apply(byte[] fam) { - return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL; - } - }); - } else { - if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { - return null; + // The scope will be null or empty if + // there's nothing to replicate in that WALEdit + byte[] fam = CellUtil.cloneFamily(cell); + if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) { + cell = bulkLoadFilter.filterCell(cell, new Predicate() { + @Override + public boolean apply(byte[] fam) { + return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL; } + }); + } else { + if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) { + return null; } + } return cell; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index 3cae0f2d1f9..d9506c0776e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -194,4 +194,9 @@ public class RecoveredReplicationSource extends ReplicationSource { public ServerName getServerWALsBelongTo() { return this.replicationQueueInfo.getDeadRegionServers().get(0); } + + @Override + public boolean isRecovered() { + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java index 38bbb48030c..9c364979b57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,12 +19,10 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -127,13 +124,6 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper return startPosition; } - @Override - protected void updateLogPosition(long lastReadPosition) { - source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), - lastReadPosition, true); - lastLoggedPosition = lastReadPosition; - } - private void terminate(String reason, Exception cause) { if (cause == null) { LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java index 0af3f5cacca..114f1390000 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceWALReader.java @@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; @InterfaceAudience.Private @InterfaceStability.Evolving public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALReader { + private static final Logger LOG = - LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class); + LoggerFactory.getLogger(RecoveredReplicationSourceWALReader.class); public RecoveredReplicationSourceWALReader(FileSystem fs, Configuration conf, PriorityBlockingQueue logQueue, long startPosition, WALEntryFilter filter, @@ -45,13 +46,11 @@ public class RecoveredReplicationSourceWALReader extends ReplicationSourceWALRea } @Override - protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath) - throws InterruptedException { + protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { LOG.trace("Didn't read any new entries from WAL"); // we're done with queue recovery, shut ourself down setReaderRunning(false); // shuts down shipper thread immediately - entryBatchQueue.put(batch != null ? batch - : new WALEntryBatch(replicationBatchCountCapacity, currentPath)); + entryBatchQueue.put(new WALEntryBatch(replicationBatchCountCapacity, currentPath)); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 73d46521562..3b65b25ea3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -607,4 +607,12 @@ public class ReplicationSource implements ReplicationSourceInterface { public ServerName getServerWALsBelongTo() { return server.getServerName(); } + + Server getServer() { + return server; + } + + ReplicationQueueStorage getQueueStorage() { + return queueStorage; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java index d7cf9a3e8c1..090b4651f7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java @@ -166,4 +166,11 @@ public interface ReplicationSourceInterface { * @return the server name which all WALs belong to */ ServerName getServerWALsBelongTo(); + + /** + * @return whether this is a replication source for recovery. + */ + default boolean isRecovered() { + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index eb9dba2b9c9..06fe9776992 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -480,10 +480,10 @@ public class ReplicationSourceManager implements ReplicationListener { * @param queueRecovered indicates if this queue comes from another region server */ public void logPositionAndCleanOldLogs(Path log, String queueId, long position, - boolean queueRecovered) { + Map lastSeqIds, boolean queueRecovered) { String fileName = log.getName(); abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, - position, null)); + position, lastSeqIds)); cleanOldLogs(fileName, queueId, queueRecovered); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 959f6767eaa..d207d775701 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,13 +19,13 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -128,7 +127,7 @@ public class ReplicationSourceShipper extends Thread { int sleepMultiplier = 0; if (entries.isEmpty()) { if (lastLoggedPosition != lastReadPosition) { - updateLogPosition(lastReadPosition); + updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds()); // if there was nothing to ship and it's not an error // set "ageOfLastShippedOp" to to indicate that we're current source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), @@ -168,13 +167,13 @@ public class ReplicationSourceShipper extends Thread { } if (this.lastLoggedPosition != lastReadPosition) { - //Clean up hfile references + // Clean up hfile references int size = entries.size(); for (int i = 0; i < size; i++) { cleanUpHFileRefs(entries.get(i).getEdit()); } - //Log and clean up WAL logs - updateLogPosition(lastReadPosition); + // Log and clean up WAL logs + updateLogPosition(lastReadPosition, entryBatch.getLastSeqIds()); } source.postShipEdits(entries, currentSize); @@ -222,9 +221,9 @@ public class ReplicationSourceShipper extends Thread { } } - protected void updateLogPosition(long lastReadPosition) { + private void updateLogPosition(long lastReadPosition, Map lastSeqIds) { source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getQueueId(), - lastReadPosition, false); + lastReadPosition, lastSeqIds, source.isRecovered()); lastLoggedPosition = lastReadPosition; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index eb126146f3f..95fc6a088a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; @@ -31,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; - /** * Used to receive new wals. */ @@ -68,31 +65,25 @@ class ReplicationSourceWALActionListener implements WALActionsListener { * compaction WAL edits and if the scope is local. * @param logKey Key that may get scoped according to its edits * @param logEdit Edits used to lookup the scopes - * @throws IOException If failed to parse the WALEdit */ @VisibleForTesting - static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException { - boolean replicationForBulkLoadEnabled = - ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf); - boolean foundOtherEdits = false; - for (Cell cell : logEdit.getCells()) { - if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - foundOtherEdits = true; - break; - } + static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) { + // For bulk load replication we need meta family to know the file we want to replicate. + if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { + return; } - - if (!foundOtherEdits && logEdit.getCells().size() > 0) { - WALProtos.RegionEventDescriptor maybeEvent = - WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0)); - if (maybeEvent != null && - (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) { - // In serially replication, we use scopes when reading close marker. - foundOtherEdits = true; - } + WALKeyImpl keyImpl = (WALKeyImpl) logKey; + // For serial replication we need to count all the sequence ids even for markers, so here we + // always need to retain the replication scopes to let the replication wal reader to know that + // we need serial replication. The ScopeWALEntryFilter will help filtering out the cell for + // WALEdit.METAFAMILY. + if (keyImpl.hasSerialReplicationScope()) { + return; } - if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) { - ((WALKeyImpl) logKey).serializeReplicationScope(false); + // For replay, or if all the cells are markers, do not need to store replication scope. + if (logEdit.isReplay() || + logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) { + keyImpl.clearReplicationScope(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 579d20f19c5..fe87aec3175 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.EOFException; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -33,6 +32,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; @@ -46,8 +46,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescr import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; /** - * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches onto a queue - * + * Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches + * onto a queue */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -77,6 +77,8 @@ public class ReplicationSourceWALReader extends Thread { private AtomicLong totalBufferUsed; private long totalBufferQuota; + private final SerialReplicationChecker serialReplicationChecker; + /** * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the * entries, and puts them on a batch queue. @@ -111,6 +113,7 @@ public class ReplicationSourceWALReader extends Thread { this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false); this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount); + this.serialReplicationChecker = new SerialReplicationChecker(conf, source); LOG.info("peerClusterZnode=" + source.getQueueId() + ", ReplicationSourceWALReaderThread : " + source.getPeerId() + " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity @@ -131,15 +134,14 @@ public class ReplicationSourceWALReader extends Thread { continue; } WALEntryBatch batch = readWALEntries(entryStream); - if (batch != null && batch.getNbEntries() > 0) { - if (LOG.isTraceEnabled()) { - LOG.trace(String.format("Read %s WAL entries eligible for replication", - batch.getNbEntries())); - } + if (batch != null) { + // need to propagate the batch even it has no entries since it may carry the last + // sequence id information for serial replication. + LOG.trace("Read {} WAL entries eligible for replication", batch.getNbEntries()); entryBatchQueue.put(batch); sleepMultiplier = 1; } else { // got no entries and didn't advance position in WAL - handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath()); + handleEmptyWALEntryBatch(entryStream.getCurrentPath()); } currentPosition = entryStream.getPosition(); entryStream.reset(); // reuse stream @@ -160,34 +162,66 @@ public class ReplicationSourceWALReader extends Thread { } } - private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException { - WALEntryBatch batch = null; - while (entryStream.hasNext()) { - if (batch == null) { - batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + private WALEntryBatch readWALEntries(WALEntryStream entryStream) + throws IOException, InterruptedException { + if (!entryStream.hasNext()) { + return null; + } + WALEntryBatch batch = + new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); + do { + Entry entry = entryStream.peek(); + batch.setLastWalPosition(entryStream.getPosition()); + boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope(); + // Used to locate the region record in meta table. In WAL we only have the table name and + // encoded region name which can not be mapping to region name without scanning all the + // records for a table, so we need a start key, just like what we have done at client side + // when locating a region. For the markers, we will use the start key of the region as the row + // key for the edit. And we need to do this before filtering since all the cells may be + // filtered out, especially that for the markers. + Cell firstCellInEdit = null; + if (hasSerialReplicationScope) { + assert !entry.getEdit().isEmpty() : "should not write empty edits"; + firstCellInEdit = entry.getEdit().getCells().get(0); } - Entry entry = entryStream.next(); entry = filterEntry(entry); if (entry != null) { + if (hasSerialReplicationScope) { + if (!serialReplicationChecker.canPush(entry, firstCellInEdit)) { + if (batch.getNbEntries() > 0) { + // we have something that can push, break + break; + } else { + serialReplicationChecker.waitUntilCanPush(entry, firstCellInEdit); + } + } + // arrive here means we can push the entry, record the last sequence id + batch.setLastSeqId(Bytes.toString(entry.getKey().getEncodedRegionName()), + entry.getKey().getSequenceId()); + } + // actually remove the entry. + entryStream.next(); WALEdit edit = entry.getEdit(); if (edit != null && !edit.isEmpty()) { long entrySize = getEntrySize(entry); batch.addEntry(entry); - updateBatchStats(batch, entry, entryStream.getPosition(), entrySize); + updateBatchStats(batch, entry, entrySize); boolean totalBufferTooLarge = acquireBufferQuota(entrySize); // Stop if too many entries or too big - if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity - || batch.getNbEntries() >= replicationBatchCountCapacity) { + if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity || + batch.getNbEntries() >= replicationBatchCountCapacity) { break; } } + } else { + // actually remove the entry. + entryStream.next(); } - } + } while (entryStream.hasNext()); return batch; } - protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath) - throws InterruptedException { + protected void handleEmptyWALEntryBatch(Path currentPath) throws InterruptedException { LOG.trace("Didn't read any new entries from WAL"); Thread.sleep(sleepForRetries); } @@ -214,7 +248,7 @@ public class ReplicationSourceWALReader extends Thread { // if we've read some WAL entries, get the Path we read from WALEntryBatch batchQueueHead = entryBatchQueue.peek(); if (batchQueueHead != null) { - return batchQueueHead.lastWalPath; + return batchQueueHead.getLastWalPath(); } // otherwise, we must be currently reading from the head of the log queue return logQueue.peek(); @@ -253,15 +287,12 @@ public class ReplicationSourceWALReader extends Thread { return edit.heapSize() + calculateTotalSizeOfStoreFiles(edit); } - private void updateBatchStats(WALEntryBatch batch, Entry entry, long entryPosition, long entrySize) { + private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) { WALEdit edit = entry.getEdit(); - if (edit != null && !edit.isEmpty()) { - batch.incrementHeapSize(entrySize); - Pair nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); - batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); - batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); - } - batch.lastWalPosition = entryPosition; + batch.incrementHeapSize(entrySize); + Pair nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit); + batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst()); + batch.incrementNbHFiles(nbRowsAndHFiles.getSecond()); } /** @@ -355,101 +386,4 @@ public class ReplicationSourceWALReader extends Thread { public void setReaderRunning(boolean readerRunning) { this.isReaderRunning = readerRunning; } - - /** - * Holds a batch of WAL entries to replicate, along with some statistics - * - */ - static class WALEntryBatch { - private List walEntries; - // last WAL that was read - private Path lastWalPath; - // position in WAL of last entry in this batch - private long lastWalPosition = 0; - // number of distinct row keys in this batch - private int nbRowKeys = 0; - // number of HFiles - private int nbHFiles = 0; - // heap size of data we need to replicate - private long heapSize = 0; - - /** - * @param walEntries - * @param lastWalPath Path of the WAL the last entry in this batch was read from - * @param lastWalPosition Position in the WAL the last entry in this batch was read from - */ - WALEntryBatch(int maxNbEntries, Path lastWalPath) { - this.walEntries = new ArrayList<>(maxNbEntries); - this.lastWalPath = lastWalPath; - } - - public void addEntry(Entry entry) { - walEntries.add(entry); - } - - /** - * @return the WAL Entries. - */ - public List getWalEntries() { - return walEntries; - } - - /** - * @return the path of the last WAL that was read. - */ - public Path getLastWalPath() { - return lastWalPath; - } - - /** - * @return the position in the last WAL that was read. - */ - public long getLastWalPosition() { - return lastWalPosition; - } - - public int getNbEntries() { - return walEntries.size(); - } - - /** - * @return the number of distinct row keys in this batch - */ - public int getNbRowKeys() { - return nbRowKeys; - } - - /** - * @return the number of HFiles in this batch - */ - public int getNbHFiles() { - return nbHFiles; - } - - /** - * @return total number of operations in this batch - */ - public int getNbOperations() { - return getNbRowKeys() + getNbHFiles(); - } - - /** - * @return the heap size of this batch - */ - public long getHeapSize() { - return heapSize; - } - - private void incrementNbRowKeys(int increment) { - nbRowKeys += increment; - } - - private void incrementNbHFiles(int increment) { - nbHFiles += increment; - } - - private void incrementHeapSize(long increment) { - heapSize += increment; - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java new file mode 100644 index 00000000000..95f3868031d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.MetaTableAccessor.ReplicationBarrierResult; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.cache.Cache; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; + +/** + *

+ * Helper class to determine whether we can push a given WAL entry without breaking the replication + * order. The class is designed to per {@link ReplicationSourceWALReader}, so not thread safe. + *

+ *

+ * We record all the open sequence number for a region in a special family in meta, which is called + * 'barrier', so there will be a sequence of open sequence number (b1, b2, b3, ...). We call [bn, + * bn+1) a range, and it is obvious that a region will always be on the same RS within a range. + *

+ * When split and merge, we will also record the parent for the generated region(s) in the special + * family in meta. And also, we will write an extra 'open sequence number' for the parent region(s), + * which is the max sequence id of the region plus one. + *

+ *

+ *

+ * For each peer, we record the last pushed sequence id for each region. It is managed by the + * replication storage. + *

+ *

+ * The algorithm works like this: + *

    + *
  1. Locate the sequence id we want to push in the barriers
  2. + *
  3. If it is before the first barrier, we are safe to push. This usually because we enable serial + * replication for this table after we create the table and write data into the table.
  4. + *
  5. In general, if the previous range is finished, then we are safe to push. The way to determine + * whether a range is finish is straight-forward: check whether the last pushed sequence id is equal + * to the end barrier of the range minus 1. There are several exceptions: + *
      + *
    • If it is in the first range, we need to check whether there are parent regions. If so, we + * need to make sure that the data for parent regions have all been pushed.
    • + *
    • If it is in the last range, we need to check the region state. If state is OPENING, then we + * are not safe to push. This is because that, before we call reportRIT to master which update the + * open sequence number into meta table, we will write a open region event marker to WAL first, and + * its sequence id is greater than the newest open sequence number(which has not been updated to + * meta table yet so we do not know). For this scenario, the WAL entry for this open region event + * marker actually belongs to the range after the 'last' range, so we are not safe to push it. + * Otherwise the last pushed sequence id will be updated to this value and then we think the + * previous range has already been finished, but this is not true.
    • + *
    • Notice that the above two exceptions are not conflicts, since the first range can also be the + * last range if we only have one range.
    • + *
    + *
  6. + *
+ *

+ *

+ * And for performance reason, we do not want to check meta for every WAL entry, so we introduce two + * in memory maps. The idea is simple: + *

    + *
  • If a range can be pushed, then put its end barrier into the {@code canPushUnder} map.
  • + *
  • Before accessing meta, first check the sequence id stored in the {@code canPushUnder} map. If + * the sequence id of WAL entry is less the one stored in {@code canPushUnder} map, then we are safe + * to push.
  • + *
+ * And for the last range, we do not have an end barrier, so we use the continuity of sequence id to + * determine whether we can push. The rule is: + *
    + *
  • When an entry is able to push, then put its sequence id into the {@code pushed} map.
  • + *
  • Check if the sequence id of WAL entry equals to the one stored in the {@code pushed} map plus + * one. If so, we are safe to push, and also update the {@code pushed} map with the sequence id of + * the WAL entry.
  • + *
+ *

+ */ +@InterfaceAudience.Private +class SerialReplicationChecker { + + public static final String REPLICATION_SERIALLY_WAITING_KEY = + "hbase.serial.replication.waiting.ms"; + public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000; + + private final String peerId; + + private final ReplicationQueueStorage storage; + + private final Connection conn; + + private final long waitTimeMs; + + private final LoadingCache pushed = CacheBuilder.newBuilder() + .expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader() { + + @Override + public MutableLong load(String key) throws Exception { + return new MutableLong(HConstants.NO_SEQNUM); + } + }); + + // Use guava cache to set ttl for each key + private final Cache canPushUnder = + CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build(); + + public SerialReplicationChecker(Configuration conf, ReplicationSource source) { + this.peerId = source.getPeerId(); + this.storage = source.getQueueStorage(); + this.conn = source.getServer().getConnection(); + this.waitTimeMs = + conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT); + } + + private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException { + long pushedSeqId; + try { + pushedSeqId = storage.getLastSequenceId(encodedRegionName, peerId); + } catch (ReplicationException e) { + throw new IOException( + "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e); + } + // endBarrier is the open sequence number. When opening a region, the open sequence number will + // be set to the old max sequence id plus one, so here we need to minus one. + return pushedSeqId >= endBarrier - 1; + } + + private boolean isParentFinished(byte[] regionName) throws IOException { + long[] barriers = MetaTableAccessor.getReplicationBarrier(conn, regionName); + if (barriers.length == 0) { + return true; + } + return isRangeFinished(barriers[barriers.length - 1], RegionInfo.encodeRegionName(regionName)); + } + + // We may write a open region marker to WAL before we write the open sequence number to meta, so + // if a region is in OPENING state and we are in the last range, it is not safe to say we can push + // even if the previous range is finished. + private boolean isLastRangeAndOpening(ReplicationBarrierResult barrierResult, int index) { + return index == barrierResult.getBarriers().length && + barrierResult.getState() == RegionState.State.OPENING; + } + + private void recordCanPush(String encodedNameAsString, long seqId, long[] barriers, int index) { + if (barriers.length > index) { + canPushUnder.put(encodedNameAsString, barriers[index]); + } + pushed.getUnchecked(encodedNameAsString).setValue(seqId); + } + + private boolean canPush(Entry entry, byte[] row) throws IOException { + String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName()); + long seqId = entry.getKey().getSequenceId(); + ReplicationBarrierResult barrierResult = MetaTableAccessor.getReplicationBarrierResult(conn, + entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName()); + long[] barriers = barrierResult.getBarriers(); + int index = Arrays.binarySearch(barriers, seqId); + if (index == -1) { + // This means we are in the range before the first record openSeqNum, this usually because the + // wal is written before we enable serial replication for this table, just return true since + // we can not guarantee the order. + pushed.getUnchecked(encodedNameAsString).setValue(seqId); + return true; + } + // The sequence id range is left closed and right open, so either we decrease the missed insert + // point to make the index start from 0, or increase the hit insert point to make the index + // start from 1. Here we choose the latter one. + if (index < 0) { + index = -index - 1; + } else { + index++; + } + if (index == 1) { + // we are in the first range, check whether we have parents + for (byte[] regionName : barrierResult.getParentRegionNames()) { + if (!isParentFinished(regionName)) { + return false; + } + } + if (isLastRangeAndOpening(barrierResult, index)) { + return false; + } + recordCanPush(encodedNameAsString, seqId, barriers, 1); + return true; + } + // check whether the previous range is finished + if (!isRangeFinished(barriers[index - 1], encodedNameAsString)) { + return false; + } + if (isLastRangeAndOpening(barrierResult, index)) { + return false; + } + recordCanPush(encodedNameAsString, seqId, barriers, index); + return true; + } + + public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException { + String encodedNameAsString = Bytes.toString(entry.getKey().getEncodedRegionName()); + long seqId = entry.getKey().getSequenceId(); + Long canReplicateUnderSeqId = canPushUnder.getIfPresent(encodedNameAsString); + if (canReplicateUnderSeqId != null) { + if (seqId < canReplicateUnderSeqId.longValue()) { + return true; + } + // we are already beyond the last safe point, remove + canPushUnder.invalidate(encodedNameAsString); + } + // This is for the case where the region is currently opened on us, if the sequence id is + // continuous then we are safe to replicate. If there is a breakpoint, then maybe the region + // has been moved to another RS and then back, so we need to check the barrier. + MutableLong previousPushedSeqId = pushed.getUnchecked(encodedNameAsString); + if (seqId == previousPushedSeqId.longValue() + 1) { + previousPushedSeqId.increment(); + return true; + } + return canPush(entry, CellUtil.cloneRow(firstCellInEdit)); + } + + public void waitUntilCanPush(Entry entry, Cell firstCellInEdit) + throws IOException, InterruptedException { + byte[] row = CellUtil.cloneRow(firstCellInEdit); + while (!canPush(entry, row)) { + Thread.sleep(waitTimeMs); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java new file mode 100644 index 00000000000..31c3ac74c91 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Holds a batch of WAL entries to replicate, along with some statistics + */ +@InterfaceAudience.Private +class WALEntryBatch { + private List walEntries; + // last WAL that was read + private Path lastWalPath; + // position in WAL of last entry in this batch + private long lastWalPosition = 0; + // number of distinct row keys in this batch + private int nbRowKeys = 0; + // number of HFiles + private int nbHFiles = 0; + // heap size of data we need to replicate + private long heapSize = 0; + // save the last sequenceid for each region if the table has serial-replication scope + private Map lastSeqIds = new HashMap<>(); + + /** + * @param lastWalPath Path of the WAL the last entry in this batch was read from + */ + WALEntryBatch(int maxNbEntries, Path lastWalPath) { + this.walEntries = new ArrayList<>(maxNbEntries); + this.lastWalPath = lastWalPath; + } + + public void addEntry(Entry entry) { + walEntries.add(entry); + } + + /** + * @return the WAL Entries. + */ + public List getWalEntries() { + return walEntries; + } + + /** + * @return the path of the last WAL that was read. + */ + public Path getLastWalPath() { + return lastWalPath; + } + + /** + * @return the position in the last WAL that was read. + */ + public long getLastWalPosition() { + return lastWalPosition; + } + + public void setLastWalPosition(long lastWalPosition) { + this.lastWalPosition = lastWalPosition; + } + + public int getNbEntries() { + return walEntries.size(); + } + + /** + * @return the number of distinct row keys in this batch + */ + public int getNbRowKeys() { + return nbRowKeys; + } + + /** + * @return the number of HFiles in this batch + */ + public int getNbHFiles() { + return nbHFiles; + } + + /** + * @return total number of operations in this batch + */ + public int getNbOperations() { + return getNbRowKeys() + getNbHFiles(); + } + + /** + * @return the heap size of this batch + */ + public long getHeapSize() { + return heapSize; + } + + /** + * @return the last sequenceid for each region if the table has serial-replication scope + */ + public Map getLastSeqIds() { + return lastSeqIds; + } + + public void incrementNbRowKeys(int increment) { + nbRowKeys += increment; + } + + public void incrementNbHFiles(int increment) { + nbHFiles += increment; + } + + public void incrementHeapSize(long increment) { + heapSize += increment; + } + + public void setLastSeqId(String region, long sequenceId) { + lastSeqIds.put(region, sequenceId); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java index 7c83c0c1fac..bcab9b4b339 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java @@ -21,29 +21,26 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.NoSuchElementException; import java.util.OptionalLong; import java.util.concurrent.PriorityBlockingQueue; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.LeaseNotRecoveredException; -import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Streaming access to WAL entries. This class is given a queue of WAL {@link Path}, and continually @@ -102,16 +99,18 @@ class WALEntryStream implements Closeable { } /** - * @return the next WAL entry in this stream - * @throws IOException - * @throws NoSuchElementException if no more entries in the stream. + * Returns the next WAL entry in this stream but does not advance. + */ + public Entry peek() throws IOException { + return hasNext() ? currentEntry: null; + } + + /** + * Returns the next WAL entry in this stream and advance the stream. */ public Entry next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - Entry save = currentEntry; - currentEntry = null; // gets reloaded by hasNext() + Entry save = peek(); + currentEntry = null; return save; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java index a67bca1bd18..85292f831a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSTableDescriptors.java @@ -170,6 +170,14 @@ public class FSTableDescriptors implements TableDescriptors { // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. .setBloomFilterType(BloomType.NONE) .build()) + .setColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(HConstants.REPLICATION_BARRIER_FAMILY) + .setMaxVersions(HConstants.ALL_VERSIONS) + .setInMemory(true) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL) + // Disable blooms for meta. Needs work. Seems to mess w/ getClosestOrBefore. + .setBloomFilterType(BloomType.NONE) + .build()) .setCoprocessor(CoprocessorDescriptorBuilder.newBuilder( MultiRowMutationEndpoint.class.getName()) .setPriority(Coprocessor.PRIORITY_SYSTEM) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index c0b72aa40ff..b106a316ef8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -18,13 +18,7 @@ */ package org.apache.hadoop.hbase.util; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; -import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; -import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; - import edu.umd.cs.findbugs.annotations.CheckForNull; - import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.EOFException; @@ -54,7 +48,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; @@ -71,9 +64,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.HFileLink; @@ -81,8 +72,6 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.security.AccessDeniedException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSHedgedReadMetrics; @@ -94,6 +83,17 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; +import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; +import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.FSProtos; /** * Utility methods for interacting with the underlying file system. @@ -1028,6 +1028,10 @@ public abstract class FSUtils extends CommonFSUtils { return regionDirs; } + public static Path getRegionDir(Path tableDir, RegionInfo region) { + return new Path(tableDir, ServerRegionReplicaUtil.getRegionInfoForFs(region).getEncodedName()); + } + /** * Filter for all dirs that are legal column family names. This is generally used for colfam * dirs <hbase.rootdir>/<tabledir>/<regiondir>/<colfamdir>. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java index c1a77eec199..ac23d1d952a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java @@ -415,10 +415,16 @@ public class WALKeyImpl implements WALKey { this.replicationScope = replicationScope; } - public void serializeReplicationScope(boolean serialize) { - if (!serialize) { - setReplicationScope(null); + public void clearReplicationScope() { + setReplicationScope(null); + } + + public boolean hasSerialReplicationScope() { + if (replicationScope == null || replicationScope.isEmpty()) { + return false; } + return replicationScope.values().stream() + .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java index ec932076561..9161e255bbd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java @@ -494,7 +494,7 @@ public class TestMetaTableAccessor { List regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); + MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3); assertEmptyMetaLocation(meta, splitA.getRegionName(), 1); assertEmptyMetaLocation(meta, splitA.getRegionName(), 2); @@ -535,7 +535,8 @@ public class TestMetaTableAccessor { List regionInfos = Lists.newArrayList(parentA, parentB); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.mergeRegions(connection, merged, parentA, parentB, serverName0, 3); + MetaTableAccessor.mergeRegions(connection, merged, parentA, -1L, parentB, -1L, serverName0, + 3); assertEmptyMetaLocation(meta, merged.getRegionName(), 1); assertEmptyMetaLocation(meta, merged.getRegionName(), 2); @@ -682,8 +683,8 @@ public class TestMetaTableAccessor { EnvironmentEdgeManager.injectEdge(edge); try { // now merge the regions, effectively deleting the rows for region a and b. - MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, regionInfoB, sn, - 1); + MetaTableAccessor.mergeRegions(connection, mergedRegionInfo, regionInfoA, -1L, regionInfoB, + -1L, sn, 1); } finally { EnvironmentEdgeManager.reset(); } @@ -776,7 +777,8 @@ public class TestMetaTableAccessor { } SpyingRpcScheduler scheduler = (SpyingRpcScheduler) rs.getRpcServer().getScheduler(); long prevCalls = scheduler.numPriorityCalls; - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, loc.getServerName(), 1); + MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, loc.getServerName(), + 1); assertTrue(prevCalls < scheduler.numPriorityCalls); } @@ -813,7 +815,7 @@ public class TestMetaTableAccessor { List regionInfos = Lists.newArrayList(parent); MetaTableAccessor.addRegionsToMeta(connection, regionInfos, 3); - MetaTableAccessor.splitRegion(connection, parent, splitA, splitB, serverName0, 3); + MetaTableAccessor.splitRegion(connection, parent, -1L, splitA, splitB, serverName0, 3); Get get1 = new Get(splitA.getRegionName()); Result resultA = meta.get(get1); Cell serverCellA = resultA.getColumnLatestCell(HConstants.CATALOG_FAMILY, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java index a4e8e19f67a..e00f07286d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionFileSystem.java @@ -194,7 +194,7 @@ public class TestHRegionFileSystem { @Test public void testOnDiskRegionCreation() throws IOException { - Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation"); + Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName()); FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); @@ -226,7 +226,7 @@ public class TestHRegionFileSystem { @Test public void testNonIdempotentOpsWithRetries() throws IOException { - Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("testOnDiskRegionCreation"); + Path rootDir = TEST_UTIL.getDataTestDirOnTestFS(name.getMethodName()); FileSystem fs = TEST_UTIL.getTestFileSystem(); Configuration conf = TEST_UTIL.getConfiguration(); @@ -235,19 +235,15 @@ public class TestHRegionFileSystem { HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(conf, fs, rootDir, hri); assertTrue(fs.exists(regionFs.getRegionDir())); - regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), - null, null); - // HRegionFileSystem.createRegionOnFileSystem(conf, new MockFileSystemForCreate(), rootDir, - // hri); + regionFs = new HRegionFileSystem(conf, new MockFileSystemForCreate(), rootDir, hri); boolean result = regionFs.createDir(new Path("/foo/bar")); assertTrue("Couldn't create the directory", result); - - regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null); + regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri); result = regionFs.rename(new Path("/foo/bar"), new Path("/foo/bar2")); assertTrue("Couldn't rename the directory", result); - regionFs = new HRegionFileSystem(conf, new MockFileSystem(), null, null); + regionFs = new HRegionFileSystem(conf, new MockFileSystem(), rootDir, hri); result = regionFs.deleteDir(new Path("/foo/bar")); assertTrue("Couldn't delete the directory", result); fs.delete(rootDir, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 50dffd50bfc..fab6512602d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -343,12 +343,12 @@ public class TestRegionServerMetrics { @Test public void testStoreCount() throws Exception { - //Force a hfile. + // Force a hfile. doNPuts(1, false); TEST_UTIL.getAdmin().flush(tableName); metricsRegionServer.getRegionServerWrapper().forceRecompute(); - assertGauge("storeCount", TABLES_ON_MASTER? 1: 4); + assertGauge("storeCount", TABLES_ON_MASTER ? 1 : 5); assertGauge("storeFileCount", 1); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java index ffa03a2d506..e9e92b8748b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.replication; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.fail; -import java.util.ArrayList; -import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -38,6 +36,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.junit.Before; @@ -47,7 +46,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Category(LargeTests.class) +@Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationDroppedTables extends TestReplicationBase { @ClassRule @@ -56,9 +55,6 @@ public class TestReplicationDroppedTables extends TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationDroppedTables.class); - /** - * @throws java.lang.Exception - */ @Before public void setUp() throws Exception { // Starting and stopping replication can make us miss new logs, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java new file mode 100644 index 00000000000..1408cf06f76 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.UUID; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestSerialReplication { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSerialReplication.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID = "1"; + + private static byte[] CF = Bytes.toBytes("CF"); + + private static byte[] CQ = Bytes.toBytes("CQ"); + + private static FileSystem FS; + + private static Path LOG_DIR; + + private static WALProvider.Writer WRITER; + + public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint { + + private static final UUID PEER_UUID = UUID.randomUUID(); + + @Override + public UUID getPeerUUID() { + return PEER_UUID; + } + + @Override + public boolean replicate(ReplicateContext replicateContext) { + synchronized (WRITER) { + try { + for (Entry entry : replicateContext.getEntries()) { + WRITER.append(entry); + } + WRITER.sync(false); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + return true; + } + + @Override + public void start() { + startAsync(); + } + + @Override + public void stop() { + stopAsync(); + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.getConfiguration().setInt("replication.source.nb.capacity", 10); + UTIL.startMiniCluster(3); + LOG_DIR = UTIL.getDataTestDirOnTestFS("replicated"); + FS = UTIL.getTestFileSystem(); + FS.mkdirs(LOG_DIR); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Rule + public final TestName name = new TestName(); + + private Path logPath; + + @Before + public void setUp() throws IOException, StreamLacksCapabilityException { + UTIL.ensureSomeRegionServersAvailable(3); + logPath = new Path(LOG_DIR, name.getMethodName()); + WRITER = WALFactory.createWALWriter(FS, logPath, UTIL.getConfiguration()); + // add in disable state, so later when enabling it all sources will start push together. + UTIL.getAdmin().addReplicationPeer(PEER_ID, + ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase") + .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(), + false); + } + + @After + public void tearDown() throws IOException { + UTIL.getAdmin().removeReplicationPeer(PEER_ID); + if (WRITER != null) { + WRITER.close(); + WRITER = null; + } + } + + @Test + public void testRegionMove() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + UTIL.getAdmin().createTable( + TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder + .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build()); + UTIL.waitTableAvailable(tableName); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0); + HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName)); + UTIL.getAdmin().move(region.getEncodedNameAsBytes(), + Bytes.toBytes(rs.getServerName().getServerName())); + UTIL.waitFor(30000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return !rs.getRegions(tableName).isEmpty(); + } + + @Override + public String explainFailure() throws Exception { + return region + " is still not on " + rs; + } + }); + try (Table table = UTIL.getConnection().getTable(tableName)) { + for (int i = 100; i < 200; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + UTIL.getAdmin().enableReplicationPeer(PEER_ID); + UTIL.waitFor(30000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + try (WAL.Reader reader = WALFactory.createReader(FS, logPath, UTIL.getConfiguration())) { + int count = 0; + while (reader.next() != null) { + count++; + } + return count >= 200; + } catch (IOException e) { + return false; + } + } + + @Override + public String explainFailure() throws Exception { + return "Not enough entries replicated"; + } + }); + try (WAL.Reader reader = + WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) { + long seqId = -1L; + int count = 0; + for (Entry entry;;) { + entry = reader.next(); + if (entry == null) { + break; + } + assertTrue( + "Sequence id go backwards from " + seqId + " to " + entry.getKey().getSequenceId(), + entry.getKey().getSequenceId() >= seqId); + count++; + } + assertEquals(200, count); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index a53cba37080..6d75fec9fdd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -321,7 +321,7 @@ public abstract class TestReplicationSourceManager { wal.rollWriter(); manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), - "1", 0, false); + "1", 0, null, false); wal.append(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java new file mode 100644 index 00000000000..c8387c5c41b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Cell.Type; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestSerialReplicationChecker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSerialReplicationChecker.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID = "1"; + + private static ReplicationQueueStorage QUEUE_STORAGE; + + private static String WAL_FILE_NAME = "test.wal"; + + private SerialReplicationChecker checker; + + @Rule + public final TestName name = new TestName(); + + private TableName tableName; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(1); + QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), + UTIL.getConfiguration()); + QUEUE_STORAGE.addWAL(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), PEER_ID, + WAL_FILE_NAME); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException { + ReplicationSource source = mock(ReplicationSource.class); + when(source.getPeerId()).thenReturn(PEER_ID); + when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE); + Server server = mock(Server.class); + when(server.getConnection()).thenReturn(UTIL.getConnection()); + when(source.getServer()).thenReturn(server); + checker = new SerialReplicationChecker(UTIL.getConfiguration(), source); + tableName = TableName.valueOf(name.getMethodName()); + } + + private Entry createEntry(RegionInfo region, long seqId) { + WALKeyImpl key = mock(WALKeyImpl.class); + when(key.getTableName()).thenReturn(tableName); + when(key.getEncodedRegionName()).thenReturn(region.getEncodedNameAsBytes()); + when(key.getSequenceId()).thenReturn(seqId); + Entry entry = mock(Entry.class); + when(entry.getKey()).thenReturn(key); + return entry; + } + + private Cell createCell(RegionInfo region) { + return CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(region.getStartKey()) + .setType(Type.Put).build(); + } + + @Test + public void testNoBarrierCanPush() throws IOException { + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + assertTrue(checker.canPush(createEntry(region, 100), createCell(region))); + } + + private void addStateAndBarrier(RegionInfo region, RegionState.State state, long... barriers) + throws IOException { + Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); + put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, + Bytes.toBytes(state.name())); + for (int i = 0; i < barriers.length; i++) { + put.addColumn(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER, + put.getTimeStamp() - i, Bytes.toBytes(barriers[i])); + } + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(put); + } + } + + private void setState(RegionInfo region, RegionState.State state) throws IOException { + Put put = new Put(region.getRegionName(), EnvironmentEdgeManager.currentTime()); + put.addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER, + Bytes.toBytes(state.name())); + try (Table table = UTIL.getConnection().getTable(TableName.META_TABLE_NAME)) { + table.put(put); + } + } + + private void updatePushedSeqId(RegionInfo region, long seqId) throws ReplicationException { + QUEUE_STORAGE.setWALPosition(UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName(), + PEER_ID, WAL_FILE_NAME, 10, ImmutableMap.of(region.getEncodedName(), seqId)); + } + + @Test + public void testLastRegionAndOpeningCanNotPush() throws IOException, ReplicationException { + RegionInfo region = RegionInfoBuilder.newBuilder(tableName).build(); + addStateAndBarrier(region, RegionState.State.OPEN, 10); + Cell cell = createCell(region); + // can push since we are in the first range + assertTrue(checker.canPush(createEntry(region, 100), cell)); + setState(region, RegionState.State.OPENING); + // can not push since we are in the last range and the state is OPENING + assertFalse(checker.canPush(createEntry(region, 102), cell)); + addStateAndBarrier(region, RegionState.State.OPEN, 50); + // can not push since the previous range has not been finished yet + assertFalse(checker.canPush(createEntry(region, 102), cell)); + updatePushedSeqId(region, 49); + // can push since the previous range has been finished + assertTrue(checker.canPush(createEntry(region, 102), cell)); + setState(region, RegionState.State.OPENING); + assertFalse(checker.canPush(createEntry(region, 104), cell)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index 2146e474a21..eb7d5a07537 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -21,13 +21,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Mockito.when; import java.io.IOException; import java.util.NavigableMap; -import java.util.NoSuchElementException; import java.util.OptionalLong; import java.util.TreeMap; import java.util.concurrent.PriorityBlockingQueue; @@ -40,13 +40,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; @@ -180,15 +180,12 @@ public class TestWALEntryStream { new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); - WAL.Entry entry = entryStream.next(); + WAL.Entry entry = entryStream.peek(); + assertSame(entry, entryStream.next()); assertNotNull(entry); assertFalse(entryStream.hasNext()); - try { - entry = entryStream.next(); - fail(); - } catch (NoSuchElementException e) { - // expected - } + assertNull(entryStream.peek()); + assertNull(entryStream.next()); oldPos = entryStream.getPosition(); } @@ -346,10 +343,12 @@ public class TestWALEntryStream { // start up a batcher ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + Server mockServer= Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); when(source.getWALFileLengthProvider()).thenReturn(log); + when(source.getServer()).thenReturn(mockServer); ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); Path walPath = walQueue.peek();