From 000e64abda83264a84a2eea8f3e0b96a04e54bf9 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Sat, 18 Mar 2023 21:38:53 +0800 Subject: [PATCH] HBASE-27216 Revisit the ReplicationSyncUp tool (#4966) Signed-off-by: Liangjun He --- .../apache/hadoop/hbase/util/JsonMapper.java | 4 + .../server/master/MasterProcedure.proto | 1 + .../replication/ReplicationQueueStorage.java | 21 + .../ReplicationStorageFactory.java | 25 +- .../TableReplicationQueueStorage.java | 20 + .../apache/hadoop/hbase/master/HMaster.java | 41 ++ .../AssignReplicationQueuesProcedure.java | 48 ++- .../ClaimReplicationQueueRemoteProcedure.java | 32 ++ .../OfflineTableReplicationQueueStorage.java | 382 ++++++++++++++++++ .../replication/ReplicationPeerManager.java | 2 +- .../ReplicationSourceManager.java | 188 +++++---- .../regionserver/ReplicationSyncUp.java | 195 +++++++-- .../hbase/wal/AbstractFSWALProvider.java | 4 + .../hbase/master/cleaner/TestLogsCleaner.java | 4 +- .../TestReplicationSyncUpTool.java | 185 +++++---- .../TestReplicationSyncUpToolBase.java | 3 +- ...plicationSyncUpToolWithBulkLoadedData.java | 92 ++--- .../TestTableReplicationQueueStorage.java | 51 +++ ...icationSyncUpToolWithMultipleAsyncWAL.java | 3 - ...tReplicationSyncUpToolWithMultipleWAL.java | 3 - .../TestReplicationSourceManager.java | 14 +- .../TestSerialReplicationChecker.java | 4 +- 22 files changed, 1029 insertions(+), 293 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java index 0ff131f23bf..f2c4585a6a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/util/JsonMapper.java @@ -40,4 +40,8 @@ public final class JsonMapper { public static String writeObjectAsString(Object object) throws IOException { return GSON.toJson(object); } + + public static T fromJson(String json, Class clazz) { + return GSON.fromJson(json, clazz); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 14d07c17c88..901abf6bd0c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -717,6 +717,7 @@ message ModifyColumnFamilyStoreFileTrackerStateData { enum AssignReplicationQueuesState { ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1; ASSIGN_REPLICATION_QUEUES_CLAIM = 2; + ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3; } message AssignReplicationQueuesStateData { diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java index 1e36bbeb78f..b5bc64eb55a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueStorage.java @@ -203,4 +203,25 @@ public interface ReplicationQueueStorage { * Add the given hfile refs to the given peer. 
*/ void batchUpdateHFileRefs(String peerId, List hfileRefs) throws ReplicationException; + + // the below method is for cleaning up stale data after running ReplicationSyncUp + /** + * Remove all the last sequence ids and hfile references data which are written before the given + * timestamp. + * <p/>
+ * The data of these two types are not used by replication directly. + * <p/>
+ * For last sequence ids, we will check them in serial replication, to make sure that we will + * replicate all edits in order, so if there is stale data, the worst case is that we will stop + * replicating as we think we still need to finish previous ranges first, although actually we + * have already replicated them out. + * <p/>
+ * For hfile references, they are just used by the hfile cleaner to not remove these hfiles before + * we replicate them out, so if there is stale data, the worst case is that we cannot remove these + * hfiles, although actually they have already been replicated out. + * <p/>
+ * So it is OK for us to just bring up the cluster first, and then use this method to delete the + * stale data, i.e, the data which are written before a specific timestamp. + */ + void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException; } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java index d0c204f9934..0b0eb0fc43f 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Used to create replication storage(peer, queue) classes. @@ -39,6 +41,8 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public final class ReplicationStorageFactory { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class); + public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl"; // must use zookeeper here, otherwise when user upgrading from an old version without changing the @@ -51,6 +55,8 @@ public final class ReplicationStorageFactory { public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT = TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.storage.impl"; + public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName) throws IOException { return TableDescriptorBuilder.newBuilder(tableName) @@ -108,15 +114,26 @@ public final class ReplicationStorageFactory { */ public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, Configuration conf) { - return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME, - REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()))); + return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf + .get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()))); } /** * Create a new {@link ReplicationQueueStorage}. 
*/ public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, - TableName tableName) { - return new TableReplicationQueueStorage(conn, tableName); + Configuration conf, TableName tableName) { + Class clazz = conf.getClass(REPLICATION_QUEUE_IMPL, + TableReplicationQueueStorage.class, ReplicationQueueStorage.class); + try { + Constructor c = + clazz.getConstructor(Connection.class, TableName.class); + return c.newInstance(conn, tableName); + } catch (Exception e) { + LOG.debug( + "failed to create ReplicationQueueStorage with Connection, try creating with Configuration", + e); + return ReflectionUtils.newInstance(clazz, conf, tableName); + } } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java index f3870f4d09d..e59edd52f79 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java @@ -594,4 +594,24 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage { throw new ReplicationException("failed to batch update hfile references", e); } } + + @Override + public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException { + try (Table table = conn.getTable(tableName); + ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY) + .addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) { + for (;;) { + Result r = scanner.next(); + if (r == null) { + break; + } + Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts) + .addFamily(HFILE_REF_FAMILY, ts); + table.delete(delete); + } + } catch (IOException e) { + throw new ReplicationException( + "failed to remove last sequence ids and hfile references before timestamp " + ts, e); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index adb53468ce7..3d59db24501 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -34,6 +34,9 @@ import java.lang.reflect.InvocationTargetException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -59,6 +62,7 @@ import java.util.stream.Collectors; import javax.servlet.http.HttpServlet; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CatalogFamilyFormat; @@ -226,6 +230,8 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; +import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo; import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint; import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; @@ -246,6 +252,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.JsonMapper; import org.apache.hadoop.hbase.util.ModifyRegionUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.RetryCounter; @@ -267,7 +274,9 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; +import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +import org.apache.hbase.thirdparty.com.google.gson.JsonParseException; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server; @@ -1286,6 +1295,38 @@ public class HMaster extends HBaseServerBase implements Maste status.setStatus("Initializing MOB Cleaner"); initMobCleaner(); + // delete the stale data for replication sync up tool if necessary + status.setStatus("Cleanup ReplicationSyncUp status if necessary"); + Path replicationSyncUpInfoFile = + new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE); + if (dataFs.exists(replicationSyncUpInfoFile)) { + // info file is available, load the timestamp and use it to clean up stale data in replication + // queue storage. + byte[] data; + try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) { + data = ByteStreams.toByteArray(in); + } + ReplicationSyncUpToolInfo info = null; + try { + info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class); + } catch (JsonParseException e) { + // usually this should be a partial file, which means the ReplicationSyncUp tool did not + // finish properly, so not a problem. Here we do not clean up the status as we do not know + // the reason why the tool did not finish properly, so let users clean the status up + // manually + LOG.warn("failed to parse replication sync up info file, ignore and continue...", e); + } + if (info != null) { + LOG.info("Remove last sequence ids and hfile references which are written before {}({})", + info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault()) + .format(Instant.ofEpochMilli(info.getStartTimeMs()))); + replicationPeerManager.getQueueStorage() + .removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs()); + // delete the file after removing the stale data, so next time we do not need to do this + // again. 
+ dataFs.delete(replicationSyncUpInfoFile, false); + } + } status.setStatus("Calling postStartMaster coprocessors"); if (this.cpHost != null) { // don't let cp initialization errors kill the master diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java index d33259dd436..b547c87009d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java @@ -24,7 +24,9 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; @@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -102,7 +105,7 @@ public class AssignReplicationQueuesProcedure } } - private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException { + private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException { Set existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream() .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet()); ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); @@ -130,18 +133,51 @@ public class AssignReplicationQueuesProcedure return Flow.HAS_MORE_STATE; } + // check whether ReplicationSyncUp has already done the work for us, if so, we should skip + // claiming the replication queues and deleting them instead. 
+ private boolean shouldSkip(MasterProcedureEnv env) throws IOException { + MasterFileSystem mfs = env.getMasterFileSystem(); + Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR); + return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName())); + } + + private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException { + ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); + for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) { + storage.removeQueue(queueId); + } + MasterFileSystem mfs = env.getMasterFileSystem(); + Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR); + // remove the region server record file + mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false); + } + @Override protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { try { switch (state) { case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES: - addMissingQueues(env); - retryCounter = null; - setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM); - return Flow.HAS_MORE_STATE; + if (shouldSkip(env)) { + setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES); + return Flow.HAS_MORE_STATE; + } else { + addMissingQueues(env); + retryCounter = null; + setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM); + return Flow.HAS_MORE_STATE; + } case ASSIGN_REPLICATION_QUEUES_CLAIM: - return claimQueues(env); + if (shouldSkip(env)) { + retryCounter = null; + setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES); + return Flow.HAS_MORE_STATE; + } else { + return claimQueues(env); + } + case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES: + removeQueues(env); + return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java index 7b637384398..d3aeeba541a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java @@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; import java.util.Optional; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import 
org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +60,32 @@ public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure this.targetServer = targetServer; } + // check whether ReplicationSyncUp has already done the work for us, if so, we should skip + // claiming the replication queues and deleting them instead. + private boolean shouldSkip(MasterProcedureEnv env) throws IOException { + MasterFileSystem mfs = env.getMasterFileSystem(); + Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR); + return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName())); + } + + @Override + protected synchronized Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + try { + if (shouldSkip(env)) { + LOG.info("Skip claiming {} because replication sync up has already done it for us", + getServerName()); + return null; + } + } catch (IOException e) { + LOG.warn("failed to check whether we should skip claiming {} due to replication sync up", + getServerName(), e); + // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule + return null; + } + return super.execute(env); + } + @Override public Optional remoteCallBuild(MasterProcedureEnv env, ServerName remote) { assert targetServer.equals(remote); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java new file mode 100644 index 00000000000..9faca74f710 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/OfflineTableReplicationQueueStorage.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClientSideRegionScanner; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@InterfaceAudience.Private +public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage { + + private final Map> offsets = + new HashMap<>(); + + private final Map> lastSequenceIds = new HashMap<>(); + + private final Map> hfileRefs = new HashMap<>(); + + private void loadRegionInfo(FileSystem fs, Path regionDir, + NavigableMap startKey2RegionInfo) throws IOException { + RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + // TODO: we consider that the there will not be too many regions for hbase:replication table, so + // here we just iterate over all the regions to find out the overlapped ones. Can be optimized + // later. + Iterator> iter = startKey2RegionInfo.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + if (hri.isOverlap(entry.getValue())) { + if (hri.getRegionId() > entry.getValue().getRegionId()) { + // we are newer, remove the old hri, we can not break here as if hri is a merged region, + // we need to remove all its parent regions. 
+ iter.remove(); + } else { + // we are older, just return, skip the below add + return; + } + } + + } + startKey2RegionInfo.put(hri.getStartKey(), hri); + } + + private void loadOffsets(Result result) { + NavigableMap map = + result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY); + if (map == null || map.isEmpty()) { + return; + } + Map offsetMap = new HashMap<>(); + map.forEach((k, v) -> { + String walGroup = Bytes.toString(k); + ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v)); + offsetMap.put(walGroup, offset); + }); + ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow())); + offsets.put(queueId, offsetMap); + } + + private void loadLastSequenceIds(Result result) { + NavigableMap map = + result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY); + if (map == null || map.isEmpty()) { + return; + } + Map lastSeqIdMap = new HashMap<>(); + map.forEach((k, v) -> { + String encodedRegionName = Bytes.toString(k); + long lastSeqId = Bytes.toLong(v); + lastSeqIdMap.put(encodedRegionName, lastSeqId); + }); + String peerId = Bytes.toString(result.getRow()); + lastSequenceIds.put(peerId, lastSeqIdMap); + } + + private void loadHFileRefs(Result result) { + NavigableMap map = + result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY); + if (map == null || map.isEmpty()) { + return; + } + Set refs = new HashSet<>(); + map.keySet().forEach(ref -> refs.add(Bytes.toString(ref))); + String peerId = Bytes.toString(result.getRow()); + hfileRefs.put(peerId, refs); + } + + private void loadReplicationQueueData(Configuration conf, TableName tableName) + throws IOException { + Path rootDir = CommonFSUtils.getRootDir(conf); + Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName); + FileSystem fs = tableDir.getFileSystem(conf); + FileStatus[] regionDirs = + CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs)); + if (regionDirs == null) { + return; + } + NavigableMap startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (FileStatus regionDir : regionDirs) { + loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo); + } + TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); + for (RegionInfo hri : startKey2RegionInfo.values()) { + try (ClientSideRegionScanner scanner = + new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) { + for (;;) { + Result result = scanner.next(); + if (result == null) { + break; + } + loadOffsets(result); + loadLastSequenceIds(result); + loadHFileRefs(result); + } + } + } + } + + public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName) + throws IOException { + loadReplicationQueueData(conf, tableName); + } + + @Override + public synchronized void setOffset(ReplicationQueueId queueId, String walGroup, + ReplicationGroupOffset offset, Map lastSeqIds) throws ReplicationException { + Map offsetMap = offsets.get(queueId); + if (offsetMap == null) { + offsetMap = new HashMap<>(); + offsets.put(queueId, offsetMap); + } + offsetMap.put(walGroup, offset); + Map lastSeqIdsMap = lastSequenceIds.get(queueId.getPeerId()); + if (lastSeqIdsMap == null) { + lastSeqIdsMap = new HashMap<>(); + lastSequenceIds.put(queueId.getPeerId(), lastSeqIdsMap); + } + for (Map.Entry entry : lastSeqIds.entrySet()) { + Long oldSeqId = lastSeqIdsMap.get(entry.getKey()); + if (oldSeqId == null || oldSeqId < entry.getValue()) { + lastSeqIdsMap.put(entry.getKey(), entry.getValue()); + } + } + } 
+ + @Override + public synchronized Map getOffsets(ReplicationQueueId queueId) + throws ReplicationException { + Map offsetMap = offsets.get(queueId); + if (offsetMap == null) { + return Collections.emptyMap(); + } + return ImmutableMap.copyOf(offsetMap); + } + + @Override + public synchronized List listAllQueueIds(String peerId) + throws ReplicationException { + return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId)) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllQueueIds(ServerName serverName) + throws ReplicationException { + return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName)) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllQueueIds(String peerId, ServerName serverName) + throws ReplicationException { + return offsets.keySet().stream() + .filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName)) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllQueues() throws ReplicationException { + return offsets.entrySet().stream() + .map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue()))) + .collect(Collectors.toList()); + } + + @Override + public synchronized List listAllReplicators() throws ReplicationException { + return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct() + .collect(Collectors.toList()); + } + + @Override + public synchronized Map claimQueue(ReplicationQueueId queueId, + ServerName targetServerName) throws ReplicationException { + Map offsetMap = offsets.remove(queueId); + if (offsetMap == null) { + return Collections.emptyMap(); + } + offsets.put(queueId.claim(targetServerName), offsetMap); + return ImmutableMap.copyOf(offsetMap); + } + + @Override + public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException { + offsets.remove(queueId); + } + + @Override + public synchronized void removeAllQueues(String peerId) throws ReplicationException { + Iterator iter = offsets.keySet().iterator(); + while (iter.hasNext()) { + if (iter.next().getPeerId().equals(peerId)) { + iter.remove(); + } + } + } + + @Override + public synchronized long getLastSequenceId(String encodedRegionName, String peerId) + throws ReplicationException { + Map lastSeqIdMap = lastSequenceIds.get(peerId); + if (lastSeqIdMap == null) { + return HConstants.NO_SEQNUM; + } + Long lastSeqId = lastSeqIdMap.get(encodedRegionName); + return lastSeqId != null ? 
lastSeqId.longValue() : HConstants.NO_SEQNUM; + } + + @Override + public synchronized void setLastSequenceIds(String peerId, Map lastSeqIds) + throws ReplicationException { + Map lastSeqIdMap = lastSequenceIds.get(peerId); + if (lastSeqIdMap == null) { + lastSeqIdMap = new HashMap<>(); + lastSequenceIds.put(peerId, lastSeqIdMap); + } + lastSeqIdMap.putAll(lastSeqIds); + } + + @Override + public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException { + lastSequenceIds.remove(peerId); + } + + @Override + public synchronized void removeLastSequenceIds(String peerId, List encodedRegionNames) + throws ReplicationException { + Map lastSeqIdMap = lastSequenceIds.get(peerId); + if (lastSeqIdMap == null) { + return; + } + for (String encodedRegionName : encodedRegionNames) { + lastSeqIdMap.remove(encodedRegionName); + } + } + + @Override + public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException { + hfileRefs.remove(peerId); + } + + @Override + public synchronized void addHFileRefs(String peerId, List> pairs) + throws ReplicationException { + Set refs = hfileRefs.get(peerId); + if (refs == null) { + refs = new HashSet<>(); + hfileRefs.put(peerId, refs); + } + for (Pair pair : pairs) { + refs.add(pair.getSecond().getName()); + } + } + + @Override + public synchronized void removeHFileRefs(String peerId, List files) + throws ReplicationException { + Set refs = hfileRefs.get(peerId); + if (refs == null) { + return; + } + refs.removeAll(files); + } + + @Override + public synchronized List getAllPeersFromHFileRefsQueue() throws ReplicationException { + return ImmutableList.copyOf(hfileRefs.keySet()); + } + + @Override + public synchronized List getReplicableHFiles(String peerId) throws ReplicationException { + Set refs = hfileRefs.get(peerId); + if (refs == null) { + return Collections.emptyList(); + } + return ImmutableList.copyOf(refs); + } + + @Override + public synchronized Set getAllHFileRefs() throws ReplicationException { + return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet()); + } + + @Override + public boolean hasData() throws ReplicationException { + return true; + } + + @Override + public void batchUpdateQueues(ServerName serverName, List datas) + throws ReplicationException { + throw new UnsupportedOperationException(); + } + + @Override + public void batchUpdateLastSequenceIds( + List lastPushedSeqIds) + throws ReplicationException { + throw new UnsupportedOperationException(); + } + + @Override + public void batchUpdateHFileRefs(String peerId, List hfileRefs) + throws ReplicationException { + throw new UnsupportedOperationException(); + } + + @Override + public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException { + throw new UnsupportedOperationException(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 8cfb36a1bc1..8b01225e553 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -654,7 +654,7 @@ public class ReplicationPeerManager implements ConfigurationObserver { }; } return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage( - services.getConnection(), replicationQueueTableName), initializer); + 
services.getConnection(), conf, replicationQueueTableName), initializer); } public static ReplicationPeerManager create(MasterServices services, String clusterId) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 2fb996c6e4d..d54cda92d90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -25,7 +25,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -126,6 +125,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto */ @InterfaceAudience.Private public class ReplicationSourceManager { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class); // all the sources that read this RS's logs and every peer only has one replication source private final ConcurrentMap sources; @@ -147,13 +147,15 @@ public class ReplicationSourceManager { // All logs we are currently tracking // Index structure of the map is: queue_id->logPrefix/logGroup->logs - // For normal replication source, the peer id is same with the queue id private final ConcurrentMap>> walsById; // Logs for recovered sources we are currently tracking // the map is: queue_id->logPrefix/logGroup->logs - // For recovered source, the queue id's format is peer_id-servername-* + // for recovered source, the WAL files should already been moved to oldLogDir, and we have + // different layout of old WAL files, for example, with server name sub directories or not, so + // here we record the full path instead of just the name, so when refreshing we can enqueue the + // WAL file again, without trying to guess the real path of the WAL files. 
private final ConcurrentMap>> walsByIdRecoveredQueues; + Map>> walsByIdRecoveredQueues; private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager; @@ -515,9 +517,9 @@ public class ReplicationSourceManager { ReplicationSourceInterface recoveredReplicationSource = createRefreshedSource(oldSourceQueueId, peer); this.oldsources.add(recoveredReplicationSource); - for (SortedSet walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId) + for (NavigableSet walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId) .values()) { - walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal))); + walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal)); } toStartup.add(recoveredReplicationSource); } @@ -657,9 +659,11 @@ public class ReplicationSourceManager { void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) { String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log); if (source.isRecovered()) { - NavigableSet wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix); + NavigableSet wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix); if (wals != null) { - NavigableSet walsToRemove = wals.headSet(log, inclusive); + // here we just want to compare the timestamp, so it is OK to just create a fake WAL path + NavigableSet walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive) + .stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new)); if (walsToRemove.isEmpty()) { return; } @@ -815,6 +819,93 @@ public class ReplicationSourceManager { } void claimQueue(ReplicationQueueId queueId) { + claimQueue(queueId, false); + } + + // sorted from oldest to newest + private PriorityQueue getWALFilesToReplicate(ServerName sourceRS, boolean syncUp, + Map offsets) throws IOException { + List walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS, + URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name())); + if (syncUp) { + // we also need to list WALs directory for ReplicationSyncUp + walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS)); + } + PriorityQueue walFilesPQ = + new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR); + // sort the wal files and also filter out replicated files + for (Path file : walFiles) { + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName()); + ReplicationGroupOffset groupOffset = offsets.get(walGroupId); + if (shouldReplicate(groupOffset, file.getName())) { + walFilesPQ.add(file); + } else { + LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(), + groupOffset); + } + } + return walFilesPQ; + } + + private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer, + ReplicationQueueId claimedQueueId, PriorityQueue walFiles) { + ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId()); + if (peer == null || peer != oldPeer) { + src.terminate("Recovered queue doesn't belong to any current peer"); + deleteQueue(claimedQueueId); + return; + } + // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is + // transiting to STANDBY state. The only exception is we are in STANDBY state and + // transiting to DA, under this state we will replay the remote WAL and they need to be + // replicated back. 
+ if (peer.getPeerConfig().isSyncReplication()) { + Pair stateAndNewState = + peer.getSyncReplicationStateAndNewState(); + if ( + (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) + && stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) + || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY) + ) { + src.terminate("Sync replication peer is in STANDBY state"); + deleteQueue(claimedQueueId); + return; + } + } + // track sources in walsByIdRecoveredQueues + Map> walsByGroup = new HashMap<>(); + walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup); + for (Path wal : walFiles) { + String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName()); + NavigableSet wals = walsByGroup.get(walPrefix); + if (wals == null) { + wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR); + walsByGroup.put(walPrefix, wals); + } + wals.add(wal); + } + oldsources.add(src); + LOG.info("Added source for recovered queue {}, number of wals to replicate: {}", claimedQueueId, + walFiles.size()); + for (Path wal : walFiles) { + LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId); + src.enqueueLog(wal); + } + src.startup(); + } + + /** + * Claim a replication queue. + *
<p/>
+ * We add a flag to indicate whether we are called by ReplicationSyncUp. For normal claiming queue + * operation, we are the last step of a SCP, so we can assume that all the WAL files are under + * oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue for a + * region server which has not been processed by SCP yet, so we still need to look at its WALs + * directory. + * @param queueId the replication queue id we want to claim + * @param syncUp whether we are called by ReplicationSyncUp + */ + void claimQueue(ReplicationQueueId queueId, boolean syncUp) { // Wait a bit before transferring the queues, we may be shutting down. // This sleep may not be enough in some cases. try { @@ -873,76 +964,17 @@ public class ReplicationSourceManager { server.abort("Failed to create replication source after claiming queue.", e); return; } - List walFiles; + PriorityQueue walFiles; try { - walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS, - URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name())); + walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets); } catch (IOException e) { - LOG.error("Can not list all wal files for peer {} and queue {}", peerId, queueId, e); - server.abort("Can not list all wal files after claiming queue.", e); + LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e); + server.abort("Can not list wal files after claiming queue.", e); return; } - PriorityQueue walFilesPQ = new PriorityQueue<>( - Comparator. comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName())) - .thenComparing(Path::getName)); - // sort the wal files and also filter out replicated files - for (Path file : walFiles) { - String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName()); - ReplicationGroupOffset groupOffset = offsets.get(walGroupId); - if (shouldReplicate(groupOffset, file.getName())) { - walFilesPQ.add(file); - } else { - LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(), - groupOffset); - } - } - // the method is a bit long, so assign it to null here to avoid later we reuse it again by - // mistake, we should use the sorted walFilesPQ instead - walFiles = null; // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer synchronized (oldsources) { - peer = replicationPeers.getPeer(src.getPeerId()); - if (peer == null || peer != oldPeer) { - src.terminate("Recovered queue doesn't belong to any current peer"); - deleteQueue(claimedQueueId); - return; - } - // Do not setup recovered queue if a sync replication peer is in STANDBY state, or is - // transiting to STANDBY state. The only exception is we are in STANDBY state and - // transiting to DA, under this state we will replay the remote WAL and they need to be - // replicated back. 
- if (peer.getPeerConfig().isSyncReplication()) { - Pair stateAndNewState = - peer.getSyncReplicationStateAndNewState(); - if ( - (stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) - && stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) - || stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY) - ) { - src.terminate("Sync replication peer is in STANDBY state"); - deleteQueue(claimedQueueId); - return; - } - } - // track sources in walsByIdRecoveredQueues - Map> walsByGroup = new HashMap<>(); - walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup); - for (Path wal : walFilesPQ) { - String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName()); - NavigableSet wals = walsByGroup.get(walPrefix); - if (wals == null) { - wals = new TreeSet<>(); - walsByGroup.put(walPrefix, wals); - } - wals.add(wal.getName()); - } - oldsources.add(src); - LOG.info("Added source for recovered queue {}", claimedQueueId); - for (Path wal : walFilesPQ) { - LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId); - src.enqueueLog(new Path(oldLogDir, wal)); - } - src.startup(); + addRecoveredSource(src, oldPeer, claimedQueueId, walFiles); } } @@ -971,16 +1003,6 @@ public class ReplicationSourceManager { return Collections.unmodifiableMap(walsById); } - /** - * Get a copy of the wals of the recovered sources on this rs - * @return a sorted set of wal names - */ - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") - Map>> getWalsByIdRecoveredQueues() { - return Collections.unmodifiableMap(walsByIdRecoveredQueues); - } - /** * Get a list of all the normal sources of this rs * @return list of all normal sources @@ -1100,8 +1122,6 @@ public class ReplicationSourceManager { return this.globalMetrics; } - @RestrictedApi(explanation = "Should only be called in tests", link = "", - allowedOnPath = ".*/src/test/.*") ReplicationQueueStorage getQueueStorage() { return queueStorage; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index b63ad473719..f071cf6f1f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -17,13 +17,17 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; @@ -35,11 +39,18 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import 
org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.JsonMapper; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -59,6 +70,31 @@ import org.apache.zookeeper.KeeperException; @InterfaceAudience.Private public class ReplicationSyncUp extends Configured implements Tool { + public static class ReplicationSyncUpToolInfo { + + private long startTimeMs; + + public ReplicationSyncUpToolInfo() { + } + + public ReplicationSyncUpToolInfo(long startTimeMs) { + this.startTimeMs = startTimeMs; + } + + public long getStartTimeMs() { + return startTimeMs; + } + + public void setStartTimeMs(long startTimeMs) { + this.startTimeMs = startTimeMs; + } + } + + // For storing the information used to skip replicating some wals after the cluster is back online + public static final String INFO_DIR = "ReplicationSyncUp"; + + public static final String INFO_FILE = "info"; + private static final long SLEEP_TIME = 10000; /** @@ -69,41 +105,116 @@ public class ReplicationSyncUp extends Configured implements Tool { System.exit(ret); } - private Set getLiveRegionServers(ZKWatcher zkw) throws KeeperException { - List rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); - return rsZNodes == null - ? Collections.emptySet() - : rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet()); + // Find region servers under wal directory + // Here we only care about the region servers which may still be alive, as we need to add + // replications for them if missing. The dead region servers which have already been processed + // fully do not need to add their replication queues again, as the operation has already been done + // in SCP. 
+ private Set listRegionServers(FileSystem walFs, Path walDir) throws IOException { + FileStatus[] statuses; + try { + statuses = walFs.listStatus(walDir); + } catch (FileNotFoundException e) { + System.out.println("WAL directory " + walDir + " does not exists, ignore"); + return Collections.emptySet(); + } + Set regionServers = new HashSet<>(); + for (FileStatus status : statuses) { + // All wal files under the walDir is within its region server's directory + if (!status.isDirectory()) { + continue; + } + ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath()); + if (sn != null) { + regionServers.add(sn); + } + } + return regionServers; + } + + private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer, + Set peerIds) throws ReplicationException { + Set existingQueuePeerIds = new HashSet<>(); + List queueIds = storage.listAllQueueIds(regionServer); + for (Iterator iter = queueIds.iterator(); iter.hasNext();) { + ReplicationQueueId queueId = iter.next(); + if (!queueId.isRecovered()) { + existingQueuePeerIds.add(queueId.getPeerId()); + } + } + + for (String peerId : peerIds) { + if (!existingQueuePeerIds.contains(peerId)) { + ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId); + System.out.println("Add replication queue " + queueId + " for claiming"); + storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN, + Collections.emptyMap()); + } + } + } + + private void addMissingReplicationQueues(ReplicationQueueStorage storage, + Set regionServers, Set peerIds) throws ReplicationException { + for (ServerName regionServer : regionServers) { + addMissingReplicationQueues(storage, regionServer, peerIds); + } } // When using this tool, usually the source cluster is unhealthy, so we should try to claim the // replication queues for the dead region servers first and then replicate the data out. - private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr) - throws ReplicationException, KeeperException { - // TODO: reimplement this tool - // List replicators = mgr.getQueueStorage().getListOfReplicators(); - // Set liveRegionServers = getLiveRegionServers(zkw); - // for (ServerName sn : replicators) { - // if (!liveRegionServers.contains(sn)) { - // List replicationQueues = mgr.getQueueStorage().getAllQueues(sn); - // System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); - // for (String queue : replicationQueues) { - // mgr.claimQueue(sn, queue); - // } - // } - // } + private void claimReplicationQueues(ReplicationSourceManager mgr, Set regionServers) + throws ReplicationException, KeeperException, IOException { + // union the region servers from both places, i.e, from the wal directory, and the records in + // replication queue storage. 
+ Set replicators = new HashSet<>(regionServers); + ReplicationQueueStorage queueStorage = mgr.getQueueStorage(); + replicators.addAll(queueStorage.listAllReplicators()); + FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf()); + Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); + for (ServerName sn : replicators) { + List replicationQueues = queueStorage.listAllQueueIds(sn); + System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); + // record the rs name, so when master restarting, we will skip claiming its replication queue + fs.createNewFile(new Path(infoDir, sn.getServerName())); + for (ReplicationQueueId queueId : replicationQueues) { + mgr.claimQueue(queueId, true); + } + } + } + + private void writeInfoFile(FileSystem fs) throws IOException { + // Record the info of this run. Currently only record the time we run the job. We will use this + // timestamp to clean up the data for last sequence ids and hfile refs in replication queue + // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore. + ReplicationSyncUpToolInfo info = + new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime()); + String json = JsonMapper.writeObjectAsString(info); + Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); + try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) { + out.write(Bytes.toBytes(json)); + } } @Override public int run(String[] args) throws Exception { Abortable abortable = new Abortable() { + + private volatile boolean abort = false; + @Override public void abort(String why, Throwable e) { + if (isAborted()) { + return; + } + abort = true; + System.err.println("Aborting because of " + why); + e.printStackTrace(); + System.exit(1); } @Override public boolean isAborted() { - return false; + return abort; } }; Configuration conf = getConf(); @@ -114,16 +225,24 @@ public class ReplicationSyncUp extends Configured implements Tool { Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); - System.out.println("Start Replication Server start"); + System.out.println("Start Replication Server"); + writeInfoFile(fs); Replication replication = new Replication(); - replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, + // use offline table replication queue storage + getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL, + OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class); + DummyServer server = new DummyServer(getConf(), zkw); + replication.initialize(server, fs, new Path(logDir, server.toString()), oldLogDir, new WALFactory(conf, ServerName .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()), null, false)); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init(); - claimReplicationQueues(zkw, manager); + Set regionServers = listRegionServers(fs, logDir); + addMissingReplicationQueues(manager.getQueueStorage(), regionServers, + manager.getReplicationPeers().getAllPeerIds()); + claimReplicationQueues(manager, regionServers); while (manager.activeFailoverTaskCount() > 0) { Thread.sleep(SLEEP_TIME); } @@ -138,23 +257,22 @@ public class ReplicationSyncUp extends Configured implements Tool { return 0; } - class DummyServer implements Server { - String hostname; - ZKWatcher zkw; + private static final class DummyServer implements Server { + private final Configuration 
conf; + private final String hostname; + private final ZKWatcher zkw; + private volatile boolean abort = false; - DummyServer(ZKWatcher zkw) { + DummyServer(Configuration conf, ZKWatcher zkw) { // a unique name in case the first run fails hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org"; + this.conf = conf; this.zkw = zkw; } - DummyServer(String hostname) { - this.hostname = hostname; - } - @Override public Configuration getConfiguration() { - return getConf(); + return conf; } @Override @@ -174,11 +292,18 @@ public class ReplicationSyncUp extends Configured implements Tool { @Override public void abort(String why, Throwable e) { + if (isAborted()) { + return; + } + abort = true; + System.err.println("Aborting because of " + why); + e.printStackTrace(); + System.exit(1); } @Override public boolean isAborted() { - return false; + return abort; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 5bbc6679196..5dc40dd6049 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -311,6 +311,10 @@ public abstract class AbstractFSWALProvider> implemen return matcher.matches() ? Long.parseLong(matcher.group(2)) : NO_TIMESTAMP; } + public static final Comparator TIMESTAMP_COMPARATOR = + Comparator. comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName())) + .thenComparing(Path::getName); + /** * Construct the directory name for all WALs on a given server. Dir names currently look like this * for WALs: hbase//WALs/kalashnikov.att.net,61634,1486865297088. 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index d7ba6c227c6..5d474bc2164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -127,8 +127,8 @@ public class TestLogsCleaner { TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); TEST_UTIL.getAdmin().createTable(td); TEST_UTIL.waitTableAvailable(tableName); - queueStorage = - ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName); + queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), + conf, tableName); masterServices = mock(MasterServices.class); when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java index 7a89af15902..38225613b9d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java @@ -22,24 +22,28 @@ import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH; import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// revisit later when we implement the new ReplicationSyncUpTool -@Ignore @Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { @@ -55,39 +59,70 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { */ @Test public void testSyncUpTool() throws Exception { - - /** - * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: - * 'cf1' : replicated 'norep': not replicated - */ + // Set up Replication: on Master and one Slave + // Table: t1_syncup and t2_syncup + // columnfamily: + // 'cf1' : replicated + // 'norep': not replicated setupReplication(); - /** - * at Master: t1_syncup: put 100 rows into cf1, and 1 rows into norep t2_syncup: put 200 rows - * into cf1, and 1 rows into norep verify correctly replicated to slave - */ + // + // at Master: + // t1_syncup: put 100 rows into 
cf1, and 1 rows into norep + // t2_syncup: put 200 rows into cf1, and 1 rows into norep + // + // verify correctly replicated to slave putAndReplicateRows(); - /** - * Verify delete works step 1: stop hbase on Slave step 2: at Master: t1_syncup: delete 50 rows - * from cf1 t2_syncup: delete 100 rows from cf1 no change on 'norep' step 3: stop hbase on - * master, restart hbase on Slave step 4: verify Slave still have the rows before delete - * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step 5: run syncup tool on Master - * step 6: verify that delete show up on Slave t1_syncup: 50 rows from cf1 t2_syncup: 100 rows - * from cf1 verify correctly replicated to Slave - */ + // Verify delete works + // + // step 1: stop hbase on Slave + // + // step 2: at Master: + // t1_syncup: delete 50 rows from cf1 + // t2_syncup: delete 100 rows from cf1 + // no change on 'norep' + // + // step 3: stop hbase on master, restart hbase on Slave + // + // step 4: verify Slave still have the rows before delete + // t1_syncup: 100 rows from cf1 + // t2_syncup: 200 rows from cf1 + // + // step 5: run syncup tool on Master + // + // step 6: verify that delete show up on Slave + // t1_syncup: 50 rows from cf1 + // t2_syncup: 100 rows from cf1 + // + // verify correctly replicated to Slave mimicSyncUpAfterDelete(); - /** - * Verify put works step 1: stop hbase on Slave step 2: at Master: t1_syncup: put 100 rows from - * cf1 t2_syncup: put 200 rows from cf1 and put another row on 'norep' ATTN: put to 'cf1' will - * overwrite existing rows, so end count will be 100 and 200 respectively put to 'norep' will - * add a new row. step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave - * still has the rows before put t1_syncup: 50 rows from cf1 t2_syncup: 100 rows from cf1 step - * 5: run syncup tool on Master step 6: verify that put show up on Slave and 'norep' does not - * t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 verify correctly replicated to - * Slave - */ + // Verify put works + // + // step 1: stop hbase on Slave + // + // step 2: at Master: + // t1_syncup: put 100 rows from cf1 + // t2_syncup: put 200 rows from cf1 + // and put another row on 'norep' + // ATTN: + // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively + // put to 'norep' will add a new row. 
+ // + // step 3: stop hbase on master, restart hbase on Slave + // + // step 4: verify Slave still has the rows before put + // t1_syncup: 50 rows from cf1 + // t2_syncup: 100 rows from cf1 + // + // step 5: run syncup tool on Master + // + // step 6: verify that put show up on Slave and 'norep' does not + // t1_syncup: 100 rows from cf1 + // t2_syncup: 200 rows from cf1 + // + // verify correctly replicated to Slave mimicSyncUpAfterPut(); } @@ -172,7 +207,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { int rowCount_ht2Source = countRows(ht2Source); assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101, rowCount_ht2Source); - + List sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream() + .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList()); shutDownSourceHBaseCluster(); restartTargetHBaseCluster(1); @@ -184,40 +220,33 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1); assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1); + syncUp(UTIL1); + // After sync up - for (int i = 0; i < NB_RETRIES; i++) { - syncUp(UTIL1); - rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); - rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); - if (i == NB_RETRIES - 1) { - if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) { - // syncUP still failed. Let's look at the source in case anything wrong there - restartSourceHBaseCluster(1); - rowCount_ht1Source = countRows(ht1Source); - LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source); - rowCount_ht2Source = countRows(ht2Source); - LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source); - } - assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50, - rowCountHt1TargetAtPeer1); - assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100, - rowCountHt2TargetAtPeer1); - } - if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) { - LOG.info("SyncUpAfterDelete succeeded at retry = " + i); - break; - } else { - LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" - + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" - + rowCountHt2TargetAtPeer1); - } - Thread.sleep(SLEEP_TIME); + rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50, + rowCountHt1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100, + rowCountHt2TargetAtPeer1); + + // check we have recorded the dead region servers and also have an info file + Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration()); + Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR); + FileSystem fs = UTIL1.getTestFileSystem(); + for (ServerName sn : sourceRses) { + assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName()))); } + assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE))); + assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length); + + restartSourceHBaseCluster(1); + // should finally removed all the records after restart + UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0); } private void 
mimicSyncUpAfterPut() throws Exception { LOG.debug("mimicSyncUpAfterPut"); - restartSourceHBaseCluster(1); shutDownTargetHBaseCluster(); Put p; @@ -261,34 +290,14 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase { assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100, rowCountHt2TargetAtPeer1); - // after syun up - for (int i = 0; i < NB_RETRIES; i++) { - syncUp(UTIL1); - rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); - rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); - if (i == NB_RETRIES - 1) { - if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) { - // syncUP still failed. Let's look at the source in case anything wrong there - restartSourceHBaseCluster(1); - rowCount_ht1Source = countRows(ht1Source); - LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source); - rowCount_ht2Source = countRows(ht2Source); - LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source); - } - assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100, - rowCountHt1TargetAtPeer1); - assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200, - rowCountHt2TargetAtPeer1); - } - if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) { - LOG.info("SyncUpAfterPut succeeded at retry = " + i); - break; - } else { - LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" - + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" - + rowCountHt2TargetAtPeer1); - } - Thread.sleep(SLEEP_TIME); - } + syncUp(UTIL1); + + // after sync up + rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100, + rowCountHt1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200, + rowCountHt2TargetAtPeer1); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java index d3142106362..8a28db3b185 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java @@ -136,7 +136,8 @@ public abstract class TestReplicationSyncUpToolBase { } final void syncUp(HBaseTestingUtil util) throws Exception { - ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]); + ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), + new String[0]); } // Utilities that manager shutdown / restart of source / sink clusters. 
They take care of diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java index b5de8e6324f..e9acc1bc45e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Table; @@ -45,14 +46,11 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileTestUtil; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -//revisit later when we implement the new ReplicationSyncUpTool -@Ignore @Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase { @@ -74,40 +72,50 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication @Test public void testSyncUpTool() throws Exception { - /** - * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily: - * 'cf1' : replicated 'norep': not replicated - */ + // Set up Replication: + // on Master and one Slave Table: t1_syncup and t2_syncup + // columnfamily: + // 'cf1' : replicated + // 'norep': not replicated setupReplication(); - /** - * Prepare 24 random hfile ranges required for creating hfiles - */ + // Prepare 24 random hfile ranges required for creating hfiles Iterator randomHFileRangeListIterator = null; Set randomHFileRanges = new HashSet<>(24); for (int i = 0; i < 24; i++) { - randomHFileRanges.add(UTIL1.getRandomUUID().toString()); + randomHFileRanges.add(HBaseTestingUtil.getRandomUUID().toString()); } List randomHFileRangeList = new ArrayList<>(randomHFileRanges); Collections.sort(randomHFileRangeList); randomHFileRangeListIterator = randomHFileRangeList.iterator(); - /** - * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows - * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 - * rows into norep verify correctly replicated to slave - */ + // at Master: + // t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows into norep + // t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 rows into + // norep + // verify correctly replicated to slave loadAndReplicateHFiles(true, randomHFileRangeListIterator); - /** - * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load - * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and - * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave - * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step - * 5: run syncup tool on Master step 6: verify that hfiles 
show up on Slave and 'norep' does not - * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to - * Slave - */ + // Verify hfile load works + // + // step 1: stop hbase on Slave + // + // step 2: at Master: + // t1_syncup: Load another 100 rows into cf1 and 3 rows into norep + // t2_syncup: Load another 200 rows into cf1 and 3 rows into norep + // + // step 3: stop hbase on master, restart hbase on Slave + // + // step 4: verify Slave still has the rows before load + // t1_syncup: 100 rows from cf1 + // t2_syncup: 200 rows from cf1 + // + // step 5: run syncup tool on Master + // + // step 6: verify that hfiles show up on Slave and 'norep' does not + // t1_syncup: 200 rows from cf1 + // t2_syncup: 400 rows from cf1 + // verify correctly replicated to Slave mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator); } @@ -142,34 +150,12 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication syncUp(UTIL1); // After syun up - for (int i = 0; i < NB_RETRIES; i++) { - syncUp(UTIL1); - rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); - rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); - if (i == NB_RETRIES - 1) { - if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) { - // syncUP still failed. Let's look at the source in case anything wrong there - restartSourceHBaseCluster(1); - rowCount_ht1Source = countRows(ht1Source); - LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source); - rowCount_ht2Source = countRows(ht2Source); - LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source); - } - assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, - rowCountHt1TargetAtPeer1); - assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, - rowCountHt2TargetAtPeer1); - } - if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) { - LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i); - break; - } else { - LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" - + rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" - + rowCountHt2TargetAtPeer1); - } - Thread.sleep(SLEEP_TIME); - } + rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1); + rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1); + assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200, + rowCountHt1TargetAtPeer1); + assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400, + rowCountHt2TargetAtPeer1); } private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java index 4148c1c1a2c..9041831d0e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestTableReplicationQueueStorage.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MD5Hash; import org.apache.hadoop.hbase.util.Pair; import 
org.apache.zookeeper.KeeperException;
@@ -420,4 +421,54 @@ public class TestTableReplicationQueueStorage {
     assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
     assertTrue(storage.getReplicableHFiles(peerId2).isEmpty());
   }
+
+  private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2)
+    throws ReplicationException {
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i));
+    }
+
+    List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+    files1.add(new Pair<>(null, new Path("file_1")));
+    files1.add(new Pair<>(null, new Path("file_2")));
+    files1.add(new Pair<>(null, new Path("file_3")));
+    storage.addHFileRefs(peerId2, files1);
+  }
+
+  @Test
+  public void testRemoveLastSequenceIdsAndHFileRefsBefore()
+    throws ReplicationException, InterruptedException {
+    String peerId1 = "1";
+    String peerId2 = "2";
+    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
+    // make sure we have written these out
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
+    }
+    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
+
+    // should have nothing after removal
+    long ts = EnvironmentEdgeManager.currentTime();
+    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1));
+    }
+    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
+
+    Thread.sleep(100);
+    // add again and remove with the old timestamp
+    addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
+    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
+    // make sure we do not delete the data which are written after the given timestamp
+    for (int i = 0; i < 100; i++) {
+      String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
+      assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
+    }
+    assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, storage.getReplicableHFiles(peerId2).size());
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
index 28779be4399..83cd41773ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
@@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.experimental.categories.Category;
 
-//revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
 @Category({ ReplicationTests.class, LargeTests.class })
 public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyncUpTool {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java index f495f433bc9..673b841430e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java @@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.wal.RegionGroupingProvider; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.experimental.categories.Category; -//revisit later when we implement the new ReplicationSyncUpTool -@Ignore @Category({ ReplicationTests.class, LargeTests.class }) public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyncUpTool { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index b7564ed9168..1bb9a3e2949 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -45,11 +45,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -125,8 +122,6 @@ public class TestReplicationSourceManager { private static final TableName TABLE_NAME = TableName.valueOf("test"); - private static TableDescriptor TD; - private static RegionInfo RI; private static NavigableMap SCOPES; @@ -152,10 +147,6 @@ public class TestReplicationSourceManager { FS = UTIL.getTestFileSystem(); CONF = new Configuration(UTIL.getConfiguration()); CONF.setLong("replication.sleep.before.failover", 0); - TD = TableDescriptorBuilder.newBuilder(TABLE_NAME) - .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(F1) - .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) - .setColumnFamily(ColumnFamilyDescriptorBuilder.of(F2)).build(); RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build(); SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR); @@ -176,7 +167,8 @@ public class TestReplicationSourceManager { when(server.getConfiguration()).thenReturn(CONF); when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher()); when(server.getConnection()).thenReturn(UTIL.getConnection()); - when(server.getServerName()).thenReturn(ServerName.valueOf("hostname.example.org", 1234, 1)); + ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1); + when(server.getServerName()).thenReturn(sn); oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); FS.mkdirs(oldLogDir); logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); @@ -189,7 +181,7 @@ public class 
TestReplicationSourceManager { CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString()); replication = new Replication(); - replication.initialize(server, FS, logDir, oldLogDir, + replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir, new WALFactory(CONF, server.getServerName(), null, false)); manager = replication.getReplicationManager(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java index 1544265435c..8731adbe4c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java @@ -99,8 +99,8 @@ public class TestSerialReplicationChecker { TableName repTable = TableName.valueOf("test_serial_rep"); UTIL.getAdmin() .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable)); - QUEUE_STORAGE = - ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), repTable); + QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(), + UTIL.getConfiguration(), repTable); } @AfterClass
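
Side note on the info file that ReplicationSyncUp writes before claiming queues: it is a small JSON blob carrying the start time of the run, later used to decide which last-sequence-id and hfile-ref records are stale. A rough standalone sketch of that round trip follows, using plain Gson instead of HBase's JsonMapper wrapper; the SyncUpInfo class and its startTimeMs field are stand-ins assumed for illustration, not the real ReplicationSyncUpToolInfo.

import com.google.gson.Gson;

public class SyncUpInfoJsonSketch {
  // Stand-in for ReplicationSyncUpToolInfo; the field name is assumed for illustration.
  static class SyncUpInfo {
    long startTimeMs;

    SyncUpInfo(long startTimeMs) {
      this.startTimeMs = startTimeMs;
    }
  }

  public static void main(String[] args) {
    Gson gson = new Gson();

    // Writing side: serialize the start time of the sync-up run, as writeInfoFile does via JsonMapper.
    String json = gson.toJson(new SyncUpInfo(System.currentTimeMillis()));
    System.out.println(json); // e.g. {"startTimeMs":1679144333000}

    // Reading side: parse the timestamp back so last sequence ids and hfile refs
    // written before it can be cleaned up.
    SyncUpInfo parsed = gson.fromJson(json, SyncUpInfo.class);
    System.out.println(parsed.startTimeMs);
  }
}
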