HBASE-27214 Implement the new replication hfile/log cleaner (#4722)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
Author: Duo Zhang, 2022-08-31 21:24:09 +08:00 (committed by Duo Zhang)
parent f81bdebedb
commit 6d0311c1d9
20 changed files with 1014 additions and 253 deletions


@@ -50,7 +50,7 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
   }

   /**
-   * Used to do some cleanup work
+   * Will be called after cleaner run.
    */
  default void postClean() {
  }
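For orientation, here is an editorial sketch (not code from this commit) of how a cleaner chore is expected to drive these delegate hooks in one run; the wrapper class and variable names are made up for illustration, only the FileCleanerDelegate callbacks come from the interface above.

import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.FileCleanerDelegate;

final class CleanerRunSketch {
  // Illustrative ordering only: preClean -> getDeletableFiles -> delete -> postClean.
  static void runOnce(List<FileCleanerDelegate> delegates, Iterable<FileStatus> candidates) {
    for (FileCleanerDelegate d : delegates) {
      d.preClean();                               // e.g. snapshot replication queue state
    }
    Iterable<FileStatus> deletable = candidates;
    for (FileCleanerDelegate d : delegates) {
      deletable = d.getDeletableFiles(deletable); // each delegate narrows the set
    }
    // ... the chore deletes the surviving files here ...
    for (FileCleanerDelegate d : delegates) {
      d.postClean();                              // release state held since preClean()
    }
  }
}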


@@ -380,7 +380,7 @@ public final class MasterRegion {
       params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize());
     walRoller.start();
-    WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false);
+    WALFactory walFactory = new WALFactory(conf, server.getServerName(), server, false);
     Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName());
     Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG);
     Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG);


@@ -21,7 +21,6 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
-import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
 import org.apache.hadoop.hbase.replication.ReplicationException;

@@ -45,6 +44,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure {

   private boolean enabled;

+  private boolean cleanerDisabled;
+
   public AddPeerProcedure() {
   }

@@ -84,15 +85,24 @@ public class AddPeerProcedure extends ModifyPeerProcedure {

   @Override
   protected void releaseLatch(MasterProcedureEnv env) {
+    if (cleanerDisabled) {
+      env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
+    }
     if (peerConfig.isSyncReplication()) {
       env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
     }
-    ProcedurePrepareLatch.releaseLatch(latch, this);
+    super.releaseLatch(env);
   }

   @Override
   protected void prePeerModification(MasterProcedureEnv env)
     throws IOException, ReplicationException, ProcedureSuspendedException {
+    if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
+          peerId, backoff / 1000));
+    }
+    cleanerDisabled = true;
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preAddReplicationPeer(peerId, peerConfig);

@@ -128,9 +138,14 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   @Override
   protected void afterReplay(MasterProcedureEnv env) {
     if (getCurrentState() == getInitialState()) {
-      // will try to acquire the lock when executing the procedure, no need to acquire it here
+      // do not need to disable log cleaner or acquire lock if we are in the initial state, later
+      // when executing the procedure we will try to disable and acquire.
       return;
     }
+    if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
+      throw new IllegalStateException("can not disable log cleaner, this should not happen");
+    }
+    cleanerDisabled = true;
     if (peerConfig.isSyncReplication()) {
       if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) {
         throw new IllegalStateException(


@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;

@@ -102,6 +103,9 @@ public class ReplicationPeerManager implements ConfigurationObserver {
   // Only allow to add one sync replication peer concurrently
   private final Semaphore syncReplicationPeerLock = new Semaphore(1);

+  private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
+    new ReplicationLogCleanerBarrier();
+
   private final String clusterId;

   private volatile Configuration conf;

@@ -705,6 +709,10 @@ public class ReplicationPeerManager implements ConfigurationObserver {
     syncReplicationPeerLock.release();
   }

+  public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
+    return replicationLogCleanerBarrier;
+  }
+
   @Override
   public void onConfigurationChange(Configuration conf) {
     this.conf = conf;


@@ -1733,7 +1733,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
    * be hooked up to WAL.
    */
   private void setupWALAndReplication() throws IOException {
-    WALFactory factory = new WALFactory(conf, serverName.toString(), this, true);
+    WALFactory factory = new WALFactory(conf, serverName, this, true);
     // TODO Replication make assumptions here based on the default filesystem impl
     Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString());


@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationOffsetUtil {
private ReplicationOffsetUtil() {
}
public static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
// if no offset or the offset is just a place marker, replicate
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
return true;
}
// otherwise, compare the timestamp
long walTs = AbstractFSWALProvider.getTimestamp(wal);
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
if (walTs < startWalTs) {
return false;
} else if (walTs > startWalTs) {
return true;
}
// if the timestamp equals, usually it means we should include this wal but there is a special
// case, a negative offset means the wal has already been fully replicated, so here we should
// check the offset.
return offset.getOffset() >= 0;
}
}
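A small illustrative example of the rule above, using made-up WAL file names: the decision only compares the trailing timestamps, and a negative offset marks the recorded WAL itself as already fully replicated.

import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;

public class OffsetExample {
  public static void main(String[] args) {
    // hypothetical recorded offset: replication has reached byte 1024 of this WAL
    ReplicationGroupOffset offset =
      new ReplicationGroupOffset("rs1.example.com%2C16020%2C1661900000000.1661900000100", 1024);
    // WAL older than the recorded one -> already replicated, no need to replicate again
    System.out.println(ReplicationOffsetUtil.shouldReplicate(offset,
      "rs1.example.com%2C16020%2C1661900000000.1661900000050")); // false
    // WAL newer than the recorded one -> still needed
    System.out.println(ReplicationOffsetUtil.shouldReplicate(offset,
      "rs1.example.com%2C16020%2C1661900000000.1661900000200")); // true
  }
}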


@@ -17,18 +17,29 @@
  */
 package org.apache.hadoop.hbase.replication.master;

-import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,35 +51,129 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for replication before
  * deleting it when its TTL is over.
+ * <p/>
+ * The logic is a bit complicated after we switch to use table based replication queue storage, see
+ * the design doc in HBASE-27109 and the comments in HBASE-27214 for more details.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
-  private ZKWatcher zkw = null;
-  private boolean shareZK = false;
-  private ReplicationQueueStorage queueStorage;
+  private Set<ServerName> notFullyDeadServers;
+  private Set<String> peerIds;
+  // ServerName -> PeerId -> WalGroup -> Offset
+  // Here the server name is the source server name, so we can make sure that there is only one
+  // queue for a given peer, that why we can use a String peerId as key instead of
+  // ReplicationQueueId.
+  private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
+  private ReplicationPeerManager rpm;
+  private Supplier<Set<ServerName>> getNotFullyDeadServers;
+  private boolean canFilter;
   private boolean stopped = false;
-  private Set<String> wals;
-  private long readZKTimestamp = 0;

   @Override
   public void preClean() {
-    readZKTimestamp = EnvironmentEdgeManager.currentTime();
-    // TODO: revisit the implementation
-    // try {
-    //   // The concurrently created new WALs may not be included in the return list,
-    //   // but they won't be deleted because they're not in the checking set.
-    //   wals = queueStorage.getAllWALs();
-    // } catch (ReplicationException e) {
-    //   LOG.warn("Failed to read zookeeper, skipping checking deletable files");
-    //   wals = null;
-    // }
+    if (this.getConf() == null) {
+      return;
+    }
+    canFilter = rpm.getReplicationLogCleanerBarrier().start();
+    if (canFilter) {
+      notFullyDeadServers = getNotFullyDeadServers.get();
+      peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
+        .collect(Collectors.toSet());
+      // must get the not fully dead servers first and then get the replication queue data, in this
+      // way we can make sure that, we should have added the missing replication queues for the dead
+      // region servers recorded in the above set, otherwise the logic in the
+      // filterForDeadRegionServer method may lead us delete wal still in use.
+      List<ReplicationQueueData> allQueueData;
+      try {
+        allQueueData = rpm.getQueueStorage().listAllQueues();
+      } catch (ReplicationException e) {
+        LOG.error("Can not list all replication queues, give up cleaning", e);
+        rpm.getReplicationLogCleanerBarrier().stop();
+        canFilter = false;
+        notFullyDeadServers = null;
+        peerIds = null;
+        return;
+      }
+      replicationOffsets = new HashMap<>();
+      for (ReplicationQueueData queueData : allQueueData) {
+        ReplicationQueueId queueId = queueData.getId();
+        ServerName serverName = queueId.getServerWALsBelongTo();
+        Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+          replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>());
+        Map<String, ReplicationGroupOffset> offsets =
+          peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>());
+        offsets.putAll(queueData.getOffsets());
+      }
+    } else {
+      LOG.info("Skip replication log cleaner because an AddPeerProcedure is running");
+    }
   }

   @Override
   public void postClean() {
-    // release memory
-    wals = null;
+    if (canFilter) {
+      rpm.getReplicationLogCleanerBarrier().stop();
+      canFilter = false;
+      // release memory
+      notFullyDeadServers = null;
+      peerIds = null;
+      replicationOffsets = null;
+    }
+  }
+
+  private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) {
+    return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName());
+  }
+
+  private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) {
+    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+      replicationOffsets.get(serverName);
+    if (peerId2Offsets == null) {
+      // if there are replication queues missing, we can not delete the wal
+      return false;
+    }
+    for (String peerId : peerIds) {
+      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
+      // if no replication queue for a peer, we can not delete the wal
+      if (offsets == null) {
+        return false;
+      }
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+      ReplicationGroupOffset offset = offsets.get(walGroupId);
+      // if a replication queue still need to replicate this wal, we can not delete it
+      if (!shouldDelete(offset, file)) {
+        return false;
+      }
+    }
+    // if all replication queues have already finished replicating this wal, we can delete it.
+    return true;
+  }
+
+  private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) {
+    Map<String, Map<String, ReplicationGroupOffset>> peerId2Offsets =
+      replicationOffsets.get(serverName);
+    if (peerId2Offsets == null) {
+      // no replication queue for this dead rs, we can delete all wal files for it
+      return true;
+    }
+    for (String peerId : peerIds) {
+      Map<String, ReplicationGroupOffset> offsets = peerId2Offsets.get(peerId);
+      if (offsets == null) {
+        // for dead server, we only care about existing replication queues, as we will delete a
+        // queue after we finish replicating it.
+        continue;
+      }
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName());
+      ReplicationGroupOffset offset = offsets.get(walGroupId);
+      // if a replication queue still need to replicate this wal, we can not delete it
+      if (!shouldDelete(offset, file)) {
+        return false;
+      }
+    }
+    // if all replication queues have already finished replicating this wal, we can delete it.
+    return true;
   }

   @Override
@@ -78,10 +183,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     if (this.getConf() == null) {
       return files;
     }

-    if (wals == null) {
+    if (!canFilter) {
+      // We can not delete anything if there are AddPeerProcedure running at the same time
+      // See HBASE-27214 for more details.
       return Collections.emptyList();
     }
     return Iterables.filter(files, new Predicate<FileStatus>() {
       @Override
       public boolean apply(FileStatus file) {

@@ -90,65 +197,56 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
         if (file == null) {
           return false;
         }
-        String wal = file.getPath().getName();
-        boolean logInReplicationQueue = wals.contains(wal);
-        if (logInReplicationQueue) {
-          LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
+        if (peerIds.isEmpty()) {
+          // no peer, can always delete
+          return true;
+        }
+        // not a valid wal file name, delete
+        if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) {
+          return true;
+        }
+        // meta wal is always deletable as we will never replicate it
+        if (AbstractFSWALProvider.isMetaFile(file.getPath())) {
+          return true;
+        }
+        ServerName serverName =
+          AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName());
+        if (notFullyDeadServers.contains(serverName)) {
+          return filterForLiveRegionServer(serverName, file);
+        } else {
+          return filterForDeadRegionServer(serverName, file);
         }
-        return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
       }
     });
   }

+  private Set<ServerName> getNotFullyDeadServers(MasterServices services) {
+    List<ServerName> onlineServers = services.getServerManager().getOnlineServersList();
+    return Stream.concat(onlineServers.stream(),
+      services.getMasterProcedureExecutor().getProcedures().stream()
+        .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished())
+        .map(p -> ((ServerCrashProcedure) p).getServerName()))
+      .collect(Collectors.toSet());
+  }
+
   @Override
   public void init(Map<String, Object> params) {
     super.init(params);
-    try {
-      if (MapUtils.isNotEmpty(params)) {
-        Object master = params.get(HMaster.MASTER);
-        if (master != null && master instanceof HMaster) {
-          zkw = ((HMaster) master).getZooKeeper();
-          shareZK = true;
-        }
-      }
-      if (zkw == null) {
-        zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null);
-      }
-      // TODO: revisit the implementation
-      // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    } catch (IOException e) {
-      LOG.error("Error while configuring " + this.getClass().getName(), e);
+    if (MapUtils.isNotEmpty(params)) {
+      Object master = params.get(HMaster.MASTER);
+      if (master != null && master instanceof MasterServices) {
+        MasterServices m = (MasterServices) master;
+        rpm = m.getReplicationPeerManager();
+        getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
+        return;
+      }
     }
-  }
-
-  @InterfaceAudience.Private
-  public void setConf(Configuration conf, ZKWatcher zk) {
-    super.setConf(conf);
-    try {
-      this.zkw = zk;
-      // TODO: revisit the implementation
-      // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
-    } catch (Exception e) {
-      LOG.error("Error while configuring " + this.getClass().getName(), e);
-    }
-  }
-
-  @InterfaceAudience.Private
-  public void setConf(Configuration conf, ZKWatcher zk,
-    ReplicationQueueStorage replicationQueueStorage) {
-    super.setConf(conf);
-    this.zkw = zk;
-    this.queueStorage = replicationQueueStorage;
+    throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter");
   }

   @Override
   public void stop(String why) {
-    if (this.stopped) return;
     this.stopped = true;
-    if (!shareZK && this.zkw != null) {
-      LOG.info("Stopping " + this.zkw);
-      this.zkw.close();
-    }
   }

   @Override


@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.master;
import org.apache.yetus.audience.InterfaceAudience;
/**
* A barrier to guard the execution of {@link ReplicationLogCleaner}.
* <p/>
* The reason why we introduce this class is because there could be race between
* {@link org.apache.hadoop.hbase.master.replication.AddPeerProcedure} and
* {@link ReplicationLogCleaner}. See HBASE-27214 for more details.
*/
@InterfaceAudience.Private
public class ReplicationLogCleanerBarrier {
private enum State {
// the cleaner is not running
NOT_RUNNING,
// the cleaner is running
RUNNING,
// the cleaner is disabled
DISABLED
}
private State state = State.NOT_RUNNING;
// we could have multiple AddPeerProcedure running at the same time, so here we need to do
// reference counting.
private int numberDisabled = 0;
public synchronized boolean start() {
if (state == State.NOT_RUNNING) {
state = State.RUNNING;
return true;
}
if (state == State.DISABLED) {
return false;
}
throw new IllegalStateException("Unexpected state " + state);
}
public synchronized void stop() {
if (state != State.RUNNING) {
throw new IllegalStateException("Unexpected state " + state);
}
state = State.NOT_RUNNING;
}
public synchronized boolean disable() {
if (state == State.RUNNING) {
return false;
}
if (state == State.NOT_RUNNING) {
state = State.DISABLED;
}
numberDisabled++;
return true;
}
public synchronized void enable() {
if (state != State.DISABLED) {
throw new IllegalStateException("Unexpected state " + state);
}
numberDisabled--;
if (numberDisabled == 0) {
state = State.NOT_RUNNING;
}
}
}
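A minimal editorial sketch (not code from this commit) of the handshake this barrier enforces between a cleaner run and an AddPeerProcedure; the wrapper class and the printed message are illustrative only, the barrier calls mirror the methods defined above.

import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;

public class BarrierProtocolSketch {
  public static void main(String[] args) {
    ReplicationLogCleanerBarrier barrier = new ReplicationLogCleanerBarrier();

    // AddPeerProcedure side (prePeerModification): block new cleaner runs first.
    boolean disabled = barrier.disable();      // false if a cleaner run is already active

    // Cleaner side (preClean): refuses to filter while any peer addition is in flight.
    if (barrier.start()) {
      try {
        // snapshot replication queue state, filter deletable WALs ...
      } finally {
        barrier.stop();                        // postClean()
      }
    } else {
      System.out.println("skip cleaning, an AddPeerProcedure is running");
    }

    // AddPeerProcedure side (releaseLatch): allow cleaning again.
    if (disabled) {
      barrier.enable();
    }
  }
}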


@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;

@@ -809,22 +810,7 @@ public class ReplicationSourceManager {
     if (AbstractFSWALProvider.isMetaFile(wal)) {
       return false;
     }
-    // if no offset or the offset is just a place marker, replicate
-    if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
-      return true;
-    }
-    // otherwise, compare the timestamp
-    long walTs = AbstractFSWALProvider.getTimestamp(wal);
-    long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
-    if (walTs < startWalTs) {
-      return false;
-    } else if (walTs > startWalTs) {
-      return true;
-    }
-    // if the timestamp equals, usually it means we should include this wal but there is a special
-    // case, a negative offset means the wal has already been fully replicated, so here we should
-    // check the offset.
-    return offset.getOffset() >= 0;
+    return ReplicationOffsetUtil.shouldReplicate(offset, wal);
   }

   void claimQueue(ReplicationQueueId queueId) {


@@ -117,7 +117,10 @@ public class ReplicationSyncUp extends Configured implements Tool {
       System.out.println("Start Replication Server start");
       Replication replication = new Replication();
       replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
-        new WALFactory(conf, "test", null, false));
+        new WALFactory(conf,
+          ServerName
+            .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
+          null, false));
       ReplicationSourceManager manager = replication.getReplicationManager();
       manager.init();
       claimReplicationQueues(zkw, manager);


@@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.wal;

 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;

@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;

@@ -582,4 +586,29 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
   public static String getWALPrefixFromWALName(String name) {
     return getWALNameGroupFromWALName(name, 1);
   }
+
+  private static final Pattern SERVER_NAME_PATTERN = Pattern.compile("^[^"
+    + ServerName.SERVERNAME_SEPARATOR + "]+" + ServerName.SERVERNAME_SEPARATOR
+    + Addressing.VALID_PORT_REGEX + ServerName.SERVERNAME_SEPARATOR + Addressing.VALID_PORT_REGEX);
+
+  /**
+   * Parse the server name from wal prefix. A wal's name is always started with a server name in
+   * non test code.
+   * @throws IllegalArgumentException if the name passed in is not started with a server name
+   * @return the server name
+   */
+  public static ServerName parseServerNameFromWALName(String name) {
+    String decoded;
+    try {
+      decoded = URLDecoder.decode(name, StandardCharsets.UTF_8.name());
+    } catch (UnsupportedEncodingException e) {
+      throw new AssertionError("should never happen", e);
+    }
+    Matcher matcher = SERVER_NAME_PATTERN.matcher(decoded);
+    if (matcher.find()) {
+      return ServerName.valueOf(matcher.group());
+    } else {
+      throw new IllegalArgumentException(name + " is not started with a server name");
+    }
+  }
 }
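An illustrative use of the new parser with a made-up, URL-encoded WAL file name (server names are URL-encoded into WAL file names, so commas become %2C, which is why the method URL-decodes before matching):

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class ParseWalNameSketch {
  public static void main(String[] args) {
    // hypothetical WAL file name: encoded server name prefix plus a roll timestamp suffix
    String walName = "rs1.example.com%2C16020%2C1661900000000.1661900012345";
    ServerName sn = AbstractFSWALProvider.parseServerNameFromWALName(walName);
    System.out.println(sn); // rs1.example.com,16020,1661900000000
  }
}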


@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
@@ -191,17 +192,35 @@ public class WALFactory {
   }

   /**
-   * @param conf must not be null, will keep a reference to read params in later reader/writer
-   *          instances.
-   * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations
-   *          to make a directory
+   * Create a WALFactory.
    */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java")
   public WALFactory(Configuration conf, String factoryId) throws IOException {
     // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider
     // for HMaster or HRegionServer which take system table only. See HBASE-19999
     this(conf, factoryId, null, true);
   }

+  /**
+   * Create a WALFactory.
+   * <p/>
+   * This is the constructor you should use when creating a WALFactory in normal code, to make sure
+   * that the {@code factoryId} is the server name. We need this assumption in some places for
+   * parsing the server name out from the wal file name.
+   * @param conf                             must not be null, will keep a reference to read params
+   *                                         in later reader/writer instances.
+   * @param serverName                       use to generate the factoryId, which will be append at
+   *                                         the first of the final file name
+   * @param abortable                        the server associated with this WAL file
+   * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
+   *                                         {@link SyncReplicationWALProvider}
+   */
+  public WALFactory(Configuration conf, ServerName serverName, Abortable abortable,
+    boolean enableSyncReplicationWALProvider) throws IOException {
+    this(conf, serverName.toString(), abortable, enableSyncReplicationWALProvider);
+  }
+
   /**
    * @param conf must not be null, will keep a reference to read params
    *             in later reader/writer instances.

@@ -211,7 +230,7 @@ public class WALFactory {
    * @param enableSyncReplicationWALProvider whether wrap the wal provider to a
    *                                         {@link SyncReplicationWALProvider}
    */
-  public WALFactory(Configuration conf, String factoryId, Abortable abortable,
+  private WALFactory(Configuration conf, String factoryId, Abortable abortable,
     boolean enableSyncReplicationWALProvider) throws IOException {
     // until we've moved reader/writer construction down into providers, this initialization must
     // happen prior to provider initialization, in case they need to instantiate a reader/writer.
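A sketch of the intended call pattern after this change, under the assumption of a normal server context: production code passes a ServerName (as HRegionServer and MasterRegion now do) so the WAL file name always starts with a parseable server name, while the String factoryId constructor stays restricted to tests. The host name and port here are made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALFactory;

public class WalFactorySketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    ServerName serverName =
      ServerName.valueOf("rs1.example.com", 16020, EnvironmentEdgeManager.currentTime());
    // recommended constructor: the factoryId (and thus the WAL name prefix) is the server name
    WALFactory walFactory = new WALFactory(conf, serverName, null, true);
    walFactory.close();
  }
}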


@@ -18,57 +18,60 @@
 package org.apache.hadoop.hbase.master.cleaner;

 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;

 import java.io.IOException;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNameTestRule;
 import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
+import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MockServer;
-import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-// revisit later after we implement new replication log cleaner
-@Ignore
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MasterTests.class, MediumTests.class })
 public class TestLogsCleaner {
@@ -88,22 +91,29 @@ public class TestLogsCleaner {

   private static DirScanPool POOL;

+  private static String peerId = "1";
+
+  private MasterServices masterServices;
+
+  private ReplicationQueueStorage queueStorage;
+
+  @Rule
+  public final TableNameTestRule tableNameRule = new TableNameTestRule();
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.startMiniZKCluster();
-    TEST_UTIL.startMiniDFSCluster(1);
+    TEST_UTIL.startMiniCluster();
     POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration());
   }

   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniZKCluster();
-    TEST_UTIL.shutdownMiniDFSCluster();
+    TEST_UTIL.shutdownMiniCluster();
     POOL.shutdownNow();
   }

   @Before
-  public void beforeTest() throws IOException {
+  public void beforeTest() throws Exception {
     conf = TEST_UTIL.getConfiguration();
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
@@ -112,14 +122,51 @@ public class TestLogsCleaner {
     // root directory
     fs.mkdirs(OLD_WALS_DIR);
+
+    TableName tableName = tableNameRule.getTableName();
+    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+    TEST_UTIL.getAdmin().createTable(td);
+    TEST_UTIL.waitTableAvailable(tableName);
+    queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName);
+
+    masterServices = mock(MasterServices.class);
+    when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
+    ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
+    when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
+    when(rpm.getQueueStorage()).thenReturn(queueStorage);
+    when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
+    when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
+    ServerManager sm = mock(ServerManager.class);
+    when(masterServices.getServerManager()).thenReturn(sm);
+    when(sm.getOnlineServersList()).thenReturn(Collections.emptyList());
+    @SuppressWarnings("unchecked")
+    ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
+    when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec);
+    when(procExec.getProcedures()).thenReturn(Collections.emptyList());
   }

   /**
    * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same
-   * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from
-   * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp
-   * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2
-   * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory
+   * oldWALs directory.
+   * <p/>
+   * Created files:
+   * <ul>
+   * <li>2 invalid files</li>
+   * <li>5 old Procedure WALs</li>
+   * <li>30 old WALs from which 3 are in replication</li>
+   * <li>5 recent Procedure WALs</li>
+   * <li>1 recent WAL</li>
+   * <li>1 very new WAL (timestamp in future)</li>
+   * <li>masterProcedureWALs subdirectory</li>
+   * </ul>
+   * Files which should stay:
+   * <ul>
+   * <li>3 replication WALs</li>
+   * <li>2 new WALs</li>
+   * <li>5 latest Procedure WALs</li>
+   * <li>masterProcedureWALs subdirectory</li>
+   * </ul>
    */
   @Test
   public void testLogCleaning() throws Exception {
@@ -131,9 +178,6 @@ public class TestLogsCleaner {
     HMaster.decorateMasterConfiguration(conf);
     Server server = new DummyServer();
-    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
-      .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf);
-
     String fakeMachineName =
       URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name());
@@ -159,14 +203,12 @@ public class TestLogsCleaner {
     for (int i = 1; i <= 30; i++) {
       Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i));
       fs.createNewFile(fileName);
-      // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
-      // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
-      if (i % (30 / 3) == 0) {
-        // queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
-        LOG.info("Replication log file: " + fileName);
-      }
     }
+    // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset
+    masterServices.getReplicationPeerManager().listPeers(null)
+      .add(new ReplicationPeerDescription(peerId, true, null, null));
+    queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName,
+      new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap());

     // Case 5: 5 Procedure WALs that are new, will stay
     for (int i = 6; i <= 10; i++) {
       Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i));
@@ -189,7 +231,8 @@ public class TestLogsCleaner {
     // 10 procedure WALs
     assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length);

-    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null);
+    LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL,
+      ImmutableMap.of(HMaster.MASTER, masterServices));
     cleaner.chore();

     // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
@@ -208,98 +251,14 @@ public class TestLogsCleaner {
     }
   }

The whole testZooKeeperRecoveryDuringGetListOfReplicators and testZooKeeperNormal tests below are removed by this commit:
@Test
public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception {
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
List<FileStatus> dummyFiles = Arrays.asList(
new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")),
new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2")));
FaultyZooKeeperWatcher faultyZK =
new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null);
final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false);
try {
faultyZK.init(false);
ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory
.getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf));
// doAnswer(new Answer<Object>() {
// @Override
// public Object answer(InvocationOnMock invocation) throws Throwable {
// try {
// return invocation.callRealMethod();
// } catch (ReplicationException e) {
// LOG.debug("Caught Exception", e);
// getListOfReplicatorsFailed.set(true);
// throw e;
// }
// }
// }).when(queueStorage).getAllWALs();
cleaner.setConf(conf, faultyZK, queueStorage);
// should keep all files due to a ConnectionLossException getting the queues znodes
cleaner.preClean();
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
assertTrue(getListOfReplicatorsFailed.get());
assertFalse(toDelete.iterator().hasNext());
assertFalse(cleaner.isStopped());
// zk recovery.
faultyZK.init(true);
cleaner.preClean();
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
Iterator<FileStatus> iter = filesToDelete.iterator();
assertTrue(iter.hasNext());
assertEquals(new Path("log1"), iter.next().getPath());
assertTrue(iter.hasNext());
assertEquals(new Path("log2"), iter.next().getPath());
assertFalse(iter.hasNext());
} finally {
faultyZK.close();
}
}
/**
* When zk is working both files should be returned
* @throws Exception from ZK watcher
*/
@Test
public void testZooKeeperNormal() throws Exception {
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
// Subtract 1000 from current time so modtime is for sure older
// than 'now'.
long modTime = EnvironmentEdgeManager.currentTime() - 1000;
List<FileStatus> dummyFiles =
Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")),
new FileStatus(100, false, 3, 100, modTime, new Path("log2")));
ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
try {
cleaner.setConf(conf, zkw);
cleaner.preClean();
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
Iterator<FileStatus> iter = filesToDelete.iterator();
assertTrue(iter.hasNext());
assertEquals(new Path("log1"), iter.next().getPath());
assertTrue(iter.hasNext());
assertEquals(new Path("log2"), iter.next().getPath());
assertFalse(iter.hasNext());
} finally {
zkw.close();
}
}
   @Test
   public void testOnConfigurationChange() throws Exception {
     // Prepare environments
     Server server = new DummyServer();
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
-    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null);
+    LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL,
+      ImmutableMap.of(HMaster.MASTER, masterServices));
     int size = cleaner.getSizeOfCleaners();
     assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
       cleaner.getCleanerThreadTimeoutMsec());
@@ -338,7 +297,7 @@ public class TestLogsCleaner {
     }
   }

-  static class DummyServer extends MockServer {
+  private static final class DummyServer extends MockServer {

     @Override
     public Configuration getConfiguration() {
@@ -355,26 +314,4 @@ public class TestLogsCleaner {
       return null;
     }
   }

The FaultyZooKeeperWatcher helper class below is also removed by this commit:
static class FaultyZooKeeperWatcher extends ZKWatcher {
private RecoverableZooKeeper zk;
public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable)
throws ZooKeeperConnectionException, IOException {
super(conf, identifier, abortable);
}
public void init(boolean autoRecovery) throws Exception {
this.zk = spy(super.getRecoverableZooKeeper());
if (!autoRecovery) {
doThrow(new KeeperException.ConnectionLossException()).when(zk)
.getChildren("/hbase/replication/rs", null);
}
}
@Override
public RecoverableZooKeeper getRecoverableZooKeeper() {
return zk;
}
}
 }


@@ -26,6 +26,7 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +35,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -48,19 +51,19 @@ import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.MockServer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-// TODO: revisit later
-@Ignore
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MasterTests.class, SmallTests.class })
 public class TestReplicationHFileCleaner {
@@ -71,19 +74,25 @@ public class TestReplicationHFileCleaner {
   private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
   private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
   private static Server server;
+  private static final TableName tableName = TableName.valueOf("test_cleaner");
   private static ReplicationQueueStorage rq;
   private static ReplicationPeers rp;
   private static final String peerId = "TestReplicationHFileCleaner";
   private static Configuration conf = TEST_UTIL.getConfiguration();
-  static FileSystem fs = null;
-  Path root;
+  private static FileSystem fs = null;
+  private static Map<String, Object> params;
+  private Path root;

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster();
     server = new DummyServer();
+    params = ImmutableMap.of(HMaster.MASTER, server);
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     HMaster.decorateMasterConfiguration(conf);
+    TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName);
+    TEST_UTIL.getAdmin().createTable(td);
+    conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString());
     rp =
       ReplicationFactory.getReplicationPeers(server.getFileSystem(), server.getZooKeeper(), conf);
     rp.init();
@@ -93,7 +102,7 @@ public class TestReplicationHFileCleaner {

   @AfterClass
   public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniZKCluster();
+    TEST_UTIL.shutdownMiniCluster();
   }

   @Before
@@ -116,6 +125,13 @@ public class TestReplicationHFileCleaner {
     rp.getPeerStorage().removePeer(peerId);
   }

+  private ReplicationHFileCleaner createCleaner() {
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+    cleaner.init(params);
+    return cleaner;
+  }
+
   @Test
   public void testIsFileDeletable() throws IOException, ReplicationException {
// 1. Create a file // 1. Create a file
@@ -123,8 +139,7 @@ public class TestReplicationHFileCleaner {
     fs.createNewFile(file);
     // 2. Assert file is successfully created
     assertTrue("Test file not created!", fs.exists(file));
-    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
-    cleaner.setConf(conf);
+    ReplicationHFileCleaner cleaner = createCleaner();
     // 3. Assert that file as is should be deletable
     assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
       + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file)));
@@ -161,8 +176,7 @@ public class TestReplicationHFileCleaner {
     // 2. Add one file to hfile-refs queue
     rq.addHFileRefs(peerId, hfiles);

-    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
-    cleaner.setConf(conf);
+    ReplicationHFileCleaner cleaner = createCleaner();
     Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
     int i = 0;
     while (deletableFilesIterator.hasNext() && i < 2) {
@@ -183,6 +197,15 @@ public class TestReplicationHFileCleaner {
       return TEST_UTIL.getConfiguration();
     }

+    @Override
+    public ZKWatcher getZooKeeper() {
+      try {
+        return TEST_UTIL.getZooKeeperWatcher();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
     @Override
     public Connection getConnection() {
       try {


@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.replication.ReplicationOffsetUtil.shouldReplicate;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, SmallTests.class })
public class TestReplicationOffsetUtil {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationOffsetUtil.class);
@Test
public void test() {
assertTrue(shouldReplicate(null, "whatever"));
assertTrue(shouldReplicate(ReplicationGroupOffset.BEGIN, "whatever"));
ServerName sn = ServerName.valueOf("host", 16010, EnvironmentEdgeManager.currentTime());
ReplicationGroupOffset offset = new ReplicationGroupOffset(sn + ".12345", 100);
assertTrue(shouldReplicate(offset, sn + ".12346"));
assertFalse(shouldReplicate(offset, sn + ".12344"));
assertTrue(shouldReplicate(offset, sn + ".12345"));
// -1 means finish replication, so should not replicate
assertFalse(shouldReplicate(new ReplicationGroupOffset(sn + ".12345", -1), sn + ".12345"));
}
}
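The assertions above pin down the contract of ReplicationOffsetUtil.shouldReplicate: with no recorded offset (or the BEGIN marker) everything is replicated, WALs newer than the one named in the offset still need replication, older ones do not, and for the WAL named in the offset itself a position of -1 means "finished". A minimal sketch consistent with these assertions, assuming ReplicationGroupOffset exposes getWal() and getOffset() accessors (hypothetical names, not shown in this patch), could look like:

// Editor's illustrative sketch of the rule exercised above; not part of this patch.
final class ReplicationOffsetUtilSketch {

  private ReplicationOffsetUtilSketch() {
  }

  // The WAL names used above end with ".<number>"; use that suffix for ordering.
  private static long walSuffix(String walName) {
    return Long.parseLong(walName.substring(walName.lastIndexOf('.') + 1));
  }

  static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
    // No offset recorded yet, or the BEGIN marker: nothing has been replicated.
    if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
      return true;
    }
    long walTs = walSuffix(wal);
    long offsetWalTs = walSuffix(offset.getWal());
    if (walTs != offsetWalTs) {
      // WALs newer than the one in the offset still need replication; older ones are done.
      return walTs > offsetWalTs;
    }
    // Same WAL as the offset: a non-negative position means replication is still in
    // progress, while -1 marks the file as fully replicated.
    return offset.getOffset() >= 0;
  }
}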

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.master;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ MasterTests.class, SmallTests.class })
public class TestLogCleanerBarrier {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestLogCleanerBarrier.class);
@Test
public void test() {
ReplicationLogCleanerBarrier barrier = new ReplicationLogCleanerBarrier();
assertThrows(IllegalStateException.class, () -> barrier.stop());
assertThrows(IllegalStateException.class, () -> barrier.enable());
assertTrue(barrier.start());
assertThrows(IllegalStateException.class, () -> barrier.start());
assertThrows(IllegalStateException.class, () -> barrier.enable());
assertFalse(barrier.disable());
assertThrows(IllegalStateException.class, () -> barrier.enable());
barrier.stop();
for (int i = 0; i < 3; i++) {
assertTrue(barrier.disable());
assertFalse(barrier.start());
}
for (int i = 0; i < 3; i++) {
assertFalse(barrier.start());
barrier.enable();
}
assertTrue(barrier.start());
}
}
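Read as a whole, the test fixes the barrier protocol: start()/stop() bracket one cleaner run, disable()/enable() bracket a peer-modification procedure, disables are counted so several procedures can hold the barrier at once, disable() returns false while the cleaner is mid-run, and start() returns false while any disable is outstanding. A state-machine sketch consistent with these assertions, not the actual ReplicationLogCleanerBarrier source, might be:

// Editor's sketch only: minimal state guarded by the same rules the test checks.
class LogCleanerBarrierSketch {

  private boolean cleanerRunning = false;
  // Number of peer procedures currently holding the barrier via disable().
  private int numDisabled = 0;

  // Called by the log cleaner before a run; false means a procedure holds the barrier.
  synchronized boolean start() {
    if (cleanerRunning) {
      throw new IllegalStateException("Cleaner already started");
    }
    if (numDisabled > 0) {
      return false;
    }
    cleanerRunning = true;
    return true;
  }

  // Called by the log cleaner after a run it actually started.
  synchronized void stop() {
    if (!cleanerRunning) {
      throw new IllegalStateException("Cleaner not started");
    }
    cleanerRunning = false;
  }

  // Called by a peer procedure; false means the cleaner is mid-run and the caller must retry.
  synchronized boolean disable() {
    if (cleanerRunning) {
      return false;
    }
    numDisabled++;
    return true;
  }

  // Undoes one successful disable(); illegal if nothing is disabled or the cleaner is running.
  synchronized void enable() {
    if (cleanerRunning || numDisabled == 0) {
      throw new IllegalStateException("Not disabled");
    }
    numDisabled--;
  }
}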

View File

@ -0,0 +1,385 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.master;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.emptyIterable;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationLogCleaner {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationLogCleaner.class);
private static final Configuration CONF = HBaseConfiguration.create();
private MasterServices services;
private ReplicationLogCleaner cleaner;
@Before
public void setUp() throws ReplicationException {
services = mock(MasterServices.class);
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
when(services.getReplicationPeerManager()).thenReturn(rpm);
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
when(rpm.getQueueStorage()).thenReturn(rqs);
when(rqs.listAllQueues()).thenReturn(new ArrayList<>());
ServerManager sm = mock(ServerManager.class);
when(services.getServerManager()).thenReturn(sm);
when(sm.getOnlineServersList()).thenReturn(new ArrayList<>());
@SuppressWarnings("unchecked")
ProcedureExecutor<MasterProcedureEnv> procExec = mock(ProcedureExecutor.class);
when(services.getMasterProcedureExecutor()).thenReturn(procExec);
when(procExec.getProcedures()).thenReturn(new ArrayList<>());
cleaner = new ReplicationLogCleaner();
cleaner.setConf(CONF);
Map<String, Object> params = ImmutableMap.of(HMaster.MASTER, services);
cleaner.init(params);
}
@After
public void tearDown() {
cleaner.postClean();
}
private static Iterable<FileStatus> runCleaner(ReplicationLogCleaner cleaner,
Iterable<FileStatus> files) {
cleaner.preClean();
return cleaner.getDeletableFiles(files);
}
private static FileStatus createFileStatus(Path path) {
return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path);
}
private static FileStatus createFileStatus(ServerName sn, int number) {
Path path = new Path(sn.toString() + "." + number);
return createFileStatus(path);
}
private static ReplicationPeerDescription createPeer(String peerId) {
return new ReplicationPeerDescription(peerId, true, null, null);
}
private void addServer(ServerName serverName) {
services.getServerManager().getOnlineServersList().add(serverName);
}
private void addSCP(ServerName serverName, boolean finished) {
ServerCrashProcedure scp = mock(ServerCrashProcedure.class);
when(scp.getServerName()).thenReturn(serverName);
when(scp.isFinished()).thenReturn(finished);
services.getMasterProcedureExecutor().getProcedures().add(scp);
}
private void addPeer(String... peerIds) {
services.getReplicationPeerManager().listPeers(null).addAll(
Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList()));
}
private void addQueueData(ReplicationQueueData... datas) throws ReplicationException {
services.getReplicationPeerManager().getQueueStorage().listAllQueues()
.addAll(Arrays.asList(datas));
}
@Test
public void testNoConf() {
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
List<FileStatus> files = Arrays.asList(new FileStatus());
assertSame(files, runCleaner(cleaner, files));
cleaner.postClean();
}
@Test
public void testCanNotFilter() {
assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable());
List<FileStatus> files = Arrays.asList(new FileStatus());
assertSame(Collections.emptyList(), runCleaner(cleaner, files));
}
@Test
public void testNoPeer() {
Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime());
assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
FileStatus file = createFileStatus(path);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testNotValidWalFile() {
addPeer("1");
Path path = new Path("/whatever");
assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName()));
FileStatus file = createFileStatus(path);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testMetaWalFile() {
addPeer("1");
Path path = new Path(
"/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID);
assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName()));
assertTrue(AbstractFSWALProvider.isMetaFile(path));
FileStatus file = createFileStatus(path);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testLiveRegionServerNoQueues() {
addPeer("1");
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addServer(sn);
List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
assertThat(runCleaner(cleaner, files), emptyIterable());
}
@Test
public void testLiveRegionServerWithSCPNoQueues() {
addPeer("1");
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addSCP(sn, false);
List<FileStatus> files = Arrays.asList(createFileStatus(sn, 1));
assertThat(runCleaner(cleaner, files), emptyIterable());
}
@Test
public void testDeadRegionServerNoQueues() {
addPeer("1");
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
FileStatus file = createFileStatus(sn, 1);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testDeadRegionServerWithSCPNoQueues() {
addPeer("1");
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addSCP(sn, true);
FileStatus file = createFileStatus(sn, 1);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testLiveRegionServerMissingQueue() throws ReplicationException {
String peerId1 = "1";
String peerId2 = "2";
addPeer(peerId1, peerId2);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addServer(sn);
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
addQueueData(data1);
assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
}
@Test
public void testLiveRegionServerShouldNotDelete() throws ReplicationException {
String peerId = "1";
addPeer(peerId);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addServer(sn);
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
addQueueData(data);
assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
}
@Test
public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
String peerId1 = "1";
String peerId2 = "2";
addPeer(peerId1, peerId2);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addServer(sn);
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
addQueueData(data1, data2);
assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
}
@Test
public void testLiveRegionServerShouldDelete() throws ReplicationException {
String peerId = "1";
addPeer(peerId);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addServer(sn);
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException {
String peerId1 = "1";
String peerId2 = "2";
addPeer(peerId1, peerId2);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
addServer(sn);
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
addQueueData(data1, data2);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testDeadRegionServerMissingQueue() throws ReplicationException {
String peerId1 = "1";
String peerId2 = "2";
addPeer(peerId1, peerId2);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
addQueueData(data1);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testDeadRegionServerShouldNotDelete() throws ReplicationException {
String peerId = "1";
addPeer(peerId);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
addQueueData(data);
assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
}
@Test
public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException {
String peerId1 = "1";
String peerId2 = "2";
addPeer(peerId1, peerId2);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0)));
addQueueData(data1, data2);
assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable());
}
@Test
public void testDeadRegionServerShouldDelete() throws ReplicationException {
String peerId = "1";
addPeer(peerId);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
@Test
public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException {
String peerId1 = "1";
String peerId2 = "2";
addPeer(peerId1, peerId2);
ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime());
FileStatus file = createFileStatus(sn, 1);
ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2),
ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1)));
addQueueData(data1, data2);
Iterator<FileStatus> iter = runCleaner(cleaner, Arrays.asList(file)).iterator();
assertSame(file, iter.next());
assertFalse(iter.hasNext());
}
}
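Taken together, these cases spell out when the new log cleaner may delete an archived WAL: files that are not WALs, and meta WALs, are always deletable; otherwise the file is kept while any peer still needs it according to its queue offset, and a missing queue record keeps the file only if the owning region server is still live or its ServerCrashProcedure has not finished. A rough sketch of that rule, where offsetsFor, isLive and hasUnfinishedSCP are hypothetical helpers standing in for the queue-storage and server-manager lookups the real cleaner performs, could be:

// Editor's sketch of the decision rule, not the ReplicationLogCleaner implementation.
abstract class LogCleanerRuleSketch {

  // Placeholder lookups; the real cleaner reads these from the master's
  // ReplicationQueueStorage, ServerManager and procedure executor.
  abstract ReplicationGroupOffset offsetsFor(ServerName owner, String peerId, String walName);

  abstract boolean isLive(ServerName owner);

  abstract boolean hasUnfinishedSCP(ServerName owner);

  boolean isDeletable(FileStatus wal, List<String> peerIds) {
    String name = wal.getPath().getName();
    // Non-WAL files and meta WALs are never tracked by replication queues.
    if (!AbstractFSWALProvider.validateWALFilename(name) || AbstractFSWALProvider.isMetaFile(name)) {
      return true;
    }
    ServerName owner = AbstractFSWALProvider.parseServerNameFromWALName(name);
    for (String peerId : peerIds) {
      ReplicationGroupOffset offset = offsetsFor(owner, peerId, name);
      if (offset == null) {
        // No queue recorded for this peer yet. A live owner (or one whose SCP is still
        // running) may simply not have written it, so keep the file; a dead owner with
        // no queue left means nothing will ever replicate this WAL for that peer.
        if (isLive(owner) || hasUnfinishedSCP(owner)) {
          return false;
        }
        continue;
      }
      // Same check TestReplicationOffsetUtil exercises earlier in this patch.
      if (ReplicationOffsetUtil.shouldReplicate(offset, name)) {
        return false;
      }
    }
    return true;
  }
}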

View File

@ -190,7 +190,7 @@ public class TestReplicationSourceManager {
replication = new Replication(); replication = new Replication();
replication.initialize(server, FS, logDir, oldLogDir, replication.initialize(server, FS, logDir, oldLogDir,
new WALFactory(CONF, "test", null, false)); new WALFactory(CONF, server.getServerName(), null, false));
manager = replication.getReplicationManager(); manager = replication.getReplicationManager();
} }

View File

@ -630,7 +630,7 @@ public class TestWALFactory {
assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass()); assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass());
// if providers are not set and do not enable SyncReplicationWALProvider // if providers are not set and do not enable SyncReplicationWALProvider
walFactory = new WALFactory(conf, this.currentServername.toString(), null, false); walFactory = new WALFactory(conf, this.currentServername, null, false);
assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass());
} }

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
@ -183,4 +184,17 @@ public class TestWALMethods {
return entry; return entry;
} }
@Test
public void testParseServerNameFromWALName() {
assertEquals(ServerName.valueOf("abc,123,123"),
AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.1.12345.meta"));
assertEquals(ServerName.valueOf("abc,123,123"),
AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.12345"));
assertEquals(ServerName.valueOf("abc,123,123"),
AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123"));
assertThrows(IllegalArgumentException.class,
() -> AbstractFSWALProvider.parseServerNameFromWALName("test,abc,123,123.12345"));
assertThrows(IllegalArgumentException.class,
() -> AbstractFSWALProvider.parseServerNameFromWALName("abc"));
}
} }
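The expected values here suggest parseServerNameFromWALName peels dot-separated suffixes (sequence numbers, timestamps, ".meta") off the WAL name until a plain host,port,startcode server name is left, and rejects names where that never happens. A self-contained sketch with that behavior, using a simplified validity check rather than whatever the real AbstractFSWALProvider code does:

import java.util.regex.Pattern;

import org.apache.hadoop.hbase.ServerName;

// Editor's illustrative sketch; not the implementation added by this patch.
final class WalNameParsingSketch {

  // host,port,startcode with numeric port and startcode -- a rough stand-in
  // for the real server-name validity check.
  private static final Pattern SERVER_NAME = Pattern.compile("[^,]+,\\d+,\\d+");

  static ServerName parseServerNameFromWALName(String walName) {
    String candidate = walName;
    while (true) {
      if (SERVER_NAME.matcher(candidate).matches()) {
        return ServerName.valueOf(candidate);
      }
      int lastDot = candidate.lastIndexOf('.');
      if (lastDot < 0) {
        // Ran out of suffixes to strip without ever seeing a server name.
        throw new IllegalArgumentException(walName + " is not a valid WAL file name");
      }
      candidate = candidate.substring(0, lastDot);
    }
  }
}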