diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java index d37bb620273..e08f5329433 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java @@ -50,7 +50,7 @@ public interface FileCleanerDelegate extends Configurable, Stoppable { } /** - * Used to do some cleanup work + * Will be called after cleaner run. */ default void postClean() { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java index 86c23114458..e45b6271f7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java @@ -380,7 +380,7 @@ public final class MasterRegion { params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize()); walRoller.start(); - WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false); + WALFactory walFactory = new WALFactory(conf, server.getServerName(), server, false); Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName()); Path initializingFlag = new Path(tableDir, INITIALIZING_FLAG); Path initializedFlag = new Path(tableDir, INITIALIZED_FLAG); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java index 6d0acee76ca..1d02fab5f19 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.replication.ReplicationException; @@ -45,6 +44,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure { private boolean enabled; + private boolean cleanerDisabled; + public AddPeerProcedure() { } @@ -84,15 +85,24 @@ public class AddPeerProcedure extends ModifyPeerProcedure { @Override protected void releaseLatch(MasterProcedureEnv env) { + if (cleanerDisabled) { + env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable(); + } if (peerConfig.isSyncReplication()) { env.getReplicationPeerManager().releaseSyncReplicationPeerLock(); } - ProcedurePrepareLatch.releaseLatch(latch, this); + super.releaseLatch(env); } @Override protected void prePeerModification(MasterProcedureEnv env) throws IOException, ReplicationException, ProcedureSuspendedException { + if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + throw suspend(env.getMasterConfiguration(), + backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs", + peerId, backoff / 1000)); + } + cleanerDisabled = true; MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost(); if (cpHost != null) { cpHost.preAddReplicationPeer(peerId, peerConfig); @@ -128,9 +138,14 @@ public class AddPeerProcedure extends ModifyPeerProcedure { @Override protected void afterReplay(MasterProcedureEnv env) { if (getCurrentState() == getInitialState()) { - // will try to acquire the lock when executing the procedure, no need to acquire it here + // do not need to disable log cleaner or acquire lock if we are in the initial state, later + // when executing the procedure we will try to disable and acquire. return; } + if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) { + throw new IllegalStateException("can not disable log cleaner, this should not happen"); + } + cleanerDisabled = true; if (peerConfig.isSyncReplication()) { if (!env.getReplicationPeerManager().tryAcquireSyncReplicationPeerLock()) { throw new IllegalStateException( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 53270bcbb04..57380920d0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKConfig; @@ -102,6 +103,9 @@ public class ReplicationPeerManager implements ConfigurationObserver { // Only allow to add one sync replication peer concurrently private final Semaphore syncReplicationPeerLock = new Semaphore(1); + private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier = + new ReplicationLogCleanerBarrier(); + private final String clusterId; private volatile Configuration conf; @@ -705,6 +709,10 @@ public class ReplicationPeerManager implements ConfigurationObserver { syncReplicationPeerLock.release(); } + public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() { + return replicationLogCleanerBarrier; + } + @Override public void onConfigurationChange(Configuration conf) { this.conf = conf; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6daae10b726..1bdf6a225c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1733,7 +1733,7 @@ public class HRegionServer extends HBaseServerBase * be hooked up to WAL. */ private void setupWALAndReplication() throws IOException { - WALFactory factory = new WALFactory(conf, serverName.toString(), this, true); + WALFactory factory = new WALFactory(conf, serverName, this, true); // TODO Replication make assumptions here based on the default filesystem impl Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); String logName = AbstractFSWALProvider.getWALDirectoryName(this.serverName.toString()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java new file mode 100644 index 00000000000..052c5542d47 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationOffsetUtil.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public final class ReplicationOffsetUtil { + + private ReplicationOffsetUtil() { + } + + public static boolean shouldReplicate(ReplicationGroupOffset offset, String wal) { + // if no offset or the offset is just a place marker, replicate + if (offset == null || offset == ReplicationGroupOffset.BEGIN) { + return true; + } + // otherwise, compare the timestamp + long walTs = AbstractFSWALProvider.getTimestamp(wal); + long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal()); + if (walTs < startWalTs) { + return false; + } else if (walTs > startWalTs) { + return true; + } + // if the timestamp equals, usually it means we should include this wal but there is a special + // case, a negative offset means the wal has already been fully replicated, so here we should + // check the offset. + return offset.getOffset() >= 0; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 7135ca9a9b2..f1fd8f8d6b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -17,18 +17,29 @@ */ package org.apache.hadoop.hbase.replication.master; -import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hadoop.conf.Configuration; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate; -import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,35 +51,129 @@ import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; /** * Implementation of a log cleaner that checks if a log is still scheduled for replication before * deleting it when its TTL is over. + *

+ * The logic is a bit complicated after we switch to use table based replication queue storage, see + * the design doc in HBASE-27109 and the comments in HBASE-27214 for more details. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class ReplicationLogCleaner extends BaseLogCleanerDelegate { private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class); - private ZKWatcher zkw = null; - private boolean shareZK = false; - private ReplicationQueueStorage queueStorage; + private Set notFullyDeadServers; + private Set peerIds; + // ServerName -> PeerId -> WalGroup -> Offset + // Here the server name is the source server name, so we can make sure that there is only one + // queue for a given peer, that why we can use a String peerId as key instead of + // ReplicationQueueId. + private Map>> replicationOffsets; + private ReplicationPeerManager rpm; + private Supplier> getNotFullyDeadServers; + + private boolean canFilter; private boolean stopped = false; - private Set wals; - private long readZKTimestamp = 0; @Override public void preClean() { - readZKTimestamp = EnvironmentEdgeManager.currentTime(); - // TODO: revisit the implementation - // try { - // // The concurrently created new WALs may not be included in the return list, - // // but they won't be deleted because they're not in the checking set. - // wals = queueStorage.getAllWALs(); - // } catch (ReplicationException e) { - // LOG.warn("Failed to read zookeeper, skipping checking deletable files"); - // wals = null; - // } + if (this.getConf() == null) { + return; + } + canFilter = rpm.getReplicationLogCleanerBarrier().start(); + if (canFilter) { + notFullyDeadServers = getNotFullyDeadServers.get(); + peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId) + .collect(Collectors.toSet()); + // must get the not fully dead servers first and then get the replication queue data, in this + // way we can make sure that, we should have added the missing replication queues for the dead + // region servers recorded in the above set, otherwise the logic in the + // filterForDeadRegionServer method may lead us delete wal still in use. + List allQueueData; + try { + allQueueData = rpm.getQueueStorage().listAllQueues(); + } catch (ReplicationException e) { + LOG.error("Can not list all replication queues, give up cleaning", e); + rpm.getReplicationLogCleanerBarrier().stop(); + canFilter = false; + notFullyDeadServers = null; + peerIds = null; + return; + } + replicationOffsets = new HashMap<>(); + for (ReplicationQueueData queueData : allQueueData) { + ReplicationQueueId queueId = queueData.getId(); + ServerName serverName = queueId.getServerWALsBelongTo(); + Map> peerId2Offsets = + replicationOffsets.computeIfAbsent(serverName, k -> new HashMap<>()); + Map offsets = + peerId2Offsets.computeIfAbsent(queueId.getPeerId(), k -> new HashMap<>()); + offsets.putAll(queueData.getOffsets()); + } + } else { + LOG.info("Skip replication log cleaner because an AddPeerProcedure is running"); + } } @Override public void postClean() { - // release memory - wals = null; + if (canFilter) { + rpm.getReplicationLogCleanerBarrier().stop(); + canFilter = false; + // release memory + notFullyDeadServers = null; + peerIds = null; + replicationOffsets = null; + } + } + + private boolean shouldDelete(ReplicationGroupOffset offset, FileStatus file) { + return !ReplicationOffsetUtil.shouldReplicate(offset, file.getPath().getName()); + } + + private boolean filterForLiveRegionServer(ServerName serverName, FileStatus file) { + Map> peerId2Offsets = + replicationOffsets.get(serverName); + if (peerId2Offsets == null) { + // if there are replication queues missing, we can not delete the wal + return false; + } + for (String peerId : peerIds) { + Map offsets = peerId2Offsets.get(peerId); + // if no replication queue for a peer, we can not delete the wal + if (offsets == null) { + return false; + } + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName()); + ReplicationGroupOffset offset = offsets.get(walGroupId); + // if a replication queue still need to replicate this wal, we can not delete it + if (!shouldDelete(offset, file)) { + return false; + } + } + // if all replication queues have already finished replicating this wal, we can delete it. + return true; + } + + private boolean filterForDeadRegionServer(ServerName serverName, FileStatus file) { + Map> peerId2Offsets = + replicationOffsets.get(serverName); + if (peerId2Offsets == null) { + // no replication queue for this dead rs, we can delete all wal files for it + return true; + } + for (String peerId : peerIds) { + Map offsets = peerId2Offsets.get(peerId); + if (offsets == null) { + // for dead server, we only care about existing replication queues, as we will delete a + // queue after we finish replicating it. + continue; + } + String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(file.getPath().getName()); + ReplicationGroupOffset offset = offsets.get(walGroupId); + // if a replication queue still need to replicate this wal, we can not delete it + if (!shouldDelete(offset, file)) { + return false; + } + } + // if all replication queues have already finished replicating this wal, we can delete it. + return true; } @Override @@ -78,10 +183,12 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { if (this.getConf() == null) { return files; } - - if (wals == null) { + if (!canFilter) { + // We can not delete anything if there are AddPeerProcedure running at the same time + // See HBASE-27214 for more details. return Collections.emptyList(); } + return Iterables.filter(files, new Predicate() { @Override public boolean apply(FileStatus file) { @@ -90,65 +197,56 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { if (file == null) { return false; } - String wal = file.getPath().getName(); - boolean logInReplicationQueue = wals.contains(wal); - if (logInReplicationQueue) { - LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal); + if (peerIds.isEmpty()) { + // no peer, can always delete + return true; + } + // not a valid wal file name, delete + if (!AbstractFSWALProvider.validateWALFilename(file.getPath().getName())) { + return true; + } + // meta wal is always deletable as we will never replicate it + if (AbstractFSWALProvider.isMetaFile(file.getPath())) { + return true; + } + ServerName serverName = + AbstractFSWALProvider.parseServerNameFromWALName(file.getPath().getName()); + if (notFullyDeadServers.contains(serverName)) { + return filterForLiveRegionServer(serverName, file); + } else { + return filterForDeadRegionServer(serverName, file); } - return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp); } }); } + private Set getNotFullyDeadServers(MasterServices services) { + List onlineServers = services.getServerManager().getOnlineServersList(); + return Stream.concat(onlineServers.stream(), + services.getMasterProcedureExecutor().getProcedures().stream() + .filter(p -> p instanceof ServerCrashProcedure).filter(p -> !p.isFinished()) + .map(p -> ((ServerCrashProcedure) p).getServerName())) + .collect(Collectors.toSet()); + } + @Override public void init(Map params) { super.init(params); - try { - if (MapUtils.isNotEmpty(params)) { - Object master = params.get(HMaster.MASTER); - if (master != null && master instanceof HMaster) { - zkw = ((HMaster) master).getZooKeeper(); - shareZK = true; - } + if (MapUtils.isNotEmpty(params)) { + Object master = params.get(HMaster.MASTER); + if (master != null && master instanceof MasterServices) { + MasterServices m = (MasterServices) master; + rpm = m.getReplicationPeerManager(); + getNotFullyDeadServers = () -> getNotFullyDeadServers(m); + return; } - if (zkw == null) { - zkw = new ZKWatcher(getConf(), "replicationLogCleaner", null); - } - // TODO: revisit the implementation - // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - } catch (IOException e) { - LOG.error("Error while configuring " + this.getClass().getName(), e); } - } - - @InterfaceAudience.Private - public void setConf(Configuration conf, ZKWatcher zk) { - super.setConf(conf); - try { - this.zkw = zk; - // TODO: revisit the implementation - // this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf); - } catch (Exception e) { - LOG.error("Error while configuring " + this.getClass().getName(), e); - } - } - - @InterfaceAudience.Private - public void setConf(Configuration conf, ZKWatcher zk, - ReplicationQueueStorage replicationQueueStorage) { - super.setConf(conf); - this.zkw = zk; - this.queueStorage = replicationQueueStorage; + throw new IllegalArgumentException("Missing " + HMaster.MASTER + " parameter"); } @Override public void stop(String why) { - if (this.stopped) return; this.stopped = true; - if (!shareZK && this.zkw != null) { - LOG.info("Stopping " + this.zkw); - this.zkw.close(); - } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java new file mode 100644 index 00000000000..d8756518728 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleanerBarrier.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.master; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A barrier to guard the execution of {@link ReplicationLogCleaner}. + *

+ * The reason why we introduce this class is because there could be race between + * {@link org.apache.hadoop.hbase.master.replication.AddPeerProcedure} and + * {@link ReplicationLogCleaner}. See HBASE-27214 for more details. + */ +@InterfaceAudience.Private +public class ReplicationLogCleanerBarrier { + + private enum State { + // the cleaner is not running + NOT_RUNNING, + // the cleaner is running + RUNNING, + // the cleaner is disabled + DISABLED + } + + private State state = State.NOT_RUNNING; + + // we could have multiple AddPeerProcedure running at the same time, so here we need to do + // reference counting. + private int numberDisabled = 0; + + public synchronized boolean start() { + if (state == State.NOT_RUNNING) { + state = State.RUNNING; + return true; + } + if (state == State.DISABLED) { + return false; + } + throw new IllegalStateException("Unexpected state " + state); + } + + public synchronized void stop() { + if (state != State.RUNNING) { + throw new IllegalStateException("Unexpected state " + state); + } + state = State.NOT_RUNNING; + } + + public synchronized boolean disable() { + if (state == State.RUNNING) { + return false; + } + if (state == State.NOT_RUNNING) { + state = State.DISABLED; + } + numberDisabled++; + return true; + } + + public synchronized void enable() { + if (state != State.DISABLED) { + throw new IllegalStateException("Unexpected state " + state); + } + numberDisabled--; + if (numberDisabled == 0) { + state = State.NOT_RUNNING; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 5d77600a187..b521766ae3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil; import org.apache.hadoop.hbase.replication.ReplicationPeer; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; @@ -809,22 +810,7 @@ public class ReplicationSourceManager { if (AbstractFSWALProvider.isMetaFile(wal)) { return false; } - // if no offset or the offset is just a place marker, replicate - if (offset == null || offset == ReplicationGroupOffset.BEGIN) { - return true; - } - // otherwise, compare the timestamp - long walTs = AbstractFSWALProvider.getTimestamp(wal); - long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal()); - if (walTs < startWalTs) { - return false; - } else if (walTs > startWalTs) { - return true; - } - // if the timestamp equals, usually it means we should include this wal but there is a special - // case, a negative offset means the wal has already been fully replicated, so here we should - // check the offset. - return offset.getOffset() >= 0; + return ReplicationOffsetUtil.shouldReplicate(offset, wal); } void claimQueue(ReplicationQueueId queueId) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java index 50ffd6df1af..b63ad473719 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java @@ -117,7 +117,10 @@ public class ReplicationSyncUp extends Configured implements Tool { System.out.println("Start Replication Server start"); Replication replication = new Replication(); replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, - new WALFactory(conf, "test", null, false)); + new WALFactory(conf, + ServerName + .valueOf(getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()), + null, false)); ReplicationSourceManager manager = replication.getReplicationManager(); manager.init(); claimReplicationQueues(zkw, manager); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index db39a8ba023..48086694999 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.wal; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils; @@ -582,4 +586,29 @@ public abstract class AbstractFSWALProvider> implemen public static String getWALPrefixFromWALName(String name) { return getWALNameGroupFromWALName(name, 1); } + + private static final Pattern SERVER_NAME_PATTERN = Pattern.compile("^[^" + + ServerName.SERVERNAME_SEPARATOR + "]+" + ServerName.SERVERNAME_SEPARATOR + + Addressing.VALID_PORT_REGEX + ServerName.SERVERNAME_SEPARATOR + Addressing.VALID_PORT_REGEX); + + /** + * Parse the server name from wal prefix. A wal's name is always started with a server name in non + * test code. + * @throws IllegalArgumentException if the name passed in is not started with a server name + * @return the server name + */ + public static ServerName parseServerNameFromWALName(String name) { + String decoded; + try { + decoded = URLDecoder.decode(name, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + throw new AssertionError("should never happen", e); + } + Matcher matcher = SERVER_NAME_PATTERN.matcher(decoded); + if (matcher.find()) { + return ServerName.valueOf(matcher.group()); + } else { + throw new IllegalArgumentException(name + " is not started with a server name"); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 92d96c5e210..bc0a9eec73a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; @@ -191,17 +192,35 @@ public class WALFactory { } /** - * @param conf must not be null, will keep a reference to read params in later reader/writer - * instances. - * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations - * to make a directory + * Create a WALFactory. */ + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*|.*/HBaseTestingUtility.java") public WALFactory(Configuration conf, String factoryId) throws IOException { // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider // for HMaster or HRegionServer which take system table only. See HBASE-19999 this(conf, factoryId, null, true); } + /** + * Create a WALFactory. + *

+ * This is the constructor you should use when creating a WALFactory in normal code, to make sure + * that the {@code factoryId} is the server name. We need this assumption in some places for + * parsing the server name out from the wal file name. + * @param conf must not be null, will keep a reference to read params + * in later reader/writer instances. + * @param serverName use to generate the factoryId, which will be append at + * the first of the final file name + * @param abortable the server associated with this WAL file + * @param enableSyncReplicationWALProvider whether wrap the wal provider to a + * {@link SyncReplicationWALProvider} n + */ + public WALFactory(Configuration conf, ServerName serverName, Abortable abortable, + boolean enableSyncReplicationWALProvider) throws IOException { + this(conf, serverName.toString(), abortable, enableSyncReplicationWALProvider); + } + /** * @param conf must not be null, will keep a reference to read params * in later reader/writer instances. @@ -211,7 +230,7 @@ public class WALFactory { * @param enableSyncReplicationWALProvider whether wrap the wal provider to a * {@link SyncReplicationWALProvider} */ - public WALFactory(Configuration conf, String factoryId, Abortable abortable, + private WALFactory(Configuration conf, String factoryId, Abortable abortable, boolean enableSyncReplicationWALProvider) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 1a0537bcbaf..d7ba6c227c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -18,57 +18,60 @@ package org.apache.hadoop.hbase.master.cleaner; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; -import java.util.List; +import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.MockServer; -import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.zookeeper.KeeperException; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// revisit later after we implement new replication log cleaner -@Ignore +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + @Category({ MasterTests.class, MediumTests.class }) public class TestLogsCleaner { @@ -88,22 +91,29 @@ public class TestLogsCleaner { private static DirScanPool POOL; + private static String peerId = "1"; + + private MasterServices masterServices; + + private ReplicationQueueStorage queueStorage; + + @Rule + public final TableNameTestRule tableNameRule = new TableNameTestRule(); + @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL.startMiniZKCluster(); - TEST_UTIL.startMiniDFSCluster(1); + TEST_UTIL.startMiniCluster(); POOL = DirScanPool.getLogCleanerScanPool(TEST_UTIL.getConfiguration()); } @AfterClass public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniZKCluster(); - TEST_UTIL.shutdownMiniDFSCluster(); + TEST_UTIL.shutdownMiniCluster(); POOL.shutdownNow(); } @Before - public void beforeTest() throws IOException { + public void beforeTest() throws Exception { conf = TEST_UTIL.getConfiguration(); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); @@ -112,14 +122,51 @@ public class TestLogsCleaner { // root directory fs.mkdirs(OLD_WALS_DIR); + + TableName tableName = tableNameRule.getTableName(); + TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); + TEST_UTIL.getAdmin().createTable(td); + TEST_UTIL.waitTableAvailable(tableName); + queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(TEST_UTIL.getConnection(), tableName); + + masterServices = mock(MasterServices.class); + when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection()); + ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); + when(masterServices.getReplicationPeerManager()).thenReturn(rpm); + when(rpm.getQueueStorage()).thenReturn(queueStorage); + when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); + when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); + ServerManager sm = mock(ServerManager.class); + when(masterServices.getServerManager()).thenReturn(sm); + when(sm.getOnlineServersList()).thenReturn(Collections.emptyList()); + @SuppressWarnings("unchecked") + ProcedureExecutor procExec = mock(ProcedureExecutor.class); + when(masterServices.getMasterProcedureExecutor()).thenReturn(procExec); + when(procExec.getProcedures()).thenReturn(Collections.emptyList()); } /** * This tests verifies LogCleaner works correctly with WALs and Procedure WALs located in the same - * oldWALs directory. Created files: - 2 invalid files - 5 old Procedure WALs - 30 old WALs from - * which 3 are in replication - 5 recent Procedure WALs - 1 recent WAL - 1 very new WAL (timestamp - * in future) - masterProcedureWALs subdirectory Files which should stay: - 3 replication WALs - 2 - * new WALs - 5 latest Procedure WALs - masterProcedureWALs subdirectory + * oldWALs directory. + *

+ * Created files: + *

+ * Files which should stay: + * */ @Test public void testLogCleaning() throws Exception { @@ -131,9 +178,6 @@ public class TestLogsCleaner { HMaster.decorateMasterConfiguration(conf); Server server = new DummyServer(); - ReplicationQueueStorage queueStorage = ReplicationStorageFactory - .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf); - String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), StandardCharsets.UTF_8.name()); @@ -159,14 +203,12 @@ public class TestLogsCleaner { for (int i = 1; i <= 30; i++) { Path fileName = new Path(OLD_WALS_DIR, fakeMachineName + "." + (now - i)); fs.createNewFile(fileName); - // Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these - // files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner - if (i % (30 / 3) == 0) { - // queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName()); - LOG.info("Replication log file: " + fileName); - } } - + // Case 4: the newest 3 WALs will be kept because they are beyond the replication offset + masterServices.getReplicationPeerManager().listPeers(null) + .add(new ReplicationPeerDescription(peerId, true, null, null)); + queueStorage.setOffset(new ReplicationQueueId(server.getServerName(), peerId), fakeMachineName, + new ReplicationGroupOffset(fakeMachineName + "." + (now - 3), 0), Collections.emptyMap()); // Case 5: 5 Procedure WALs that are new, will stay for (int i = 6; i <= 10; i++) { Path fileName = new Path(OLD_PROCEDURE_WALS_DIR, String.format("pv2-%020d.log", i)); @@ -189,7 +231,8 @@ public class TestLogsCleaner { // 10 procedure WALs assertEquals(10, fs.listStatus(OLD_PROCEDURE_WALS_DIR).length); - LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, null); + LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, OLD_WALS_DIR, POOL, + ImmutableMap.of(HMaster.MASTER, masterServices)); cleaner.chore(); // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which @@ -208,98 +251,14 @@ public class TestLogsCleaner { } } - @Test - public void testZooKeeperRecoveryDuringGetListOfReplicators() throws Exception { - ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - - List dummyFiles = Arrays.asList( - new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log1")), - new FileStatus(100, false, 3, 100, EnvironmentEdgeManager.currentTime(), new Path("log2"))); - - FaultyZooKeeperWatcher faultyZK = - new FaultyZooKeeperWatcher(conf, "testZooKeeperAbort-faulty", null); - final AtomicBoolean getListOfReplicatorsFailed = new AtomicBoolean(false); - - try { - faultyZK.init(false); - ReplicationQueueStorage queueStorage = spy(ReplicationStorageFactory - .getReplicationQueueStorage(ConnectionFactory.createConnection(conf), conf)); - // doAnswer(new Answer() { - // @Override - // public Object answer(InvocationOnMock invocation) throws Throwable { - // try { - // return invocation.callRealMethod(); - // } catch (ReplicationException e) { - // LOG.debug("Caught Exception", e); - // getListOfReplicatorsFailed.set(true); - // throw e; - // } - // } - // }).when(queueStorage).getAllWALs(); - - cleaner.setConf(conf, faultyZK, queueStorage); - // should keep all files due to a ConnectionLossException getting the queues znodes - cleaner.preClean(); - Iterable toDelete = cleaner.getDeletableFiles(dummyFiles); - - assertTrue(getListOfReplicatorsFailed.get()); - assertFalse(toDelete.iterator().hasNext()); - assertFalse(cleaner.isStopped()); - - // zk recovery. - faultyZK.init(true); - cleaner.preClean(); - Iterable filesToDelete = cleaner.getDeletableFiles(dummyFiles); - Iterator iter = filesToDelete.iterator(); - assertTrue(iter.hasNext()); - assertEquals(new Path("log1"), iter.next().getPath()); - assertTrue(iter.hasNext()); - assertEquals(new Path("log2"), iter.next().getPath()); - assertFalse(iter.hasNext()); - - } finally { - faultyZK.close(); - } - } - - /** - * When zk is working both files should be returned - * @throws Exception from ZK watcher - */ - @Test - public void testZooKeeperNormal() throws Exception { - ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); - - // Subtract 1000 from current time so modtime is for sure older - // than 'now'. - long modTime = EnvironmentEdgeManager.currentTime() - 1000; - List dummyFiles = - Arrays.asList(new FileStatus(100, false, 3, 100, modTime, new Path("log1")), - new FileStatus(100, false, 3, 100, modTime, new Path("log2"))); - - ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null); - try { - cleaner.setConf(conf, zkw); - cleaner.preClean(); - Iterable filesToDelete = cleaner.getDeletableFiles(dummyFiles); - Iterator iter = filesToDelete.iterator(); - assertTrue(iter.hasNext()); - assertEquals(new Path("log1"), iter.next().getPath()); - assertTrue(iter.hasNext()); - assertEquals(new Path("log2"), iter.next().getPath()); - assertFalse(iter.hasNext()); - } finally { - zkw.close(); - } - } - @Test public void testOnConfigurationChange() throws Exception { // Prepare environments Server server = new DummyServer(); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); - LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, null); + LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, OLD_WALS_DIR, POOL, + ImmutableMap.of(HMaster.MASTER, masterServices)); int size = cleaner.getSizeOfCleaners(); assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec()); @@ -338,7 +297,7 @@ public class TestLogsCleaner { } } - static class DummyServer extends MockServer { + private static final class DummyServer extends MockServer { @Override public Configuration getConfiguration() { @@ -355,26 +314,4 @@ public class TestLogsCleaner { return null; } } - - static class FaultyZooKeeperWatcher extends ZKWatcher { - private RecoverableZooKeeper zk; - - public FaultyZooKeeperWatcher(Configuration conf, String identifier, Abortable abortable) - throws ZooKeeperConnectionException, IOException { - super(conf, identifier, abortable); - } - - public void init(boolean autoRecovery) throws Exception { - this.zk = spy(super.getRecoverableZooKeeper()); - if (!autoRecovery) { - doThrow(new KeeperException.ConnectionLossException()).when(zk) - .getChildren("/hbase/replication/rs", null); - } - } - - @Override - public RecoverableZooKeeper getRecoverableZooKeeper() { - return zk; - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java index 2409b081cce..5aef1eaf1c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java @@ -26,6 +26,7 @@ import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -34,7 +35,9 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; @@ -48,19 +51,19 @@ import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.MockServer; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO: revisit later -@Ignore +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + @Category({ MasterTests.class, SmallTests.class }) public class TestReplicationHFileCleaner { @@ -71,19 +74,25 @@ public class TestReplicationHFileCleaner { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class); private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); private static Server server; + private static final TableName tableName = TableName.valueOf("test_cleaner"); private static ReplicationQueueStorage rq; private static ReplicationPeers rp; private static final String peerId = "TestReplicationHFileCleaner"; private static Configuration conf = TEST_UTIL.getConfiguration(); - static FileSystem fs = null; - Path root; + private static FileSystem fs = null; + private static Map params; + private Path root; @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.startMiniCluster(); server = new DummyServer(); + params = ImmutableMap.of(HMaster.MASTER, server); conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); HMaster.decorateMasterConfiguration(conf); + TableDescriptor td = ReplicationStorageFactory.createReplicationQueueTableDescriptor(tableName); + TEST_UTIL.getAdmin().createTable(td); + conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME, tableName.getNameAsString()); rp = ReplicationFactory.getReplicationPeers(server.getFileSystem(), server.getZooKeeper(), conf); rp.init(); @@ -93,7 +102,7 @@ public class TestReplicationHFileCleaner { @AfterClass public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniZKCluster(); + TEST_UTIL.shutdownMiniCluster(); } @Before @@ -116,6 +125,13 @@ public class TestReplicationHFileCleaner { rp.getPeerStorage().removePeer(peerId); } + private ReplicationHFileCleaner createCleaner() { + ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); + cleaner.setConf(conf); + cleaner.init(params); + return cleaner; + } + @Test public void testIsFileDeletable() throws IOException, ReplicationException { // 1. Create a file @@ -123,8 +139,7 @@ public class TestReplicationHFileCleaner { fs.createNewFile(file); // 2. Assert file is successfully created assertTrue("Test file not created!", fs.exists(file)); - ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); - cleaner.setConf(conf); + ReplicationHFileCleaner cleaner = createCleaner(); // 3. Assert that file as is should be deletable assertTrue("Cleaner should allow to delete this file as there is no hfile reference node " + "for it in the queue.", cleaner.isFileDeletable(fs.getFileStatus(file))); @@ -161,8 +176,7 @@ public class TestReplicationHFileCleaner { // 2. Add one file to hfile-refs queue rq.addHFileRefs(peerId, hfiles); - ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner(); - cleaner.setConf(conf); + ReplicationHFileCleaner cleaner = createCleaner(); Iterator deletableFilesIterator = cleaner.getDeletableFiles(files).iterator(); int i = 0; while (deletableFilesIterator.hasNext() && i < 2) { @@ -183,6 +197,15 @@ public class TestReplicationHFileCleaner { return TEST_UTIL.getConfiguration(); } + @Override + public ZKWatcher getZooKeeper() { + try { + return TEST_UTIL.getZooKeeperWatcher(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public Connection getConnection() { try { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java new file mode 100644 index 00000000000..f54a4958374 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationOffsetUtil.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.replication.ReplicationOffsetUtil.shouldReplicate; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ReplicationTests.class, SmallTests.class }) +public class TestReplicationOffsetUtil { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationOffsetUtil.class); + + @Test + public void test() { + assertTrue(shouldReplicate(null, "whatever")); + assertTrue(shouldReplicate(ReplicationGroupOffset.BEGIN, "whatever")); + ServerName sn = ServerName.valueOf("host", 16010, EnvironmentEdgeManager.currentTime()); + ReplicationGroupOffset offset = new ReplicationGroupOffset(sn + ".12345", 100); + assertTrue(shouldReplicate(offset, sn + ".12346")); + assertFalse(shouldReplicate(offset, sn + ".12344")); + assertTrue(shouldReplicate(offset, sn + ".12345")); + // -1 means finish replication, so should not replicate + assertFalse(shouldReplicate(new ReplicationGroupOffset(sn + ".12345", -1), sn + ".12345")); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java new file mode 100644 index 00000000000..06cb85523d3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestLogCleanerBarrier.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.master; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestLogCleanerBarrier { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestLogCleanerBarrier.class); + + @Test + public void test() { + ReplicationLogCleanerBarrier barrier = new ReplicationLogCleanerBarrier(); + assertThrows(IllegalStateException.class, () -> barrier.stop()); + assertThrows(IllegalStateException.class, () -> barrier.enable()); + assertTrue(barrier.start()); + assertThrows(IllegalStateException.class, () -> barrier.start()); + assertThrows(IllegalStateException.class, () -> barrier.enable()); + assertFalse(barrier.disable()); + assertThrows(IllegalStateException.class, () -> barrier.enable()); + barrier.stop(); + + for (int i = 0; i < 3; i++) { + assertTrue(barrier.disable()); + assertFalse(barrier.start()); + } + for (int i = 0; i < 3; i++) { + assertFalse(barrier.start()); + barrier.enable(); + } + assertTrue(barrier.start()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java new file mode 100644 index 00000000000..7a227fb0603 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.master; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyIterable; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.replication.ReplicationQueueData; +import org.apache.hadoop.hbase.replication.ReplicationQueueId; +import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +@Category({ MasterTests.class, SmallTests.class }) +public class TestReplicationLogCleaner { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationLogCleaner.class); + + private static final Configuration CONF = HBaseConfiguration.create(); + + private MasterServices services; + + private ReplicationLogCleaner cleaner; + + @Before + public void setUp() throws ReplicationException { + services = mock(MasterServices.class); + ReplicationPeerManager rpm = mock(ReplicationPeerManager.class); + when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier()); + when(services.getReplicationPeerManager()).thenReturn(rpm); + when(rpm.listPeers(null)).thenReturn(new ArrayList<>()); + ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class); + when(rpm.getQueueStorage()).thenReturn(rqs); + when(rqs.listAllQueues()).thenReturn(new ArrayList<>()); + ServerManager sm = mock(ServerManager.class); + when(services.getServerManager()).thenReturn(sm); + when(sm.getOnlineServersList()).thenReturn(new ArrayList<>()); + @SuppressWarnings("unchecked") + ProcedureExecutor procExec = mock(ProcedureExecutor.class); + when(services.getMasterProcedureExecutor()).thenReturn(procExec); + when(procExec.getProcedures()).thenReturn(new ArrayList<>()); + + cleaner = new ReplicationLogCleaner(); + cleaner.setConf(CONF); + Map params = ImmutableMap.of(HMaster.MASTER, services); + cleaner.init(params); + } + + @After + public void tearDown() { + cleaner.postClean(); + } + + private static Iterable runCleaner(ReplicationLogCleaner cleaner, + Iterable files) { + cleaner.preClean(); + return cleaner.getDeletableFiles(files); + } + + private static FileStatus createFileStatus(Path path) { + return new FileStatus(100, false, 3, 256, EnvironmentEdgeManager.currentTime(), path); + } + + private static FileStatus createFileStatus(ServerName sn, int number) { + Path path = new Path(sn.toString() + "." + number); + return createFileStatus(path); + } + + private static ReplicationPeerDescription createPeer(String peerId) { + return new ReplicationPeerDescription(peerId, true, null, null); + } + + private void addServer(ServerName serverName) { + services.getServerManager().getOnlineServersList().add(serverName); + } + + private void addSCP(ServerName serverName, boolean finished) { + ServerCrashProcedure scp = mock(ServerCrashProcedure.class); + when(scp.getServerName()).thenReturn(serverName); + when(scp.isFinished()).thenReturn(finished); + services.getMasterProcedureExecutor().getProcedures().add(scp); + } + + private void addPeer(String... peerIds) { + services.getReplicationPeerManager().listPeers(null).addAll( + Stream.of(peerIds).map(TestReplicationLogCleaner::createPeer).collect(Collectors.toList())); + } + + private void addQueueData(ReplicationQueueData... datas) throws ReplicationException { + services.getReplicationPeerManager().getQueueStorage().listAllQueues() + .addAll(Arrays.asList(datas)); + } + + @Test + public void testNoConf() { + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); + List files = Arrays.asList(new FileStatus()); + assertSame(files, runCleaner(cleaner, files)); + cleaner.postClean(); + } + + @Test + public void testCanNotFilter() { + assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()); + List files = Arrays.asList(new FileStatus()); + assertSame(Collections.emptyList(), runCleaner(cleaner, files)); + } + + @Test + public void testNoPeer() { + Path path = new Path("/wal." + EnvironmentEdgeManager.currentTime()); + assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName())); + FileStatus file = createFileStatus(path); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testNotValidWalFile() { + addPeer("1"); + Path path = new Path("/whatever"); + assertFalse(AbstractFSWALProvider.validateWALFilename(path.getName())); + FileStatus file = createFileStatus(path); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testMetaWalFile() { + addPeer("1"); + Path path = new Path( + "/wal." + EnvironmentEdgeManager.currentTime() + AbstractFSWALProvider.META_WAL_PROVIDER_ID); + assertTrue(AbstractFSWALProvider.validateWALFilename(path.getName())); + assertTrue(AbstractFSWALProvider.isMetaFile(path)); + FileStatus file = createFileStatus(path); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testLiveRegionServerNoQueues() { + addPeer("1"); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addServer(sn); + List files = Arrays.asList(createFileStatus(sn, 1)); + assertThat(runCleaner(cleaner, files), emptyIterable()); + } + + @Test + public void testLiveRegionServerWithSCPNoQueues() { + addPeer("1"); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addSCP(sn, false); + List files = Arrays.asList(createFileStatus(sn, 1)); + assertThat(runCleaner(cleaner, files), emptyIterable()); + } + + @Test + public void testDeadRegionServerNoQueues() { + addPeer("1"); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + FileStatus file = createFileStatus(sn, 1); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testDeadRegionServerWithSCPNoQueues() { + addPeer("1"); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addSCP(sn, true); + FileStatus file = createFileStatus(sn, 1); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testLiveRegionServerMissingQueue() throws ReplicationException { + String peerId1 = "1"; + String peerId2 = "2"; + addPeer(peerId1, peerId2); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addServer(sn); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + addQueueData(data1); + assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); + } + + @Test + public void testLiveRegionServerShouldNotDelete() throws ReplicationException { + String peerId = "1"; + addPeer(peerId); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addServer(sn); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); + addQueueData(data); + assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); + } + + @Test + public void testLiveRegionServerShouldNotDeleteTwoPeers() throws ReplicationException { + String peerId1 = "1"; + String peerId2 = "2"; + addPeer(peerId1, peerId2); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addServer(sn); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); + addQueueData(data1, data2); + assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); + } + + @Test + public void testLiveRegionServerShouldDelete() throws ReplicationException { + String peerId = "1"; + addPeer(peerId); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addServer(sn); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testLiveRegionServerShouldDeleteTwoPeers() throws ReplicationException { + String peerId1 = "1"; + String peerId2 = "2"; + addPeer(peerId1, peerId2); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + addServer(sn); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + addQueueData(data1, data2); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testDeadRegionServerMissingQueue() throws ReplicationException { + String peerId1 = "1"; + String peerId2 = "2"; + addPeer(peerId1, peerId2); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + addQueueData(data1); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testDeadRegionServerShouldNotDelete() throws ReplicationException { + String peerId = "1"; + addPeer(peerId); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); + addQueueData(data); + assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); + } + + @Test + public void testDeadRegionServerShouldNotDeleteTwoPeers() throws ReplicationException { + String peerId1 = "1"; + String peerId2 = "2"; + addPeer(peerId1, peerId2); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), 0))); + addQueueData(data1, data2); + assertThat(runCleaner(cleaner, Arrays.asList(file)), emptyIterable()); + } + + @Test + public void testDeadRegionServerShouldDelete() throws ReplicationException { + String peerId = "1"; + addPeer(peerId); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data = new ReplicationQueueData(new ReplicationQueueId(sn, peerId), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + services.getReplicationPeerManager().getQueueStorage().listAllQueues().add(data); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } + + @Test + public void testDeadRegionServerShouldDeleteTwoPeers() throws ReplicationException { + String peerId1 = "1"; + String peerId2 = "2"; + addPeer(peerId1, peerId2); + ServerName sn = ServerName.valueOf("server,123," + EnvironmentEdgeManager.currentTime()); + FileStatus file = createFileStatus(sn, 1); + ReplicationQueueData data1 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId1), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + ReplicationQueueData data2 = new ReplicationQueueData(new ReplicationQueueId(sn, peerId2), + ImmutableMap.of(sn.toString(), new ReplicationGroupOffset(file.getPath().getName(), -1))); + addQueueData(data1, data2); + Iterator iter = runCleaner(cleaner, Arrays.asList(file)).iterator(); + assertSame(file, iter.next()); + assertFalse(iter.hasNext()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java index 6aba327d791..b7564ed9168 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java @@ -190,7 +190,7 @@ public class TestReplicationSourceManager { replication = new Replication(); replication.initialize(server, FS, logDir, oldLogDir, - new WALFactory(CONF, "test", null, false)); + new WALFactory(CONF, server.getServerName(), null, false)); manager = replication.getReplicationManager(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index b59ebc0d9a6..26c1152c05a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -630,7 +630,7 @@ public class TestWALFactory { assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass()); // if providers are not set and do not enable SyncReplicationWALProvider - walFactory = new WALFactory(conf, this.currentServername.toString(), null, false); + walFactory = new WALFactory(conf, this.currentServername, null, false); assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java index 8273b3d6041..6a1e98d9fd5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -183,4 +184,17 @@ public class TestWALMethods { return entry; } + @Test + public void testParseServerNameFromWALName() { + assertEquals(ServerName.valueOf("abc,123,123"), + AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.1.12345.meta")); + assertEquals(ServerName.valueOf("abc,123,123"), + AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123.12345")); + assertEquals(ServerName.valueOf("abc,123,123"), + AbstractFSWALProvider.parseServerNameFromWALName("abc,123,123")); + assertThrows(IllegalArgumentException.class, + () -> AbstractFSWALProvider.parseServerNameFromWALName("test,abc,123,123.12345")); + assertThrows(IllegalArgumentException.class, + () -> AbstractFSWALProvider.parseServerNameFromWALName("abc")); + } }