HBASE-20434 Also remove remote wals when peer is in DA state

This commit is contained in:
zhangduo 2018-04-25 17:12:23 +08:00
parent b281328228
commit 2d203c4479
8 changed files with 251 additions and 66 deletions

View File

@ -191,6 +191,10 @@ public final class ReplicationUtils {
return new Path(remoteWALDir, peerId);
}
public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
/**
* Do the sleeping logic
* @param msg Why we sleep

View File

@ -211,7 +211,7 @@ public class TransitPeerSyncReplicationStateProcedure
case CREATE_DIR_FOR_REMOTE_WAL:
MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
Path remoteWALDirForPeer = new Path(remoteWALDir, peerId);
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
try {
if (walFs.exists(remoteWALDirForPeer)) {

View File

@ -570,14 +570,17 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
/**
* <p>
* Split a path to get the start time
* </p>
* <p>
* For example: 10.20.20.171%3A60020.1277499063250
* </p>
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
int tsIndex = p.getName().lastIndexOf('.') + 1;
return Long.parseLong(p.getName().substring(tsIndex));
return AbstractFSWALProvider.getWALStartTimeFromWALName(p.getName());
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@ -561,20 +563,40 @@ public class ReplicationSourceManager implements ReplicationListener {
if (source.isRecovered()) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
cleanOldLogs(wals, log, inclusive, source);
NavigableSet<String> walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
cleanOldLogs(walsToRemove, source);
walsToRemove.clear();
}
} else {
NavigableSet<String> wals;
NavigableSet<String> walsToRemove;
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
NavigableSet<String> wals = walsById.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
cleanOldLogs(wals, log, inclusive, source);
wals = walsById.get(source.getQueueId()).get(logPrefix);
if (wals == null) {
return;
}
walsToRemove = wals.headSet(log, inclusive);
if (walsToRemove.isEmpty()) {
return;
}
walsToRemove = new TreeSet<>(walsToRemove);
}
// cleanOldLogs may spend some time, especially for sync replication where we may want to
// remove remote wals as the remote cluster may have already been down, so we do it outside
// the lock to avoid block preLogRoll
cleanOldLogs(walsToRemove, source);
// now let's remove the files in the set
synchronized (this.walsById) {
wals.removeAll(walsToRemove);
}
}
}
private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wals)
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
throws IOException {
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
@ -594,13 +616,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive,
ReplicationSourceInterface source) {
NavigableSet<String> walSet = wals.headSet(key, inclusive);
if (walSet.isEmpty()) {
return;
}
LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
private void cleanOldLogs(NavigableSet<String> wals, ReplicationSourceInterface source) {
LOG.debug("Removing {} logs in the list: {}", wals.size(), wals);
// The intention here is that, we want to delete the remote wal files ASAP as it may effect the
// failover time if you want to transit the remote cluster from S to A. And the infinite retry
// is not a problem, as if we can not contact with the remote HDFS cluster, then usually we can
@ -608,32 +625,39 @@ public class ReplicationSourceManager implements ReplicationListener {
if (source.isSyncReplication()) {
String peerId = source.getPeerId();
String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
LOG.debug("Removing {} logs from remote dir {} in the list: {}", walSet.size(), remoteWALDir,
walSet);
for (int sleepMultiplier = 0;;) {
try {
removeRemoteWALs(peerId, remoteWALDir, walSet);
break;
} catch (IOException e) {
LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
peerId);
}
if (!source.isSourceActive()) {
// skip the following operations
return;
}
if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
sleepMultiplier, maxRetriesMultiplier)) {
sleepMultiplier++;
// Filter out the wals need to be removed from the remote directory. Its name should be the
// special format, and also, the peer id in its name should match the peer id for the
// replication source.
List<String> remoteWals = wals.stream().filter(w -> SyncReplicationWALProvider
.getSyncReplicationPeerIdFromWALName(w).map(peerId::equals).orElse(false))
.collect(Collectors.toList());
LOG.debug("Removing {} logs from remote dir {} in the list: {}", remoteWals.size(),
remoteWALDir, remoteWals);
if (!remoteWals.isEmpty()) {
for (int sleepMultiplier = 0;;) {
try {
removeRemoteWALs(peerId, remoteWALDir, remoteWals);
break;
} catch (IOException e) {
LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
peerId);
}
if (!source.isSourceActive()) {
// skip the following operations
return;
}
if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
sleepMultiplier, maxRetriesMultiplier)) {
sleepMultiplier++;
}
}
}
}
String queueId = source.getQueueId();
for (String wal : walSet) {
for (String wal : wals) {
interruptOrAbortWhenFail(
() -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
}
walSet.clear();
}
// public because of we call it in TestReplicationEmptyWALRecovery

View File

@ -517,6 +517,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
listeners.add(listener);
}
private static String getWALNameGroupFromWALName(String name, int group) {
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
if (matcher.matches()) {
return matcher.group(group);
} else {
throw new IllegalArgumentException(name + " is not a valid wal file name");
}
}
/**
* Get prefix of the log from its name, assuming WAL name in format of
* log_prefix.filenumber.log_suffix
@ -526,11 +534,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
* @see AbstractFSWAL#getCurrentFileName()
*/
public static String getWALPrefixFromWALName(String name) {
Matcher matcher = WAL_FILE_NAME_PATTERN.matcher(name);
if (matcher.matches()) {
return matcher.group(1);
} else {
throw new IllegalArgumentException(name + " is not a valid wal file name");
}
return getWALNameGroupFromWALName(name, 1);
}
public static long getWALStartTimeFromWALName(String name) {
return Long.parseLong(getWALNameGroupFromWALName(name, 2));
}
}

