HBASE-27216 Revisit the ReplicationSyncUp tool (#4966)

Signed-off-by: Liangjun He <heliangjun@apache.org>
Duo Zhang 2023-03-18 21:38:53 +08:00 committed by Duo Zhang
parent 7c74f9e8c5
commit 000e64abda
22 changed files with 1029 additions and 293 deletions

View File

@ -40,4 +40,8 @@ public final class JsonMapper {
public static String writeObjectAsString(Object object) throws IOException { public static String writeObjectAsString(Object object) throws IOException {
return GSON.toJson(object); return GSON.toJson(object);
} }
public static <T> T fromJson(String json, Class<T> clazz) {
return GSON.fromJson(json, clazz);
}
} }

View File

@ -717,6 +717,7 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
enum AssignReplicationQueuesState { enum AssignReplicationQueuesState {
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1; ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
ASSIGN_REPLICATION_QUEUES_CLAIM = 2; ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES = 3;
} }
message AssignReplicationQueuesStateData { message AssignReplicationQueuesStateData {

View File

@ -203,4 +203,25 @@ public interface ReplicationQueueStorage {
* Add the given hfile refs to the given peer. * Add the given hfile refs to the given peer.
*/ */
void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException; void batchUpdateHFileRefs(String peerId, List<String> hfileRefs) throws ReplicationException;
// the below method is for cleaning up stale data after running ReplicationSyncUp
/**
* Remove all the last sequence ids and hfile references data which are written before the given
* timestamp.
* <p/>
* These two types of data are not used by replication directly.
* <p/>
* For last sequence ids, we check them in serial replication to make sure that we replicate all
* edits in order, so if there is stale data, the worst case is that we will stop replicating as
* we think we still need to finish previous ranges first, although actually we have already
* replicated them out.
* <p/>
* For hfile references, they are only used by the hfile cleaner to avoid removing these hfiles
* before we replicate them out, so if there is stale data, the worst case is that we cannot
* remove these hfiles, although actually they have already been replicated out.
* <p/>
* So it is OK for us to just bring up the cluster first, and then use this method to delete the
* stale data, i.e., the data written before a specific timestamp.
*/
void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException;
} }
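A minimal usage sketch of removeLastSequenceIdsAndHFileRefsBefore (not part of the patch), assuming an existing Connection and Configuration; the class name is hypothetical and the timestamp would normally be the ReplicationSyncUp start time loaded from the info file:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;

public class StaleReplicationDataCleaner {
  // Sketch: drop last sequence ids and hfile refs written before the given timestamp.
  static void cleanupBefore(Connection conn, Configuration conf, long ts)
    throws ReplicationException {
    ReplicationQueueStorage storage =
      ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
    storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
  }
}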

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* Used to create replication storage(peer, queue) classes. * Used to create replication storage(peer, queue) classes.
@ -39,6 +41,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private @InterfaceAudience.Private
public final class ReplicationStorageFactory { public final class ReplicationStorageFactory {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class);
public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl"; public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl";
// must use zookeeper here, otherwise when user upgrading from an old version without changing the // must use zookeeper here, otherwise when user upgrading from an old version without changing the
@ -51,6 +55,8 @@ public final class ReplicationStorageFactory {
public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT = public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.storage.impl";
public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName) public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName)
throws IOException { throws IOException {
return TableDescriptorBuilder.newBuilder(tableName) return TableDescriptorBuilder.newBuilder(tableName)
@ -108,15 +114,26 @@ public final class ReplicationStorageFactory {
*/ */
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
Configuration conf) { Configuration conf) {
return getReplicationQueueStorage(conn, TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME, return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf
REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()))); .get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())));
} }
/** /**
* Create a new {@link ReplicationQueueStorage}. * Create a new {@link ReplicationQueueStorage}.
*/ */
public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn,
TableName tableName) { Configuration conf, TableName tableName) {
return new TableReplicationQueueStorage(conn, tableName); Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL,
TableReplicationQueueStorage.class, ReplicationQueueStorage.class);
try {
Constructor<? extends ReplicationQueueStorage> c =
clazz.getConstructor(Connection.class, TableName.class);
return c.newInstance(conn, tableName);
} catch (Exception e) {
LOG.debug(
"failed to create ReplicationQueueStorage with Connection, try creating with Configuration",
e);
return ReflectionUtils.newInstance(clazz, conf, tableName);
}
} }
} }
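The new hbase.replication.queue.storage.impl key makes the queue storage pluggable: the factory first looks for a (Connection, TableName) constructor and falls back to a (Configuration, TableName) one via ReflectionUtils. A sketch (assumptions: the Connection already exists, and the example class name is illustrative) of wiring in an alternative implementation, here the OfflineTableReplicationQueueStorage added later in this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;

public class PluggableQueueStorageExample {
  static ReplicationQueueStorage create(Connection conn) {
    Configuration conf = HBaseConfiguration.create();
    // OfflineTableReplicationQueueStorage only has a (Configuration, TableName) constructor,
    // so the factory's reflection fallback path is exercised here.
    conf.setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
      OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class);
    return ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
  }
}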

View File

@ -594,4 +594,24 @@ public class TableReplicationQueueStorage implements ReplicationQueueStorage {
throw new ReplicationException("failed to batch update hfile references", e); throw new ReplicationException("failed to batch update hfile references", e);
} }
} }
@Override
public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
try (Table table = conn.getTable(tableName);
ResultScanner scanner = table.getScanner(new Scan().addFamily(LAST_SEQUENCE_ID_FAMILY)
.addFamily(HFILE_REF_FAMILY).setFilter(new KeyOnlyFilter()))) {
for (;;) {
Result r = scanner.next();
if (r == null) {
break;
}
Delete delete = new Delete(r.getRow()).addFamily(LAST_SEQUENCE_ID_FAMILY, ts)
.addFamily(HFILE_REF_FAMILY, ts);
table.delete(delete);
}
} catch (IOException e) {
throw new ReplicationException(
"failed to remove last sequence ids and hfile references before timestamp " + ts, e);
}
}
} }

View File

@ -34,6 +34,9 @@ import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -59,6 +62,7 @@ import java.util.stream.Collectors;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat; import org.apache.hadoop.hbase.CatalogFamilyFormat;
@ -226,6 +230,8 @@ import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator; import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint; import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
@ -246,6 +252,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JsonMapper;
import org.apache.hadoop.hbase.util.ModifyRegionUtils; import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounter;
@ -267,7 +274,9 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.gson.JsonParseException;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Service; import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server; import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
@ -1286,6 +1295,38 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
status.setStatus("Initializing MOB Cleaner"); status.setStatus("Initializing MOB Cleaner");
initMobCleaner(); initMobCleaner();
// delete the stale data for replication sync up tool if necessary
status.setStatus("Cleanup ReplicationSyncUp status if necessary");
Path replicationSyncUpInfoFile =
new Path(new Path(dataRootDir, ReplicationSyncUp.INFO_DIR), ReplicationSyncUp.INFO_FILE);
if (dataFs.exists(replicationSyncUpInfoFile)) {
// info file is available, load the timestamp and use it to clean up stale data in replication
// queue storage.
byte[] data;
try (FSDataInputStream in = dataFs.open(replicationSyncUpInfoFile)) {
data = ByteStreams.toByteArray(in);
}
ReplicationSyncUpToolInfo info = null;
try {
info = JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
} catch (JsonParseException e) {
// usually this means a partial file, i.e. the ReplicationSyncUp tool did not finish properly,
// so it is not a problem. Here we do not clean up the status, as we do not know why the tool
// did not finish properly, so let users clean the status up manually
LOG.warn("failed to parse replication sync up info file, ignore and continue...", e);
}
if (info != null) {
LOG.info("Remove last sequence ids and hfile references which are written before {}({})",
info.getStartTimeMs(), DateTimeFormatter.ISO_DATE_TIME.withZone(ZoneId.systemDefault())
.format(Instant.ofEpochMilli(info.getStartTimeMs())));
replicationPeerManager.getQueueStorage()
.removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
// delete the file after removing the stale data, so next time we do not need to do this
// again.
dataFs.delete(replicationSyncUpInfoFile, false);
}
}
status.setStatus("Calling postStartMaster coprocessors"); status.setStatus("Calling postStartMaster coprocessors");
if (this.cpHost != null) { if (this.cpHost != null) {
// don't let cp initialization errors kill the master // don't let cp initialization errors kill the master
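A round-trip sketch of the JSON handling the master relies on above (illustrative only, not part of the patch): ReplicationSyncUp serializes a ReplicationSyncUpToolInfo with JsonMapper.writeObjectAsString into <rootdir>/ReplicationSyncUp/info, and the master parses it back with the new JsonMapper.fromJson before cleaning up the stale replication metadata:

import java.io.IOException;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JsonMapper;

public class SyncUpInfoRoundTrip {
  static void roundTrip() throws IOException {
    ReplicationSyncUpToolInfo written =
      new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
    // what ReplicationSyncUp writes into the info file
    String json = JsonMapper.writeObjectAsString(written);
    // what HMaster reads back at startup
    ReplicationSyncUpToolInfo read = JsonMapper.fromJson(json, ReplicationSyncUpToolInfo.class);
    assert read.getStartTimeMs() == written.getStartTimeMs();
  }
}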

View File

@ -24,7 +24,9 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -102,7 +105,7 @@ public class AssignReplicationQueuesProcedure
} }
} }
private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException { private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream() Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet()); .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
@ -130,18 +133,51 @@ public class AssignReplicationQueuesProcedure
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
} }
// check whether ReplicationSyncUp has already done the work for us; if so, we should skip
// claiming the replication queues and delete them instead.
private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
return mfs.getFileSystem().exists(new Path(syncUpDir, crashedServer.getServerName()));
}
private void removeQueues(MasterProcedureEnv env) throws ReplicationException, IOException {
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
for (ReplicationQueueId queueId : storage.listAllQueueIds(crashedServer)) {
storage.removeQueue(queueId);
}
MasterFileSystem mfs = env.getMasterFileSystem();
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
// remove the region server record file
mfs.getFileSystem().delete(new Path(syncUpDir, crashedServer.getServerName()), false);
}
@Override @Override
protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state) protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
try { try {
switch (state) { switch (state) {
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES: case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
addMissingQueues(env); if (shouldSkip(env)) {
retryCounter = null; setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM); return Flow.HAS_MORE_STATE;
return Flow.HAS_MORE_STATE; } else {
addMissingQueues(env);
retryCounter = null;
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_CLAIM);
return Flow.HAS_MORE_STATE;
}
case ASSIGN_REPLICATION_QUEUES_CLAIM: case ASSIGN_REPLICATION_QUEUES_CLAIM:
return claimQueues(env); if (shouldSkip(env)) {
retryCounter = null;
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES);
return Flow.HAS_MORE_STATE;
} else {
return claimQueues(env);
}
case ASSIGN_REPLICATION_QUEUES_REMOVE_QUEUES:
removeQueues(env);
return Flow.NO_MORE_STATE;
default: default:
throw new UnsupportedOperationException("unhandled state=" + state); throw new UnsupportedOperationException("unhandled state=" + state);
} }

View File

@ -19,16 +19,22 @@ package org.apache.hadoop.hbase.master.replication;
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure; import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable; import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -54,6 +60,32 @@ public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
this.targetServer = targetServer; this.targetServer = targetServer;
} }
// check whether ReplicationSyncUp has already done the work for us; if so, we should skip
// claiming the replication queues and delete them instead.
private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
}
@Override
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
try {
if (shouldSkip(env)) {
LOG.info("Skip claiming {} because replication sync up has already done it for us",
getServerName());
return null;
}
} catch (IOException e) {
LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
getServerName(), e);
// just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
return null;
}
return super.execute(env);
}
@Override @Override
public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) { public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
assert targetServer.equals(remote); assert targetServer.equals(remote);
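Both this procedure and AssignReplicationQueuesProcedure decide whether to skip by probing the marker files that ReplicationSyncUp leaves under the HBase root directory (see the INFO_DIR and INFO_FILE constants in ReplicationSyncUp later in this diff). A small sketch of that layout and check, with the FileSystem, root dir and server name assumed to be available:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;

public class SyncUpMarkerCheck {
  // Layout written by ReplicationSyncUp (relative to hbase.rootdir):
  //   ReplicationSyncUp/info            JSON with the tool's start timestamp
  //   ReplicationSyncUp/<server name>   marker: this server's queues were already synced up
  static boolean alreadySyncedUp(FileSystem fs, Path rootDir, ServerName sn) throws IOException {
    Path syncUpDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
    return fs.exists(new Path(syncUpDir, sn.getServerName()));
  }
}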

View File

@ -0,0 +1,382 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@InterfaceAudience.Private
public class OfflineTableReplicationQueueStorage implements ReplicationQueueStorage {
private final Map<ReplicationQueueId, Map<String, ReplicationGroupOffset>> offsets =
new HashMap<>();
private final Map<String, Map<String, Long>> lastSequenceIds = new HashMap<>();
private final Map<String, Set<String>> hfileRefs = new HashMap<>();
private void loadRegionInfo(FileSystem fs, Path regionDir,
NavigableMap<byte[], RegionInfo> startKey2RegionInfo) throws IOException {
RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
// TODO: we assume that there will not be too many regions for the hbase:replication table, so
// here we just iterate over all the regions to find out the overlapping ones. Can be optimized
// later.
Iterator<Map.Entry<byte[], RegionInfo>> iter = startKey2RegionInfo.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<byte[], RegionInfo> entry = iter.next();
if (hri.isOverlap(entry.getValue())) {
if (hri.getRegionId() > entry.getValue().getRegionId()) {
// we are newer, remove the old hri; we cannot break here because if hri is a merged region,
// we need to remove all its parent regions.
iter.remove();
} else {
// we are older, just return and skip the add below
return;
}
}
}
startKey2RegionInfo.put(hri.getStartKey(), hri);
}
private void loadOffsets(Result result) {
NavigableMap<byte[], byte[]> map =
result.getFamilyMap(TableReplicationQueueStorage.QUEUE_FAMILY);
if (map == null || map.isEmpty()) {
return;
}
Map<String, ReplicationGroupOffset> offsetMap = new HashMap<>();
map.forEach((k, v) -> {
String walGroup = Bytes.toString(k);
ReplicationGroupOffset offset = ReplicationGroupOffset.parse(Bytes.toString(v));
offsetMap.put(walGroup, offset);
});
ReplicationQueueId queueId = ReplicationQueueId.parse(Bytes.toString(result.getRow()));
offsets.put(queueId, offsetMap);
}
private void loadLastSequenceIds(Result result) {
NavigableMap<byte[], byte[]> map =
result.getFamilyMap(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY);
if (map == null || map.isEmpty()) {
return;
}
Map<String, Long> lastSeqIdMap = new HashMap<>();
map.forEach((k, v) -> {
String encodedRegionName = Bytes.toString(k);
long lastSeqId = Bytes.toLong(v);
lastSeqIdMap.put(encodedRegionName, lastSeqId);
});
String peerId = Bytes.toString(result.getRow());
lastSequenceIds.put(peerId, lastSeqIdMap);
}
private void loadHFileRefs(Result result) {
NavigableMap<byte[], byte[]> map =
result.getFamilyMap(TableReplicationQueueStorage.HFILE_REF_FAMILY);
if (map == null || map.isEmpty()) {
return;
}
Set<String> refs = new HashSet<>();
map.keySet().forEach(ref -> refs.add(Bytes.toString(ref)));
String peerId = Bytes.toString(result.getRow());
hfileRefs.put(peerId, refs);
}
private void loadReplicationQueueData(Configuration conf, TableName tableName)
throws IOException {
Path rootDir = CommonFSUtils.getRootDir(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, tableName);
FileSystem fs = tableDir.getFileSystem(conf);
FileStatus[] regionDirs =
CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs));
if (regionDirs == null) {
return;
}
NavigableMap<byte[], RegionInfo> startKey2RegionInfo = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (FileStatus regionDir : regionDirs) {
loadRegionInfo(fs, regionDir.getPath(), startKey2RegionInfo);
}
TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
for (RegionInfo hri : startKey2RegionInfo.values()) {
try (ClientSideRegionScanner scanner =
new ClientSideRegionScanner(conf, fs, rootDir, td, hri, new Scan(), null)) {
for (;;) {
Result result = scanner.next();
if (result == null) {
break;
}
loadOffsets(result);
loadLastSequenceIds(result);
loadHFileRefs(result);
}
}
}
}
public OfflineTableReplicationQueueStorage(Configuration conf, TableName tableName)
throws IOException {
loadReplicationQueueData(conf, tableName);
}
@Override
public synchronized void setOffset(ReplicationQueueId queueId, String walGroup,
ReplicationGroupOffset offset, Map<String, Long> lastSeqIds) throws ReplicationException {
Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
if (offsetMap == null) {
offsetMap = new HashMap<>();
offsets.put(queueId, offsetMap);
}
offsetMap.put(walGroup, offset);
Map<String, Long> lastSeqIdsMap = lastSequenceIds.get(queueId.getPeerId());
if (lastSeqIdsMap == null) {
lastSeqIdsMap = new HashMap<>();
lastSequenceIds.put(queueId.getPeerId(), lastSeqIdsMap);
}
for (Map.Entry<String, Long> entry : lastSeqIds.entrySet()) {
Long oldSeqId = lastSeqIdsMap.get(entry.getKey());
if (oldSeqId == null || oldSeqId < entry.getValue()) {
lastSeqIdsMap.put(entry.getKey(), entry.getValue());
}
}
}
@Override
public synchronized Map<String, ReplicationGroupOffset> getOffsets(ReplicationQueueId queueId)
throws ReplicationException {
Map<String, ReplicationGroupOffset> offsetMap = offsets.get(queueId);
if (offsetMap == null) {
return Collections.emptyMap();
}
return ImmutableMap.copyOf(offsetMap);
}
@Override
public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId)
throws ReplicationException {
return offsets.keySet().stream().filter(rqi -> rqi.getPeerId().equals(peerId))
.collect(Collectors.toList());
}
@Override
public synchronized List<ReplicationQueueId> listAllQueueIds(ServerName serverName)
throws ReplicationException {
return offsets.keySet().stream().filter(rqi -> rqi.getServerName().equals(serverName))
.collect(Collectors.toList());
}
@Override
public synchronized List<ReplicationQueueId> listAllQueueIds(String peerId, ServerName serverName)
throws ReplicationException {
return offsets.keySet().stream()
.filter(rqi -> rqi.getPeerId().equals(peerId) && rqi.getServerName().equals(serverName))
.collect(Collectors.toList());
}
@Override
public synchronized List<ReplicationQueueData> listAllQueues() throws ReplicationException {
return offsets.entrySet().stream()
.map(e -> new ReplicationQueueData(e.getKey(), ImmutableMap.copyOf(e.getValue())))
.collect(Collectors.toList());
}
@Override
public synchronized List<ServerName> listAllReplicators() throws ReplicationException {
return offsets.keySet().stream().map(ReplicationQueueId::getServerName).distinct()
.collect(Collectors.toList());
}
@Override
public synchronized Map<String, ReplicationGroupOffset> claimQueue(ReplicationQueueId queueId,
ServerName targetServerName) throws ReplicationException {
Map<String, ReplicationGroupOffset> offsetMap = offsets.remove(queueId);
if (offsetMap == null) {
return Collections.emptyMap();
}
offsets.put(queueId.claim(targetServerName), offsetMap);
return ImmutableMap.copyOf(offsetMap);
}
@Override
public synchronized void removeQueue(ReplicationQueueId queueId) throws ReplicationException {
offsets.remove(queueId);
}
@Override
public synchronized void removeAllQueues(String peerId) throws ReplicationException {
Iterator<ReplicationQueueId> iter = offsets.keySet().iterator();
while (iter.hasNext()) {
if (iter.next().getPeerId().equals(peerId)) {
iter.remove();
}
}
}
@Override
public synchronized long getLastSequenceId(String encodedRegionName, String peerId)
throws ReplicationException {
Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
if (lastSeqIdMap == null) {
return HConstants.NO_SEQNUM;
}
Long lastSeqId = lastSeqIdMap.get(encodedRegionName);
return lastSeqId != null ? lastSeqId.longValue() : HConstants.NO_SEQNUM;
}
@Override
public synchronized void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
throws ReplicationException {
Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
if (lastSeqIdMap == null) {
lastSeqIdMap = new HashMap<>();
lastSequenceIds.put(peerId, lastSeqIdMap);
}
lastSeqIdMap.putAll(lastSeqIds);
}
@Override
public synchronized void removeLastSequenceIds(String peerId) throws ReplicationException {
lastSequenceIds.remove(peerId);
}
@Override
public synchronized void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
throws ReplicationException {
Map<String, Long> lastSeqIdMap = lastSequenceIds.get(peerId);
if (lastSeqIdMap == null) {
return;
}
for (String encodedRegionName : encodedRegionNames) {
lastSeqIdMap.remove(encodedRegionName);
}
}
@Override
public synchronized void removePeerFromHFileRefs(String peerId) throws ReplicationException {
hfileRefs.remove(peerId);
}
@Override
public synchronized void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {
Set<String> refs = hfileRefs.get(peerId);
if (refs == null) {
refs = new HashSet<>();
hfileRefs.put(peerId, refs);
}
for (Pair<Path, Path> pair : pairs) {
refs.add(pair.getSecond().getName());
}
}
@Override
public synchronized void removeHFileRefs(String peerId, List<String> files)
throws ReplicationException {
Set<String> refs = hfileRefs.get(peerId);
if (refs == null) {
return;
}
refs.removeAll(files);
}
@Override
public synchronized List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
return ImmutableList.copyOf(hfileRefs.keySet());
}
@Override
public synchronized List<String> getReplicableHFiles(String peerId) throws ReplicationException {
Set<String> refs = hfileRefs.get(peerId);
if (refs == null) {
return Collections.emptyList();
}
return ImmutableList.copyOf(refs);
}
@Override
public synchronized Set<String> getAllHFileRefs() throws ReplicationException {
return hfileRefs.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
}
@Override
public boolean hasData() throws ReplicationException {
return true;
}
@Override
public void batchUpdateQueues(ServerName serverName, List<ReplicationQueueData> datas)
throws ReplicationException {
throw new UnsupportedOperationException();
}
@Override
public void batchUpdateLastSequenceIds(
List<ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId> lastPushedSeqIds)
throws ReplicationException {
throw new UnsupportedOperationException();
}
@Override
public void batchUpdateHFileRefs(String peerId, List<String> hfileRefs)
throws ReplicationException {
throw new UnsupportedOperationException();
}
@Override
public void removeLastSequenceIdsAndHFileRefsBefore(long ts) throws ReplicationException {
throw new UnsupportedOperationException();
}
}
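A sketch of using this offline storage directly, without a running cluster: it scans the region directories of the hbase:replication table from the filesystem and serves the result from memory. The configuration is assumed to point at the cluster's root directory; the class and method below are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;

public class OfflineQueueStorageExample {
  public static void main(String[] args) throws Exception {
    // conf must carry hbase.rootdir (and the matching filesystem settings)
    Configuration conf = HBaseConfiguration.create();
    ReplicationQueueStorage storage = new OfflineTableReplicationQueueStorage(conf,
      ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT);
    for (ServerName rs : storage.listAllReplicators()) {
      System.out.println(rs + " -> " + storage.listAllQueueIds(rs));
    }
  }
}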

View File

@ -654,7 +654,7 @@ public class ReplicationPeerManager implements ConfigurationObserver {
}; };
} }
return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage( return Pair.newPair(ReplicationStorageFactory.getReplicationQueueStorage(
services.getConnection(), replicationQueueTableName), initializer); services.getConnection(), conf, replicationQueueTableName), initializer);
} }
public static ReplicationPeerManager create(MasterServices services, String clusterId) public static ReplicationPeerManager create(MasterServices services, String clusterId)

View File

@ -25,7 +25,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -126,6 +125,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationSourceManager { public class ReplicationSourceManager {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class); private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
// all the sources that read this RS's logs and every peer only has one replication source // all the sources that read this RS's logs and every peer only has one replication source
private final ConcurrentMap<String, ReplicationSourceInterface> sources; private final ConcurrentMap<String, ReplicationSourceInterface> sources;
@ -147,13 +147,15 @@ public class ReplicationSourceManager {
// All logs we are currently tracking // All logs we are currently tracking
// Index structure of the map is: queue_id->logPrefix/logGroup->logs // Index structure of the map is: queue_id->logPrefix/logGroup->logs
// For normal replication source, the peer id is same with the queue id
private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById; private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById;
// Logs for recovered sources we are currently tracking // Logs for recovered sources we are currently tracking
// the map is: queue_id->logPrefix/logGroup->logs // the map is: queue_id->logPrefix/logGroup->logs
// For recovered source, the queue id's format is peer_id-servername-* // for recovered sources, the WAL files should already have been moved to oldLogDir, and we have
// different layouts of old WAL files, for example, with or without server name sub-directories, so
// here we record the full path instead of just the name, so when refreshing we can enqueue the
// WAL file again without trying to guess the real path of the WAL files.
private final ConcurrentMap<ReplicationQueueId, private final ConcurrentMap<ReplicationQueueId,
Map<String, NavigableSet<String>>> walsByIdRecoveredQueues; Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues;
private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager; private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
@ -515,9 +517,9 @@ public class ReplicationSourceManager {
ReplicationSourceInterface recoveredReplicationSource = ReplicationSourceInterface recoveredReplicationSource =
createRefreshedSource(oldSourceQueueId, peer); createRefreshedSource(oldSourceQueueId, peer);
this.oldsources.add(recoveredReplicationSource); this.oldsources.add(recoveredReplicationSource);
for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId) for (NavigableSet<Path> walsByGroup : walsByIdRecoveredQueues.get(oldSourceQueueId)
.values()) { .values()) {
walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal))); walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(wal));
} }
toStartup.add(recoveredReplicationSource); toStartup.add(recoveredReplicationSource);
} }
@ -657,9 +659,11 @@ public class ReplicationSourceManager {
void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) { void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log); String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
if (source.isRecovered()) { if (source.isRecovered()) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix); NavigableSet<Path> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) { if (wals != null) {
NavigableSet<String> walsToRemove = wals.headSet(log, inclusive); // here we just want to compare the timestamp, so it is OK to just create a fake WAL path
NavigableSet<String> walsToRemove = wals.headSet(new Path(oldLogDir, log), inclusive)
.stream().map(Path::getName).collect(Collectors.toCollection(TreeSet::new));
if (walsToRemove.isEmpty()) { if (walsToRemove.isEmpty()) {
return; return;
} }
@ -815,6 +819,93 @@ public class ReplicationSourceManager {
} }
void claimQueue(ReplicationQueueId queueId) { void claimQueue(ReplicationQueueId queueId) {
claimQueue(queueId, false);
}
// sorted from oldest to newest
private PriorityQueue<Path> getWALFilesToReplicate(ServerName sourceRS, boolean syncUp,
Map<String, ReplicationGroupOffset> offsets) throws IOException {
List<Path> walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS,
URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
if (syncUp) {
// we also need to list WALs directory for ReplicationSyncUp
walFiles.addAll(AbstractFSWALProvider.getWALFiles(conf, sourceRS));
}
PriorityQueue<Path> walFilesPQ =
new PriorityQueue<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
// sort the wal files and also filter out replicated files
for (Path file : walFiles) {
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
if (shouldReplicate(groupOffset, file.getName())) {
walFilesPQ.add(file);
} else {
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
groupOffset);
}
}
return walFilesPQ;
}
private void addRecoveredSource(ReplicationSourceInterface src, ReplicationPeerImpl oldPeer,
ReplicationQueueId claimedQueueId, PriorityQueue<Path> walFiles) {
ReplicationPeerImpl peer = replicationPeers.getPeer(src.getPeerId());
if (peer == null || peer != oldPeer) {
src.terminate("Recovered queue doesn't belong to any current peer");
deleteQueue(claimedQueueId);
return;
}
// Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
// transiting to STANDBY state. The only exception is we are in STANDBY state and
// transiting to DA, under this state we will replay the remote WAL and they need to be
// replicated back.
if (peer.getPeerConfig().isSyncReplication()) {
Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
peer.getSyncReplicationStateAndNewState();
if (
(stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
&& stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
|| stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
) {
src.terminate("Sync replication peer is in STANDBY state");
deleteQueue(claimedQueueId);
return;
}
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<Path>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
for (Path wal : walFiles) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
NavigableSet<Path> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
walsByGroup.put(walPrefix, wals);
}
wals.add(wal);
}
oldsources.add(src);
LOG.info("Added source for recovered queue {}, number of wals to replicate: {}", claimedQueueId,
walFiles.size());
for (Path wal : walFiles) {
LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
src.enqueueLog(wal);
}
src.startup();
}
/**
* Claim a replication queue.
* <p/>
* We add a flag to indicate whether we are called by ReplicationSyncUp. For a normal claim-queue
* operation, we are the last step of an SCP, so we can assume that all the WAL files are under the
* oldWALs directory. But for ReplicationSyncUp, we may want to claim the replication queue for a
* region server which has not been processed by SCP yet, so we still need to look at its WALs
* directory.
* @param queueId the replication queue id we want to claim
* @param syncUp whether we are called by ReplicationSyncUp
*/
void claimQueue(ReplicationQueueId queueId, boolean syncUp) {
// Wait a bit before transferring the queues, we may be shutting down. // Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases. // This sleep may not be enough in some cases.
try { try {
@ -873,76 +964,17 @@ public class ReplicationSourceManager {
server.abort("Failed to create replication source after claiming queue.", e); server.abort("Failed to create replication source after claiming queue.", e);
return; return;
} }
List<Path> walFiles; PriorityQueue<Path> walFiles;
try { try {
walFiles = AbstractFSWALProvider.getArchivedWALFiles(conf, sourceRS, walFiles = getWALFilesToReplicate(sourceRS, syncUp, offsets);
URLEncoder.encode(sourceRS.toString(), StandardCharsets.UTF_8.name()));
} catch (IOException e) { } catch (IOException e) {
LOG.error("Can not list all wal files for peer {} and queue {}", peerId, queueId, e); LOG.error("Can not list wal files for peer {} and queue {}", peerId, queueId, e);
server.abort("Can not list all wal files after claiming queue.", e); server.abort("Can not list wal files after claiming queue.", e);
return; return;
} }
PriorityQueue<Path> walFilesPQ = new PriorityQueue<>(
Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName()))
.thenComparing(Path::getName));
// sort the wal files and also filter out replicated files
for (Path file : walFiles) {
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getName());
ReplicationGroupOffset groupOffset = offsets.get(walGroupId);
if (shouldReplicate(groupOffset, file.getName())) {
walFilesPQ.add(file);
} else {
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
groupOffset);
}
}
// the method is a bit long, so assign it to null here to avoid later we reuse it again by
// mistake, we should use the sorted walFilesPQ instead
walFiles = null;
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
synchronized (oldsources) { synchronized (oldsources) {
peer = replicationPeers.getPeer(src.getPeerId()); addRecoveredSource(src, oldPeer, claimedQueueId, walFiles);
if (peer == null || peer != oldPeer) {
src.terminate("Recovered queue doesn't belong to any current peer");
deleteQueue(claimedQueueId);
return;
}
// Do not setup recovered queue if a sync replication peer is in STANDBY state, or is
// transiting to STANDBY state. The only exception is we are in STANDBY state and
// transiting to DA, under this state we will replay the remote WAL and they need to be
// replicated back.
if (peer.getPeerConfig().isSyncReplication()) {
Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
peer.getSyncReplicationStateAndNewState();
if (
(stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)
&& stateAndNewState.getSecond().equals(SyncReplicationState.NONE))
|| stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)
) {
src.terminate("Sync replication peer is in STANDBY state");
deleteQueue(claimedQueueId);
return;
}
}
// track sources in walsByIdRecoveredQueues
Map<String, NavigableSet<String>> walsByGroup = new HashMap<>();
walsByIdRecoveredQueues.put(claimedQueueId, walsByGroup);
for (Path wal : walFilesPQ) {
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
NavigableSet<String> wals = walsByGroup.get(walPrefix);
if (wals == null) {
wals = new TreeSet<>();
walsByGroup.put(walPrefix, wals);
}
wals.add(wal.getName());
}
oldsources.add(src);
LOG.info("Added source for recovered queue {}", claimedQueueId);
for (Path wal : walFilesPQ) {
LOG.debug("Enqueueing log {} from recovered queue for source: {}", wal, claimedQueueId);
src.enqueueLog(new Path(oldLogDir, wal));
}
src.startup();
} }
} }
@ -971,16 +1003,6 @@ public class ReplicationSourceManager {
return Collections.unmodifiableMap(walsById); return Collections.unmodifiableMap(walsById);
} }
/**
* Get a copy of the wals of the recovered sources on this rs
* @return a sorted set of wal names
*/
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWalsByIdRecoveredQueues() {
return Collections.unmodifiableMap(walsByIdRecoveredQueues);
}
/** /**
* Get a list of all the normal sources of this rs * Get a list of all the normal sources of this rs
* @return list of all normal sources * @return list of all normal sources
@ -1100,8 +1122,6 @@ public class ReplicationSourceManager {
return this.globalMetrics; return this.globalMetrics;
} }
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
ReplicationQueueStorage getQueueStorage() { ReplicationQueueStorage getQueueStorage() {
return queueStorage; return queueStorage;
} }

View File

@ -17,13 +17,17 @@
*/ */
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
@ -35,11 +39,18 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JsonMapper;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -59,6 +70,31 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationSyncUp extends Configured implements Tool { public class ReplicationSyncUp extends Configured implements Tool {
public static class ReplicationSyncUpToolInfo {
private long startTimeMs;
public ReplicationSyncUpToolInfo() {
}
public ReplicationSyncUpToolInfo(long startTimeMs) {
this.startTimeMs = startTimeMs;
}
public long getStartTimeMs() {
return startTimeMs;
}
public void setStartTimeMs(long startTimeMs) {
this.startTimeMs = startTimeMs;
}
}
// For storing the information used to skip replicating some wals after the cluster is back online
public static final String INFO_DIR = "ReplicationSyncUp";
public static final String INFO_FILE = "info";
private static final long SLEEP_TIME = 10000; private static final long SLEEP_TIME = 10000;
/** /**
@ -69,41 +105,116 @@ public class ReplicationSyncUp extends Configured implements Tool {
System.exit(ret); System.exit(ret);
} }
private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws KeeperException { // Find region servers under wal directory
List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); // Here we only care about the region servers which may still be alive, as we need to add
return rsZNodes == null // replication queues for them if missing. The dead region servers which have already been
? Collections.emptySet() // fully processed do not need their replication queues added again, as the operation has
: rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet()); // already been done in SCP.
private Set<ServerName> listRegionServers(FileSystem walFs, Path walDir) throws IOException {
FileStatus[] statuses;
try {
statuses = walFs.listStatus(walDir);
} catch (FileNotFoundException e) {
System.out.println("WAL directory " + walDir + " does not exists, ignore");
return Collections.emptySet();
}
Set<ServerName> regionServers = new HashSet<>();
for (FileStatus status : statuses) {
// All wal files under the walDir are within their region server's directory
if (!status.isDirectory()) {
continue;
}
ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath());
if (sn != null) {
regionServers.add(sn);
}
}
return regionServers;
}
private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer,
Set<String> peerIds) throws ReplicationException {
Set<String> existingQueuePeerIds = new HashSet<>();
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(regionServer);
for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) {
ReplicationQueueId queueId = iter.next();
if (!queueId.isRecovered()) {
existingQueuePeerIds.add(queueId.getPeerId());
}
}
for (String peerId : peerIds) {
if (!existingQueuePeerIds.contains(peerId)) {
ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId);
System.out.println("Add replication queue " + queueId + " for claiming");
storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN,
Collections.emptyMap());
}
}
}
private void addMissingReplicationQueues(ReplicationQueueStorage storage,
Set<ServerName> regionServers, Set<String> peerIds) throws ReplicationException {
for (ServerName regionServer : regionServers) {
addMissingReplicationQueues(storage, regionServer, peerIds);
}
} }
// When using this tool, usually the source cluster is unhealthy, so we should try to claim the // When using this tool, usually the source cluster is unhealthy, so we should try to claim the
// replication queues for the dead region servers first and then replicate the data out. // replication queues for the dead region servers first and then replicate the data out.
private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr) private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName> regionServers)
throws ReplicationException, KeeperException { throws ReplicationException, KeeperException, IOException {
// TODO: reimplement this tool // union the region servers from both places, i.e., from the wal directory, and the records in
// List<ServerName> replicators = mgr.getQueueStorage().getListOfReplicators(); // replication queue storage.
// Set<ServerName> liveRegionServers = getLiveRegionServers(zkw); Set<ServerName> replicators = new HashSet<>(regionServers);
// for (ServerName sn : replicators) { ReplicationQueueStorage queueStorage = mgr.getQueueStorage();
// if (!liveRegionServers.contains(sn)) { replicators.addAll(queueStorage.listAllReplicators());
// List<String> replicationQueues = mgr.getQueueStorage().getAllQueues(sn); FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
// System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
// for (String queue : replicationQueues) { for (ServerName sn : replicators) {
// mgr.claimQueue(sn, queue); List<ReplicationQueueId> replicationQueues = queueStorage.listAllQueueIds(sn);
// } System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
// } // record the rs name, so when the master restarts, we will skip claiming its replication queue
// } fs.createNewFile(new Path(infoDir, sn.getServerName()));
for (ReplicationQueueId queueId : replicationQueues) {
mgr.claimQueue(queueId, true);
}
}
}
private void writeInfoFile(FileSystem fs) throws IOException {
// Record the info of this run. Currently we only record the time we run the job. We will use this
// timestamp to clean up the data for last sequence ids and hfile refs in replication queue
// storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
ReplicationSyncUpToolInfo info =
new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
String json = JsonMapper.writeObjectAsString(info);
Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
out.write(Bytes.toBytes(json));
}
  }
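  // Illustrative sketch only, not part of the patch: how the recorded timestamp could later be
  // read back and used to clear the stale last sequence ids and hfile refs through
  // ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore. The getter name
  // getStartTimeMs() on ReplicationSyncUpToolInfo and the FSDataInputStream import are
  // assumptions made for this sketch.
  private void exampleCleanupWithRecordedTimestamp(FileSystem fs, ReplicationQueueStorage storage)
    throws IOException, ReplicationException {
    Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
    Path infoFile = new Path(infoDir, INFO_FILE);
    if (!fs.exists(infoFile)) {
      return; // no previous ReplicationSyncUp run recorded
    }
    byte[] data = new byte[(int) fs.getFileStatus(infoFile).getLen()];
    try (FSDataInputStream in = fs.open(infoFile)) {
      in.readFully(data);
    }
    ReplicationSyncUpToolInfo info =
      JsonMapper.fromJson(Bytes.toString(data), ReplicationSyncUpToolInfo.class);
    // drop the last sequence ids and hfile refs written before the sync up run started
    storage.removeLastSequenceIdsAndHFileRefsBefore(info.getStartTimeMs());
  }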
  @Override
  public int run(String[] args) throws Exception {
    Abortable abortable = new Abortable() {

      private volatile boolean abort = false;

      @Override
      public void abort(String why, Throwable e) {
        if (isAborted()) {
          return;
        }
        abort = true;
        System.err.println("Aborting because of " + why);
        e.printStackTrace();
        System.exit(1);
      }

      @Override
      public boolean isAborted() {
        return abort;
      }
    };
    Configuration conf = getConf();
@@ -114,16 +225,24 @@ public class ReplicationSyncUp extends Configured implements Tool {
    Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);

    System.out.println("Start Replication Server");
    writeInfoFile(fs);
    Replication replication = new Replication();
    // use offline table replication queue storage
    getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
      OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class);
    DummyServer server = new DummyServer(getConf(), zkw);
    replication.initialize(server, fs, new Path(logDir, server.toString()), oldLogDir,
      new WALFactory(conf,
        ServerName
          .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
        null, false));
    ReplicationSourceManager manager = replication.getReplicationManager();
    manager.init();
    Set<ServerName> regionServers = listRegionServers(fs, logDir);
    addMissingReplicationQueues(manager.getQueueStorage(), regionServers,
      manager.getReplicationPeers().getAllPeerIds());
    claimReplicationQueues(manager, regionServers);
    while (manager.activeFailoverTaskCount() > 0) {
      Thread.sleep(SLEEP_TIME);
    }
@@ -138,23 +257,22 @@ public class ReplicationSyncUp extends Configured implements Tool {
    return 0;
  }
  private static final class DummyServer implements Server {
    private final Configuration conf;
    private final String hostname;
    private final ZKWatcher zkw;
    private volatile boolean abort = false;

    DummyServer(Configuration conf, ZKWatcher zkw) {
      // a unique name in case the first run fails
      hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org";
      this.conf = conf;
      this.zkw = zkw;
    }

    @Override
    public Configuration getConfiguration() {
      return conf;
    }

    @Override
@@ -174,11 +292,18 @@ public class ReplicationSyncUp extends Configured implements Tool {
    @Override
    public void abort(String why, Throwable e) {
      if (isAborted()) {
        return;
      }
      abort = true;
      System.err.println("Aborting because of " + why);
      e.printStackTrace();
      System.exit(1);
    }

    @Override
    public boolean isAborted() {
      return abort;
    }

    @Override

View File

@@ -311,6 +311,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements
    return matcher.matches() ? Long.parseLong(matcher.group(2)) : NO_TIMESTAMP;
  }
public static final Comparator<Path> TIMESTAMP_COMPARATOR =
Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getName()))
.thenComparing(Path::getName);
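  // Illustrative sketch only, not part of the patch: sort a region server's WAL files
  // oldest-first by the timestamp embedded in the file name, falling back to the full name for
  // ties. A java.util.List import is assumed; the input list is a hypothetical, already-collected
  // set of WAL paths.
  public static void exampleSortWalsByTimestamp(List<Path> walFiles) {
    walFiles.sort(TIMESTAMP_COMPARATOR);
  }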
  /**
   * Construct the directory name for all WALs on a given server. Dir names currently look like
   * this for WALs: <code>hbase//WALs/kalashnikov.att.net,61634,1486865297088</code>.

View File

@@ -127,8 +127,8 @@ public class TestLogsCleaner {
    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
    TEST_UTIL.getAdmin().createTable(td);
    TEST_UTIL.waitTableAvailable(tableName);
    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(),
      conf, tableName);
    masterServices = mock(MasterServices.class);
    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());

View File

@@ -22,24 +22,28 @@ import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
@@ -55,39 +59,70 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
   */
  @Test
  public void testSyncUpTool() throws Exception {
    // Set up Replication: on Master and one Slave
    // Table: t1_syncup and t2_syncup
    // columnfamily:
    // 'cf1' : replicated
    // 'norep': not replicated
    setupReplication();

    //
    // at Master:
    // t1_syncup: put 100 rows into cf1, and 1 row into norep
    // t2_syncup: put 200 rows into cf1, and 1 row into norep
    //
    // verify correctly replicated to slave
    putAndReplicateRows();

    // Verify delete works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: delete 50 rows from cf1
    // t2_syncup: delete 100 rows from cf1
    // no change on 'norep'
    //
    // step 3: stop hbase on master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before delete
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // step 5: run syncup tool on Master (see the illustrative launcher sketch after this method)
    //
    // step 6: verify that the deletes show up on Slave
    // t1_syncup: 50 rows from cf1
    // t2_syncup: 100 rows from cf1
    //
    // verify correctly replicated to Slave
    mimicSyncUpAfterDelete();

    // Verify put works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: put 100 rows from cf1
    // t2_syncup: put 200 rows from cf1
    // and put another row on 'norep'
    // ATTN:
    // put to 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively
    // put to 'norep' will add a new row.
    //
    // step 3: stop hbase on master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before put
    // t1_syncup: 50 rows from cf1
    // t2_syncup: 100 rows from cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that the puts show up on Slave and 'norep' does not
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // verify correctly replicated to Slave
    mimicSyncUpAfterPut();
  }
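  // Illustrative sketch only, not part of the patch: "run syncup tool on Master" in the steps
  // above boils down to driving the Tool through ToolRunner with a copy of the source cluster's
  // configuration, which is what syncUp(UTIL1) in the base class does. The ToolRunner and
  // Configuration imports are assumed here.
  private void exampleRunSyncUpTool() throws Exception {
    ToolRunner.run(new Configuration(UTIL1.getConfiguration()), new ReplicationSyncUp(),
      new String[0]);
  }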
@@ -172,7 +207,8 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
    int rowCount_ht2Source = countRows(ht2Source);
    assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
      rowCount_ht2Source);
    List<ServerName> sourceRses = UTIL1.getHBaseCluster().getRegionServerThreads().stream()
      .map(rst -> rst.getRegionServer().getServerName()).collect(Collectors.toList());
    shutDownSourceHBaseCluster();
    restartTargetHBaseCluster(1);
@@ -184,40 +220,33 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);

    syncUp(UTIL1);

    // After sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
      rowCountHt2TargetAtPeer1);

    // check we have recorded the dead region servers and also have an info file
    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
    FileSystem fs = UTIL1.getTestFileSystem();
    for (ServerName sn : sourceRses) {
      assertTrue(fs.exists(new Path(syncUpInfoDir, sn.getServerName())));
    }
    assertTrue(fs.exists(new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE)));
    assertEquals(sourceRses.size() + 1, fs.listStatus(syncUpInfoDir).length);

    restartSourceHBaseCluster(1);
    // all the recorded files should finally be removed after the master restarts
    UTIL1.waitFor(60000, () -> fs.listStatus(syncUpInfoDir).length == 0);
  }
  private void mimicSyncUpAfterPut() throws Exception {
    LOG.debug("mimicSyncUpAfterPut");
    restartSourceHBaseCluster(1);
    shutDownTargetHBaseCluster();

    Put p;
@@ -261,34 +290,14 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
    assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
      rowCountHt2TargetAtPeer1);

    syncUp(UTIL1);

    // after sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
      rowCountHt2TargetAtPeer1);
  }
}

View File

@@ -136,7 +136,8 @@ public abstract class TestReplicationSyncUpToolBase {
  }

  final void syncUp(HBaseTestingUtil util) throws Exception {
    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
      new String[0]);
  }

  // Utilities that manage shutdown / restart of source / sink clusters. They take care of

View File

@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
@@ -45,14 +46,11 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-// revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
@@ -74,40 +72,50 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
  @Test
  public void testSyncUpTool() throws Exception {
    // Set up Replication: on Master and one Slave
    // Table: t1_syncup and t2_syncup
    // columnfamily:
    // 'cf1' : replicated
    // 'norep': not replicated
    setupReplication();

    // Prepare 24 random hfile ranges required for creating hfiles
    Iterator<String> randomHFileRangeListIterator = null;
    Set<String> randomHFileRanges = new HashSet<>(24);
    for (int i = 0; i < 24; i++) {
      randomHFileRanges.add(HBaseTestingUtil.getRandomUUID().toString());
    }
    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
    Collections.sort(randomHFileRangeList);
    randomHFileRangeListIterator = randomHFileRangeList.iterator();

    // at Master:
    // t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows into norep
    // t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3 rows into
    // norep
    // verify correctly replicated to slave
    loadAndReplicateHFiles(true, randomHFileRangeListIterator);

    // Verify hfile load works
    //
    // step 1: stop hbase on Slave
    //
    // step 2: at Master:
    // t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
    // t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
    //
    // step 3: stop hbase on master, restart hbase on Slave
    //
    // step 4: verify Slave still has the rows before load
    // t1_syncup: 100 rows from cf1
    // t2_syncup: 200 rows from cf1
    //
    // step 5: run syncup tool on Master
    //
    // step 6: verify that hfiles show up on Slave and 'norep' does not
    // t1_syncup: 200 rows from cf1
    // t2_syncup: 400 rows from cf1
    // verify correctly replicated to Slave
    // (an illustrative hfile-ref bookkeeping sketch follows this method)
    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
  }
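  // Illustrative sketch only, not part of the patch: bulk loaded store files stay replicable
  // because their paths are recorded as hfile refs for a peer in the replication queue storage,
  // which is the data that ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore later
  // cleans up. The peer id and file name below are made up; the ReplicationQueueStorage,
  // ReplicationException and Pair imports are assumed.
  private static void exampleRecordHFileRefs(ReplicationQueueStorage queueStorage)
    throws ReplicationException {
    List<Pair<Path, Path>> refs = new ArrayList<>();
    refs.add(new Pair<>(null, new Path("example-hfile")));
    queueStorage.addHFileRefs("1", refs);
  }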
@@ -142,34 +150,12 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
    syncUp(UTIL1);

    // After sync up
    rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
    rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
    assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
      rowCountHt1TargetAtPeer1);
    assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
      rowCountHt2TargetAtPeer1);
  }

  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,

View File

@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
@@ -420,4 +421,54 @@ public class TestTableReplicationQueueStorage {
    assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
    assertTrue(storage.getReplicableHFiles(peerId2).isEmpty());
  }
private void addLastSequenceIdsAndHFileRefs(String peerId1, String peerId2)
throws ReplicationException {
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
storage.setLastSequenceIds(peerId1, ImmutableMap.of(encodedRegionName, (long) i));
}
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
files1.add(new Pair<>(null, new Path("file_1")));
files1.add(new Pair<>(null, new Path("file_2")));
files1.add(new Pair<>(null, new Path("file_3")));
storage.addHFileRefs(peerId2, files1);
}
@Test
public void testRemoveLastSequenceIdsAndHFileRefsBefore()
throws ReplicationException, InterruptedException {
String peerId1 = "1";
String peerId2 = "2";
addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
    // make sure we have written these out
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
}
assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, storage.getReplicableHFiles(peerId2).size());
// should have nothing after removal
long ts = EnvironmentEdgeManager.currentTime();
storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(HConstants.NO_SEQNUM, storage.getLastSequenceId(encodedRegionName, peerId1));
}
assertEquals(0, storage.getAllPeersFromHFileRefsQueue().size());
Thread.sleep(100);
// add again and remove with the old timestamp
addLastSequenceIdsAndHFileRefs(peerId1, peerId2);
storage.removeLastSequenceIdsAndHFileRefsBefore(ts);
    // make sure we do not delete the data which are written after the given timestamp
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, storage.getLastSequenceId(encodedRegionName, peerId1));
}
assertEquals(1, storage.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, storage.getReplicableHFiles(peerId2).size());
}
}

View File

@@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.experimental.categories.Category;
-// revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicationSyncUpTool {

View File

@@ -25,11 +25,8 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.experimental.categories.Category;
-// revisit later when we implement the new ReplicationSyncUpTool
-@Ignore
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyncUpTool {

View File

@@ -45,11 +45,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -125,8 +122,6 @@ public class TestReplicationSourceManager {
  private static final TableName TABLE_NAME = TableName.valueOf("test");
-  private static TableDescriptor TD;
  private static RegionInfo RI;
  private static NavigableMap<byte[], Integer> SCOPES;
@@ -152,10 +147,6 @@ public class TestReplicationSourceManager {
    FS = UTIL.getTestFileSystem();
    CONF = new Configuration(UTIL.getConfiguration());
    CONF.setLong("replication.sleep.before.failover", 0);
-    TD = TableDescriptorBuilder.newBuilder(TABLE_NAME)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(F1)
-        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(F2)).build();
    RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
    SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -176,7 +167,8 @@ public class TestReplicationSourceManager {
    when(server.getConfiguration()).thenReturn(CONF);
    when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
    when(server.getConnection()).thenReturn(UTIL.getConnection());
    ServerName sn = ServerName.valueOf("hostname.example.org", 1234, 1);
    when(server.getServerName()).thenReturn(sn);
    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    FS.mkdirs(oldLogDir);
    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
@@ -189,7 +181,7 @@ public class TestReplicationSourceManager {
    CONF.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
    replication = new Replication();
    replication.initialize(server, FS, new Path(logDir, sn.toString()), oldLogDir,
      new WALFactory(CONF, server.getServerName(), null, false));
    manager = replication.getReplicationManager();
  }

View File

@@ -99,8 +99,8 @@ public class TestSerialReplicationChecker {
    TableName repTable = TableName.valueOf("test_serial_rep");
    UTIL.getAdmin()
      .createTable(ReplicationStorageFactory.createReplicationQueueTableDescriptor(repTable));
    QUEUE_STORAGE = ReplicationStorageFactory.getReplicationQueueStorage(UTIL.getConnection(),
      UTIL.getConfiguration(), repTable);
  }

  @AfterClass