HBASE-20370 Also remove the wal file in remote cluster when we finish replicating a file

zhangduo 2018-04-17 09:04:56 +08:00
parent d57c80c415
commit d91784e666
8 changed files with 247 additions and 117 deletions

ReplicationUtils.java

@ -22,14 +22,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Helper class for replication.
@ -37,6 +40,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationUtils {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationUtils.class);
public static final String REPLICATION_ATTR_NAME = "__rep__";
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
@ -176,4 +181,33 @@ public final class ReplicationUtils {
return tableCFs != null && tableCFs.containsKey(tableName);
}
}
public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
throws IOException {
return new Path(remoteWALDir).getFileSystem(conf);
}
public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
/**
* Do the sleeping logic
* @param msg Why we sleep
* @param sleepForRetries the base sleep time.
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @param maxRetriesMultiplier the max retry multiplier
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
public static boolean sleepForRetries(String msg, long sleepForRetries, int sleepMultiplier,
int maxRetriesMultiplier) {
try {
LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
Thread.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
}
}
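The helpers above (getRemoteWALFileSystem, getRemoteWALDirForPeer and the shared sleepForRetries) are used later in this commit by both the WAL provider and the source manager. As a rough sketch of how a caller could combine them, not part of this patch, with the class name, remote WAL dir, peer id, wal name and retry settings all made up for illustration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationUtils;

public class RemoteWalCleanupSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Assumed values, for illustration only.
    String remoteWALDir = "hdfs://backup-cluster:8020/hbase/remoteWALs";
    String peerId = "1";
    String walName = "example-wal.1524000000000";

    FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
    Path walFile = new Path(ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId), walName);

    // Keep retrying with the shared backoff helper, the same pattern the source manager
    // uses below when deleting remote wals for a sync replication peer.
    for (int sleepMultiplier = 0;;) {
      try {
        if (remoteFs.delete(walFile, false) || !remoteFs.exists(walFile)) {
          break; // deleted, or it never existed
        }
      } catch (IOException e) {
        // ignore and retry after the sleep below
      }
      if (ReplicationUtils.sleepForRetries("Failed to delete remote wal", 1000, sleepMultiplier, 60)) {
        sleepMultiplier++;
      }
    }
  }
}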

ReplicationSource.java

@ -89,8 +89,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected Configuration conf;
protected ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
// The manager of all sources to which we ping back our progress
protected ReplicationSourceManager manager;
@ -168,8 +166,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.queueId = queueId;
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@ -177,8 +173,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId,
replicationPeer.getId(), this.currentBandwidth);
}
private void decorateConf() {
@ -215,6 +211,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = replicationPeer.getId();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
@ -274,8 +271,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
replicationEndpoint
.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
clusterId, replicationPeer, metrics, tableDescriptors, server));
.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs,
replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
replicationEndpoint.start();
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
@ -357,8 +354,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (peerBandwidth != currentBandwidth) {
currentBandwidth = peerBandwidth;
throttler.setBandwidth((double) currentBandwidth / 10.0);
LOG.info("ReplicationSource : " + peerId
+ " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
replicationPeer.getId(), currentBandwidth);
}
}
@ -387,15 +384,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return sleepMultiplier < maxRetriesMultiplier;
}
/**
* check whether the peer is enabled or not
* @return true if the peer is enabled, otherwise false
*/
@Override
public boolean isPeerEnabled() {
return replicationPeer.isPeerEnabled();
}
private void initialize() {
int sleepMultiplier = 1;
while (this.isSourceActive()) {
@ -548,11 +536,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return this.queueId;
}
@Override
public String getPeerId() {
return this.peerId;
}
@Override
public Path getCurrentPath() {
// only for testing
@ -637,6 +620,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
return server.getServerName();
}
@Override
public ReplicationPeer getPeer() {
return replicationPeer;
}
Server getServer() {
return server;
}
@ -644,4 +632,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
}

ReplicationSourceInterface.java

@ -104,10 +104,17 @@ public interface ReplicationSourceInterface {
/**
* Get the id that the source is replicating to.
*
* @return peer id
*/
String getPeerId();
default String getPeerId() {
return getPeer().getId();
}
/**
* Get the replication peer instance.
* @return the replication peer instance
*/
ReplicationPeer getPeer();
/**
* Get a string representation of the current statistics
@ -119,8 +126,16 @@ public interface ReplicationSourceInterface {
/**
* @return peer enabled or not
*/
boolean isPeerEnabled();
default boolean isPeerEnabled() {
return getPeer().isPeerEnabled();
}
/**
* @return whether this is sync replication peer.
*/
default boolean isSyncReplication() {
return getPeer().getPeerConfig().isSyncReplication();
}
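With getPeer() on the interface, the peer id, enabled flag and sync-replication flag all resolve through the peer object via the default methods above, so implementations no longer need to cache the peer id themselves. A small hedged sketch of a consumer, where the class and method names are invented for illustration:

import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;

public final class SourcePeerInfoSketch {
  private SourcePeerInfoSketch() {
  }

  // Summarize a source using only the new default methods.
  public static String describe(ReplicationSourceInterface source) {
    StringBuilder sb = new StringBuilder();
    sb.append("peer=").append(source.getPeerId()); // default: getPeer().getId()
    sb.append(", enabled=").append(source.isPeerEnabled()); // default: getPeer().isPeerEnabled()
    sb.append(", sync=").append(source.isSyncReplication());
    if (source.isSyncReplication()) {
      // For a sync replication peer the remote wal dir comes from the peer config.
      sb.append(", remoteWALDir=").append(source.getPeer().getPeerConfig().getRemoteWALDir());
    }
    return sb.toString();
  }
}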
/**
* @return active or not
*/

ReplicationSourceManager.java

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@ -86,20 +88,20 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* operations.</li>
* <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
* {@link #addPeer(String)}, {@link #removePeer(String)},
* {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
* {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
* {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
* {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
* called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
* {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
* is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
* {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
* remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
* case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
* case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link #preLogRoll(Path)}.</li>
* <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
* modify it, {@link #removePeer(String)} ,
* {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
* {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
* {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
* {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
* {@link ReplicationSourceInterface} firstly, then remove the wals from
* {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
@ -155,9 +157,15 @@ public class ReplicationSourceManager implements ReplicationListener {
private final boolean replicationForBulkLoadDataEnabled;
private AtomicLong totalBufferUsed = new AtomicLong();
// How long should we sleep for each retry when deleting remote wal files for sync replication
// peer.
private final long sleepForRetries;
// Maximum number of retries before taking bold actions when deleting remote wal files for sync
// replication peer.
private final int maxRetriesMultiplier;
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
@ -205,8 +213,11 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
this.latestPaths = new HashSet<Path>();
replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.replicationForBulkLoadDataEnabled = conf.getBoolean(
HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
}
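The two new settings read here default to a one second base sleep and a multiplier capped at 60, and these fields are what get passed to ReplicationUtils.sleepForRetries in the remote wal cleanup loop below. A hedged example of overriding them programmatically (the values are arbitrary; they can equally be set in hbase-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class SyncReplicationRetrySettingsSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Defaults in this patch: 1000 ms base sleep, multiplier capped at 60.
    conf.setLong("replication.source.sync.sleepforretries", 2000);
    conf.setInt("replication.source.sync.maxretriesmultiplier", 120);
    System.out.println(conf.getLong("replication.source.sync.sleepforretries", 1000));
  }
}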
/**
@ -513,17 +524,15 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* This method will log the current position to storage, and also clean old logs from the
* replication queue.
* @param queueId id of the replication queue
* @param queueRecovered indicates if this queue comes from another region server
* @param source the replication source
* @param entryBatch the wal entry batch we just shipped
*/
public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
WALEntryBatch entryBatch) {
String fileName = entryBatch.getLastWalPath().getName();
interruptOrAbortWhenFail(() -> this.queueStorage
.setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(),
entryBatch.getLastSeqIds()));
cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
}
/**
@ -531,36 +540,85 @@ public class ReplicationSourceManager implements ReplicationListener {
* file is closed and has no more entries.
* @param log Path to the log
* @param inclusive whether we should also remove the given log file
* @param queueId id of the replication queue
* @param queueRecovered Whether this is a recovered queue
* @param source the replication source
*/
@VisibleForTesting
void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
if (queueRecovered) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
if (source.isRecovered()) {
NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
cleanOldLogs(wals, log, inclusive, queueId);
cleanOldLogs(wals, log, inclusive, source);
}
} else {
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
NavigableSet<String> wals = walsById.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
cleanOldLogs(wals, log, inclusive, queueId);
cleanOldLogs(wals, log, inclusive, source);
}
}
}
}
private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wals)
throws IOException {
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
for (String wal : wals) {
Path walFile = new Path(remoteWALDirForPeer, wal);
try {
if (!fs.delete(walFile, false) && fs.exists(walFile)) {
throw new IOException("Can not delete " + walFile);
}
} catch (FileNotFoundException e) {
// Just ignore since this means the file has already been deleted.
// The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
// nonexistent file, so we deal with both cases here, i.e., check the return value of
// FileSystem.delete and also catch FNFE.
LOG.debug("The remote wal {} has already been deleted?", walFile, e);
}
}
}
private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive,
ReplicationSourceInterface source) {
NavigableSet<String> walSet = wals.headSet(key, inclusive);
if (walSet.isEmpty()) {
return;
}
LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
// The intention here is that we want to delete the remote wal files as soon as possible, as this
// may affect the failover time when transitioning the remote cluster from S to A. And the infinite
// retry is not a problem: if we can not contact the remote HDFS cluster, then usually we can not
// contact the HBase cluster either, so replication will be blocked anyway.
if (source.isSyncReplication()) {
String peerId = source.getPeerId();
String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
LOG.debug("Removing {} logs from remote dir {} in the list: {}", walSet.size(), remoteWALDir,
walSet);
for (int sleepMultiplier = 0;;) {
try {
removeRemoteWALs(peerId, remoteWALDir, walSet);
break;
} catch (IOException e) {
LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
peerId);
}
if (!source.isSourceActive()) {
// skip the following operations
return;
}
if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
sleepMultiplier, maxRetriesMultiplier)) {
sleepMultiplier++;
}
}
}
String queueId = source.getQueueId();
for (String wal : walSet) {
interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
interruptOrAbortWhenFail(
() -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
}
walSet.clear();
}

ReplicationSourceShipper.java

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
@ -92,7 +94,7 @@ public class ReplicationSourceShipper extends Thread {
if (!source.isPeerEnabled()) {
// The peer enabled check is in memory and cheap, so we do not need to increase the sleep
// interval; backing off here could cause a long lag when the peer is re-enabled.
sleepForRetries("Replication is disabled", 1);
sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
continue;
}
try {
@ -211,7 +213,8 @@ public class ReplicationSourceShipper extends Thread {
} catch (Exception ex) {
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
maxRetriesMultiplier)) {
sleepMultiplier++;
}
}
@ -250,8 +253,7 @@ public class ReplicationSourceShipper extends Thread {
// position and the file will be removed soon in cleanOldLogs.
if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
batch.getLastWalPosition() != currentPosition) {
source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
source.isRecovered(), batch);
source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
updated = true;
}
// if end of file is true, then we can just skip to the next file in queue.
@ -304,21 +306,4 @@ public class ReplicationSourceShipper extends Thread {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
/**
* Do the sleeping logic
* @param msg Why we sleep
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
public boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
}
}

SyncReplicationWALProvider.java

@ -33,11 +33,10 @@ import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
@ -118,10 +117,10 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
Path remoteWALDirPath = new Path(remoteWALDir);
FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
CommonFSUtils.getWALRootDir(conf),
ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
}
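Both the provider above and the manager's cleanup now derive the per-peer remote directory from the same helper, so the remote side of the dual WAL lives under <remoteWALDir>/<peerId>, where <remoteWALDir> comes from the peer config (the test below places it under the REMOTE_WAL_DIR_NAME constant, "remoteWALs"). A tiny sketch of that path arithmetic, with the directory and peer id invented for illustration:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationUtils;

public class RemoteWalLayoutSketch {
  public static void main(String[] args) {
    String remoteWALDir = "hdfs://backup-cluster:8020/hbase/remoteWALs"; // assumed location
    Path peerDir = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, "1");
    // Prints hdfs://backup-cluster:8020/hbase/remoteWALs/1; individual wal files are
    // created directly under this per-peer directory and deleted from it after shipping.
    System.out.println(peerDir);
  }
}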

ReplicationSourceDummy.java

@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -40,12 +39,13 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
public class ReplicationSourceDummy implements ReplicationSourceInterface {
ReplicationSourceManager manager;
String peerClusterId;
Path currentPath;
MetricsSource metrics;
WALFileLengthProvider walFileLengthProvider;
AtomicBoolean startup = new AtomicBoolean(false);
private ReplicationSourceManager manager;
private ReplicationPeer replicationPeer;
private String peerClusterId;
private Path currentPath;
private MetricsSource metrics;
private WALFileLengthProvider walFileLengthProvider;
private AtomicBoolean startup = new AtomicBoolean(false);
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
@ -56,6 +56,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
this.peerClusterId = peerClusterId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationPeer = rp;
}
@Override
@ -153,4 +154,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public ServerName getServerWALsBelongTo() {
return null;
}
@Override
public ReplicationPeer getPeer() {
return replicationPeer;
}
}

TestReplicationSourceManager.java

@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Field;
@ -49,19 +51,19 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@ -133,9 +136,9 @@ public abstract class TestReplicationSourceManager {
protected static ZKWatcher zkw;
protected static HTableDescriptor htd;
protected static TableDescriptor htd;
protected static HRegionInfo hri;
protected static RegionInfo hri;
protected static final byte[] r1 = Bytes.toBytes("r1");
@ -156,6 +159,8 @@ public abstract class TestReplicationSourceManager {
protected static Path logDir;
protected static Path remoteLogDir;
protected static CountDownLatch latch;
protected static List<String> files = new ArrayList<>();
@ -185,10 +190,9 @@ public abstract class TestReplicationSourceManager {
ZKClusterId.setClusterId(zkw, new ClusterId());
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
replication = new Replication();
replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
managerOfCluster = getManagerFromCluster();
@ -205,19 +209,16 @@ public abstract class TestReplicationSourceManager {
}
waitPeer(slaveId, manager, true);
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor(f1);
col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
htd.addFamily(col);
col = new HColumnDescriptor(f2);
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
htd = TableDescriptorBuilder.newBuilder(test)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
for(byte[] fam : htd.getColumnFamilyNames()) {
scopes.put(fam, 0);
}
hri = new HRegionInfo(htd.getTableName(), r1, r2);
hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
}
private static ReplicationSourceManager getManagerFromCluster() {
@ -248,6 +249,7 @@ public abstract class TestReplicationSourceManager {
private void cleanLogDir() throws IOException {
fs.delete(logDir, true);
fs.delete(oldLogDir, true);
fs.delete(remoteLogDir, true);
}
@Before
@ -286,10 +288,10 @@ public abstract class TestReplicationSourceManager {
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
final WAL wal = wals.getWAL(hri);
manager.init();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
htd.addFamily(new HColumnDescriptor(f1));
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for(byte[] fam : htd.getFamiliesKeys()) {
for(byte[] fam : htd.getColumnFamilyNames()) {
scopes.put(fam, 0);
}
// Testing normal log rolling every 20
@ -329,7 +331,11 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter();
manager.logPositionAndCleanOldLogs("1", false,
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getQueueId()).thenReturn("1");
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(false);
manager.logPositionAndCleanOldLogs(source,
new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
wal.append(hri,
@ -404,7 +410,11 @@ public abstract class TestReplicationSourceManager {
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
manager.cleanOldLogs(file2, false, id, true);
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getQueueId()).thenReturn(id);
when(source.isRecovered()).thenReturn(true);
when(source.isSyncReplication()).thenReturn(false);
manager.cleanOldLogs(file2, false, source);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
@ -488,14 +498,13 @@ public abstract class TestReplicationSourceManager {
* corresponding ReplicationSourceInterface correctly cleans up the corresponding
* replication queue and ReplicationPeer.
* See HBASE-16096.
* @throws Exception
*/
@Test
public void testPeerRemovalCleanup() throws Exception{
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
final String peerId = "FakePeer";
final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build();
try {
DummyServer server = new DummyServer();
ReplicationQueueStorage rq = ReplicationStorageFactory
@ -504,7 +513,7 @@ public abstract class TestReplicationSourceManager {
// initialization to throw an exception.
conf.set("replication.replicationsource.implementation",
FailInitializeDummyReplicationSource.class.getName());
final ReplicationPeers rp = manager.getReplicationPeers();
manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer
// Don't wait for replication source to initialize, we know it won't.
addPeerAndWait(peerId, peerConfig, false);
@ -549,8 +558,8 @@ public abstract class TestReplicationSourceManager {
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build();
try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
@ -582,6 +591,40 @@ public abstract class TestReplicationSourceManager {
}
}
@Test
public void testRemoveRemoteWALs() throws IOException {
// make sure that we can deal with files which do not exist
String walNameNotExists = "remoteWAL.0";
Path wal = new Path(logDir, walNameNotExists);
manager.preLogRoll(wal);
manager.postLogRoll(wal);
Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
fs.mkdirs(remoteLogDirForPeer);
String walName = "remoteWAL.1";
Path remoteWAL =
new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
fs.create(remoteWAL).close();
wal = new Path(logDir, walName);
manager.preLogRoll(wal);
manager.postLogRoll(wal);
ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
when(source.getPeerId()).thenReturn(slaveId);
when(source.getQueueId()).thenReturn(slaveId);
when(source.isRecovered()).thenReturn(false);
when(source.isSyncReplication()).thenReturn(true);
ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
when(config.getRemoteWALDir())
.thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
ReplicationPeer peer = mock(ReplicationPeer.class);
when(peer.getPeerConfig()).thenReturn(config);
when(source.getPeer()).thenReturn(peer);
manager.cleanOldLogs(walName, true, source);
assertFalse(fs.exists(remoteWAL));
}
/**
* Add a peer and wait for it to initialize
* @param peerId