View File

@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
@ -48,6 +50,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@ -64,7 +67,8 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
private static final String LOG_SUFFIX = ".syncrep";
@VisibleForTesting
public static final String LOG_SUFFIX = ".syncrep";
private final WALProvider provider;
@ -288,4 +292,28 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
return false;
}
}
private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");
/**
* <p>
* Returns the peer id if the wal file name is in the special group for a sync replication peer.
* </p>
* <p>
* The prefix format is &lt;factoryId&gt;-&lt;ts&gt;-&lt;peerId&gt;.
* </p>
*/
public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
if (!name.endsWith(LOG_SUFFIX)) {
// fast path to return earlier if the name is not for a sync replication peer.
return Optional.empty();
}
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
Matcher matcher = LOG_PREFIX_PATTERN.matcher(logPrefix);
if (matcher.matches()) {
return Optional.of(matcher.group(1));
} else {
return Optional.empty();
}
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationRemoveRemoteWAL.class);
private void waitUntilDeleted(Path remoteWAL) throws Exception {
MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
UTIL1.waitFor(30000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return !mfs.getWALFileSystem().exists(remoteWAL);
}
@Override
public String explainFailure() throws Exception {
return remoteWAL + " has not been deleted yet";
}
});
}
@Test
public void testRemoveRemoteWAL() throws Exception {
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer(
new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length);
Path remoteWAL = remoteWALStatus[0].getPath();
assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
rs.getWalRoller().requestRollAll();
// The replicated wal file should be deleted finally
waitUntilDeleted(remoteWAL);
remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length);
remoteWAL = remoteWALStatus[0].getPath();
assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX));
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 100, 200);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
// should still be there since the peer is disabled and we haven't replicated the data yet
assertTrue(mfs.getWALFileSystem().exists(remoteWAL));
UTIL1.getAdmin().enableReplicationPeer(PEER_ID);
waitUntilReplicationDone(UTIL2, 200);
verifyThroughRegion(UTIL2, 100, 200);
// Confirm that we will also remove the remote wal files in DA state
waitUntilDeleted(remoteWAL);
}
}

View File

@ -84,6 +84,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
@ -592,27 +593,10 @@ public abstract class TestReplicationSourceManager {
}
}
@Test
public void testRemoveRemoteWALs() throws IOException {
// make sure that we can deal with files which does not exist
String walNameNotExists = "remoteWAL.0";
Path wal = new Path(logDir, walNameNotExists);
manager.preLogRoll(wal);
manager.postLogRoll(wal);
Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
fs.mkdirs(remoteLogDirForPeer);
String walName = "remoteWAL.1";
Path remoteWAL =
new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
fs.create(remoteWAL).close();
wal = new Path(logDir, walName);
manager.preLogRoll(wal);
manager.postLogRoll(wal);
private ReplicationSourceInterface mockReplicationSource(String peerId) {
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getPeerId()).thenReturn(slaveId);
when(source.getQueueId()).thenReturn(slaveId);
when(source.getPeerId()).thenReturn(peerId);
when(source.getQueueId()).thenReturn(peerId);
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(true);
ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
@ -621,17 +605,51 @@ public abstract class TestReplicationSourceManager {
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getPeerConfig()).thenReturn(config);
when(source.getPeer()).thenReturn(peer);
manager.cleanOldLogs(walName, true, source);
return source;
}
assertFalse(fs.exists(remoteWAL));
@Test
public void testRemoveRemoteWALs() throws Exception {
String peerId2 = slaveId + "_2";
addPeerAndWait(peerId2,
ReplicationPeerConfig.newBuilder()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(),
true);
try {
// make sure that we can deal with files which does not exist
String walNameNotExists =
"remoteWAL-12345-" + slaveId + ".12345" + SyncReplicationWALProvider.LOG_SUFFIX;
Path wal = new Path(logDir, walNameNotExists);
manager.preLogRoll(wal);
manager.postLogRoll(wal);
Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
fs.mkdirs(remoteLogDirForPeer);
String walName =
"remoteWAL-12345-" + slaveId + ".23456" + SyncReplicationWALProvider.LOG_SUFFIX;
Path remoteWAL =
new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
fs.create(remoteWAL).close();
wal = new Path(logDir, walName);
manager.preLogRoll(wal);
manager.postLogRoll(wal);
ReplicationSourceInterface source = mockReplicationSource(peerId2);
manager.cleanOldLogs(walName, true, source);
// still there if peer id does not match
assertTrue(fs.exists(remoteWAL));
source = mockReplicationSource(slaveId);
manager.cleanOldLogs(walName, true, source);
assertFalse(fs.exists(remoteWAL));
} finally {
removePeerAndWait(peerId2);
}
}
/**
* Add a peer and wait for it to initialize
* @param peerId
* @param peerConfig
* @param waitForSource Whether to wait for replication source to initialize
* @throws Exception
*/
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {