diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index cb22f579788..66e9b0138a8 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -22,14 +22,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

/**
* Helper class for replication.
@@ -37,6 +40,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationUtils.class);
+
public static final String REPLICATION_ATTR_NAME = "__rep__";
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
@@ -176,4 +181,33 @@ public final class ReplicationUtils {
return tableCFs != null && tableCFs.containsKey(tableName);
}
}
+
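+  /**
+   * Get the filesystem of the remote WAL directory. The remote WAL directory of a sync
+   * replication peer may be on a different HDFS instance, so resolve the filesystem from the
+   * path instead of using the local cluster's.
+   */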
+ public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
+ throws IOException {
+ return new Path(remoteWALDir).getFileSystem(conf);
+ }
+
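+  /**
+   * Get the remote WAL directory for the given peer, i.e. the peer id appended to the base
+   * remote WAL directory.
+   */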
+ public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
+ return new Path(remoteWALDir, peerId);
+ }
+
+ /**
+ * Do the sleeping logic
+ * @param msg Why we sleep
+ * @param sleepForRetries the base sleep time.
+ * @param sleepMultiplier by how many times the default sleeping time is augmented
+ * @param maxRetriesMultiplier the max retry multiplier
+   * @return True if sleepMultiplier is < maxRetriesMultiplier
+ */
+ public static boolean sleepForRetries(String msg, long sleepForRetries, int sleepMultiplier,
+ int maxRetriesMultiplier) {
+ try {
+ LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
+ Thread.sleep(sleepForRetries * sleepMultiplier);
+ } catch (InterruptedException e) {
+ LOG.debug("Interrupted while sleeping between retries");
+ Thread.currentThread().interrupt();
+ }
+ return sleepMultiplier < maxRetriesMultiplier;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index b63712b879b..ddb6f934802 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -89,8 +89,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected Configuration conf;
protected ReplicationQueueInfo replicationQueueInfo;
- // id of the peer cluster this source replicates to
- private String peerId;
// The manager of all sources to which we ping back our progress
protected ReplicationSourceManager manager;
@@ -168,8 +166,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.queueId = queueId;
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
- // ReplicationQueueInfo parses the peerId out of the znode for us
- this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -177,8 +173,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
this.walFileLengthProvider = walFileLengthProvider;
- LOG.info("queueId=" + queueId + ", ReplicationSource : " + peerId
- + ", currentBandwidth=" + this.currentBandwidth);
+ LOG.info("queueId={}, ReplicationSource : {}, currentBandwidth={}", queueId,
+ replicationPeer.getId(), this.currentBandwidth);
}

private void decorateConf() {
@@ -215,6 +211,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
+ String peerId = replicationPeer.getId();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
@@ -274,8 +271,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
replicationEndpoint
- .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
- clusterId, replicationPeer, metrics, tableDescriptors, server));
+ .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs,
+ replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
replicationEndpoint.start();
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
@@ -357,8 +354,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (peerBandwidth != currentBandwidth) {
currentBandwidth = peerBandwidth;
throttler.setBandwidth((double) currentBandwidth / 10.0);
- LOG.info("ReplicationSource : " + peerId
- + " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
+ LOG.info("ReplicationSource : {} bandwidth throttling changed, currentBandWidth={}",
+ replicationPeer.getId(), currentBandwidth);
}
}
@@ -387,15 +384,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return sleepMultiplier < maxRetriesMultiplier;
}
- /**
- * check whether the peer is enabled or not
- * @return true if the peer is enabled, otherwise false
- */
- @Override
- public boolean isPeerEnabled() {
- return replicationPeer.isPeerEnabled();
- }
-
private void initialize() {
int sleepMultiplier = 1;
while (this.isSourceActive()) {
@@ -548,11 +536,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
return this.queueId;
}
- @Override
- public String getPeerId() {
- return this.peerId;
- }
-
@Override
public Path getCurrentPath() {
// only for testing
@@ -637,6 +620,11 @@ public class ReplicationSource implements ReplicationSourceInterface {
return server.getServerName();
}
+ @Override
+ public ReplicationPeer getPeer() {
+ return replicationPeer;
+ }
+
Server getServer() {
return server;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 090b4651f7c..3ce5bfe1644 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -104,10 +104,17 @@ public interface ReplicationSourceInterface {
/**
* Get the id that the source is replicating to.
- *
* @return peer id
*/
- String getPeerId();
+ default String getPeerId() {
+ return getPeer().getId();
+ }
+
+ /**
+ * Get the replication peer instance.
+ * @return the replication peer instance
+ */
+ ReplicationPeer getPeer();

/**
* Get a string representation of the current statistics
@@ -119,8 +126,16 @@ public interface ReplicationSourceInterface {
/**
* @return peer enabled or not
*/
- boolean isPeerEnabled();
+ default boolean isPeerEnabled() {
+ return getPeer().isPeerEnabled();
+ }
+
+  /**
+   * @return whether this is a sync replication peer.
+ */
+ default boolean isSyncReplication() {
+ return getPeer().getPeerConfig().isSyncReplication();
+ }

/**
* @return active or not
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 7830621e8ed..f3763ad9e3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -86,20 +88,20 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* operations.
* Need synchronized on {@link #walsById}. There are four methods which modify it,
* {@link #addPeer(String)}, {@link #removePeer(String)},
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and {@link #preLogRoll(Path)}.
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
* {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
* {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
- * {@link #removePeer(String)}. {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is
- * called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
+ * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
+ * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
* {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
* remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
- * case need synchronized is {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
+ * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link #preLogRoll(Path)}.
* No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
* modify it, {@link #removePeer(String)} ,
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} and
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
* {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
- * {@link #cleanOldLogs(NavigableSet, String, boolean, String)} is called by
+ * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
* {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
* {@link ReplicationSourceInterface} firstly, then remove the wals from
* {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
@@ -155,9 +157,15 @@ public class ReplicationSourceManager implements ReplicationListener {
private final boolean replicationForBulkLoadDataEnabled;
-
private AtomicLong totalBufferUsed = new AtomicLong();
+ // How long should we sleep for each retry when deleting remote wal files for sync replication
+ // peer.
+ private final long sleepForRetries;
+ // Maximum number of retries before taking bold actions when deleting remote wal files for sync
+ // replication peer.
+ private final int maxRetriesMultiplier;
+
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
@@ -205,8 +213,11 @@ public class ReplicationSourceManager implements ReplicationListener {
tfb.setDaemon(true);
this.executor.setThreadFactory(tfb.build());
this.latestPaths = new HashSet<Path>();
- replicationForBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
- HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ this.replicationForBulkLoadDataEnabled = conf.getBoolean(
+ HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+ this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000);
+ this.maxRetriesMultiplier =
+ this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
}

/**
@@ -513,17 +524,15 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* This method will log the current position to storage. And also clean old logs from the
* replication queue.
- * @param queueId id of the replication queue
- * @param queueRecovered indicates if this queue comes from another region server
+ * @param source the replication source
* @param entryBatch the wal entry batch we just shipped
*/
- public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
+ public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
WALEntryBatch entryBatch) {
String fileName = entryBatch.getLastWalPath().getName();
- interruptOrAbortWhenFail(() -> this.queueStorage
- .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(),
- entryBatch.getLastSeqIds()));
- cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
+ interruptOrAbortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(),
+ source.getQueueId(), fileName, entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds()));
+ cleanOldLogs(fileName, entryBatch.isEndOfFile(), source);
}

/**
@@ -531,36 +540,85 @@ public class ReplicationSourceManager implements ReplicationListener {
* file is closed and has no more entries.
* @param log Path to the log
* @param inclusive whether we should also remove the given log file
- * @param queueId id of the replication queue
- * @param queueRecovered Whether this is a recovered queue
+ * @param source the replication source
*/
@VisibleForTesting
- void cleanOldLogs(String log, boolean inclusive, String queueId, boolean queueRecovered) {
+ void cleanOldLogs(String log, boolean inclusive, ReplicationSourceInterface source) {
String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(log);
- if (queueRecovered) {
-      NavigableSet<String> wals = walsByIdRecoveredQueues.get(queueId).get(logPrefix);
+    if (source.isRecovered()) {
+      NavigableSet<String> wals = walsByIdRecoveredQueues.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
- cleanOldLogs(wals, log, inclusive, queueId);
+ cleanOldLogs(wals, log, inclusive, source);
}
} else {
// synchronized on walsById to avoid race with preLogRoll
synchronized (this.walsById) {
-      NavigableSet<String> wals = walsById.get(queueId).get(logPrefix);
+      NavigableSet<String> wals = walsById.get(source.getQueueId()).get(logPrefix);
if (wals != null) {
- cleanOldLogs(wals, log, inclusive, queueId);
+ cleanOldLogs(wals, log, inclusive, source);
}
}
}
}

-  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive, String id) {
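+  /**
+   * Delete the given wals from the remote WAL directory of the given sync replication peer. A
+   * wal that is already gone on the remote side is treated as successfully deleted.
+   */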
+  private void removeRemoteWALs(String peerId, String remoteWALDir, Set<String> wals)
+ throws IOException {
+ Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+ for (String wal : wals) {
+ Path walFile = new Path(remoteWALDirForPeer, wal);
+ try {
+ if (!fs.delete(walFile, false) && fs.exists(walFile)) {
+ throw new IOException("Can not delete " + walFile);
+ }
+ } catch (FileNotFoundException e) {
+ // Just ignore since this means the file has already been deleted.
+      // The javadoc of the FileSystem.delete methods does not specify the behavior of deleting a
+      // nonexistent file, so here we deal with both cases, i.e., check the return value of
+      // FileSystem.delete and also catch FileNotFoundException.
+ LOG.debug("The remote wal {} has already been deleted?", walFile, e);
+ }
+ }
+ }
+
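+  /**
+   * Remove the wals older than {@code key}, and {@code key} itself when {@code inclusive} is
+   * true, from the replication queue storage, deleting the remote wal copies first if the source
+   * is a sync replication peer.
+   */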
+  private void cleanOldLogs(NavigableSet<String> wals, String key, boolean inclusive,
+ ReplicationSourceInterface source) {
NavigableSet<String> walSet = wals.headSet(key, inclusive);
if (walSet.isEmpty()) {
return;
}
LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
+    // The intention here is that we want to delete the remote wal files as soon as possible,
+    // because they may affect the failover time if you want to transition the remote cluster from
+    // S to A. And the infinite retry is not a problem: if we cannot contact the remote HDFS
+    // cluster, then usually we cannot contact the HBase cluster either, so replication will be
+    // blocked anyway.
+ if (source.isSyncReplication()) {
+ String peerId = source.getPeerId();
+ String remoteWALDir = source.getPeer().getPeerConfig().getRemoteWALDir();
+ LOG.debug("Removing {} logs from remote dir {} in the list: {}", walSet.size(), remoteWALDir,
+ walSet);
+ for (int sleepMultiplier = 0;;) {
+ try {
+ removeRemoteWALs(peerId, remoteWALDir, walSet);
+ break;
+ } catch (IOException e) {
+        LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir,
+          peerId, e);
+ }
+ if (!source.isSourceActive()) {
+ // skip the following operations
+ return;
+ }
+ if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", sleepForRetries,
+ sleepMultiplier, maxRetriesMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+ String queueId = source.getQueueId();
for (String wal : walSet) {
- interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
+ interruptOrAbortWhenFail(
+ () -> this.queueStorage.removeWAL(server.getServerName(), queueId, wal));
}
walSet.clear();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 51df46a2884..24afaddba5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
+
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
@@ -92,7 +94,7 @@ public class ReplicationSourceShipper extends Thread {
if (!source.isPeerEnabled()) {
// The peer enabled check is in memory, not expensive, so do not need to increase the
// sleep interval as it may cause a long lag when we enable the peer.
- sleepForRetries("Replication is disabled", 1);
+ sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
continue;
}
try {
@@ -211,7 +213,8 @@ public class ReplicationSourceShipper extends Thread {
} catch (Exception ex) {
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
- if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
+ if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
+ maxRetriesMultiplier)) {
sleepMultiplier++;
}
}
@@ -250,8 +253,7 @@ public class ReplicationSourceShipper extends Thread {
// position and the file will be removed soon in cleanOldLogs.
if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
batch.getLastWalPosition() != currentPosition) {
- source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(),
- source.isRecovered(), batch);
+ source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
updated = true;
}
// if end of file is true, then we can just skip to the next file in queue.
@@ -304,21 +306,4 @@ public class ReplicationSourceShipper extends Thread {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
-
- /**
- * Do the sleeping logic
- * @param msg Why we sleep
- * @param sleepMultiplier by how many times the default sleeping time is augmented
-   * @return True if sleepMultiplier is < maxRetriesMultiplier
- */
- public boolean sleepForRetries(String msg, int sleepMultiplier) {
- try {
- LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
- Thread.sleep(this.sleepForRetries * sleepMultiplier);
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping between retries");
- Thread.currentThread().interrupt();
- }
- return sleepMultiplier < maxRetriesMultiplier;
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 9cbb0954b7e..3cd356d4214 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -33,11 +33,10 @@ import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
@@ -118,10 +117,10 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
- Path remoteWALDirPath = new Path(remoteWALDir);
- FileSystem remoteFs = remoteWALDirPath.getFileSystem(conf);
- return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), remoteFs,
- CommonFSUtils.getWALRootDir(conf), new Path(remoteWALDirPath, peerId),
+ return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
+ ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
+ CommonFSUtils.getWALRootDir(conf),
+ ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index ec6ec96e1a5..67f793d628c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,12 +39,13 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
*/
public class ReplicationSourceDummy implements ReplicationSourceInterface {
- ReplicationSourceManager manager;
- String peerClusterId;
- Path currentPath;
- MetricsSource metrics;
- WALFileLengthProvider walFileLengthProvider;
- AtomicBoolean startup = new AtomicBoolean(false);
+ private ReplicationSourceManager manager;
+ private ReplicationPeer replicationPeer;
+ private String peerClusterId;
+ private Path currentPath;
+ private MetricsSource metrics;
+ private WALFileLengthProvider walFileLengthProvider;
+ private AtomicBoolean startup = new AtomicBoolean(false);
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
@@ -56,6 +56,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
this.peerClusterId = peerClusterId;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
+ this.replicationPeer = rp;
}
@Override
@@ -153,4 +154,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public ServerName getServerWALsBelongTo() {
return null;
}
+
+ @Override
+ public ReplicationPeer getPeer() {
+ return replicationPeer;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 482f49ad1cc..5ea3173bc48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -49,19 +51,19 @@ import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
@@ -133,9 +136,9 @@ public abstract class TestReplicationSourceManager {
protected static ZKWatcher zkw;
- protected static HTableDescriptor htd;
+ protected static TableDescriptor htd;
- protected static HRegionInfo hri;
+ protected static RegionInfo hri;
protected static final byte[] r1 = Bytes.toBytes("r1");
@@ -156,6 +159,8 @@ public abstract class TestReplicationSourceManager {
protected static Path logDir;
+ protected static Path remoteLogDir;
+
protected static CountDownLatch latch;
protected static List<String> files = new ArrayList<>();
@@ -185,10 +190,9 @@ public abstract class TestReplicationSourceManager {
ZKClusterId.setClusterId(zkw, new ClusterId());
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
fs = FileSystem.get(conf);
- oldLogDir = new Path(utility.getDataTestDir(),
- HConstants.HREGION_OLDLOGDIR_NAME);
- logDir = new Path(utility.getDataTestDir(),
- HConstants.HREGION_LOGDIR_NAME);
+ oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME);
+ logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
+ remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
replication = new Replication();
replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
managerOfCluster = getManagerFromCluster();
@@ -205,19 +209,16 @@ public abstract class TestReplicationSourceManager {
}
waitPeer(slaveId, manager, true);
- htd = new HTableDescriptor(test);
- HColumnDescriptor col = new HColumnDescriptor(f1);
- col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
- htd.addFamily(col);
- col = new HColumnDescriptor(f2);
- col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
- htd.addFamily(col);
+ htd = TableDescriptorBuilder.newBuilder(test)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build();
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for(byte[] fam : htd.getFamiliesKeys()) {
+ for(byte[] fam : htd.getColumnFamilyNames()) {
scopes.put(fam, 0);
}
- hri = new HRegionInfo(htd.getTableName(), r1, r2);
+ hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build();
}
private static ReplicationSourceManager getManagerFromCluster() {
@@ -248,6 +249,7 @@ public abstract class TestReplicationSourceManager {
private void cleanLogDir() throws IOException {
fs.delete(logDir, true);
fs.delete(oldLogDir, true);
+ fs.delete(remoteLogDir, true);
}
@Before
@@ -286,10 +288,10 @@ public abstract class TestReplicationSourceManager {
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
final WAL wal = wals.getWAL(hri);
manager.init();
- HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
- htd.addFamily(new HColumnDescriptor(f1));
+ TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame"))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for(byte[] fam : htd.getFamiliesKeys()) {
+ for(byte[] fam : htd.getColumnFamilyNames()) {
scopes.put(fam, 0);
}
// Testing normal log rolling every 20
@@ -329,7 +331,11 @@ public abstract class TestReplicationSourceManager {
wal.rollWriter();
- manager.logPositionAndCleanOldLogs("1", false,
+ ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+ when(source.getQueueId()).thenReturn("1");
+ when(source.isRecovered()).thenReturn(false);
+ when(source.isSyncReplication()).thenReturn(false);
+ manager.logPositionAndCleanOldLogs(source,
new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath()));
wal.append(hri,
@@ -404,7 +410,11 @@ public abstract class TestReplicationSourceManager {
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
- manager.cleanOldLogs(file2, false, id, true);
+ ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+ when(source.getQueueId()).thenReturn(id);
+ when(source.isRecovered()).thenReturn(true);
+ when(source.isSyncReplication()).thenReturn(false);
+ manager.cleanOldLogs(file2, false, source);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
}
@@ -488,14 +498,13 @@ public abstract class TestReplicationSourceManager {
* corresponding ReplicationSourceInterface correctly cleans up the corresponding
* replication queue and ReplicationPeer.
* See HBASE-16096.
- * @throws Exception
*/
@Test
public void testPeerRemovalCleanup() throws Exception{
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
final String peerId = "FakePeer";
- final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
- .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
+ final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build();
try {
DummyServer server = new DummyServer();
ReplicationQueueStorage rq = ReplicationStorageFactory
@@ -504,7 +513,7 @@ public abstract class TestReplicationSourceManager {
// initialization to throw an exception.
conf.set("replication.replicationsource.implementation",
FailInitializeDummyReplicationSource.class.getName());
- final ReplicationPeers rp = manager.getReplicationPeers();
+ manager.getReplicationPeers();
// Set up the znode and ReplicationPeer for the fake peer
// Don't wait for replication source to initialize, we know it won't.
addPeerAndWait(peerId, peerConfig, false);
@@ -549,8 +558,8 @@ public abstract class TestReplicationSourceManager {
@Test
public void testRemovePeerMetricsCleanup() throws Exception {
final String peerId = "DummyPeer";
- final ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
- .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
+ final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build();
try {
MetricsReplicationSourceSource globalSource = getGlobalSource();
final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();
@@ -582,6 +591,40 @@ public abstract class TestReplicationSourceManager {
}
}
+ @Test
+ public void testRemoveRemoteWALs() throws IOException {
+ // make sure that we can deal with files which does not exist
+    // make sure that we can deal with a file which does not exist
+ Path wal = new Path(logDir, walNameNotExists);
+ manager.preLogRoll(wal);
+ manager.postLogRoll(wal);
+
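+    // now roll a wal which does have a copy in the remote WAL directory of the peer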
+ Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
+ fs.mkdirs(remoteLogDirForPeer);
+ String walName = "remoteWAL.1";
+ Path remoteWAL =
+ new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ fs.create(remoteWAL).close();
+ wal = new Path(logDir, walName);
+ manager.preLogRoll(wal);
+ manager.postLogRoll(wal);
+
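+    // mock a sync replication source whose peer config points at the remote WAL dir created above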
+ ReplicationSourceInterface source = mock(ReplicationSourceInterface.class);
+ when(source.getPeerId()).thenReturn(slaveId);
+ when(source.getQueueId()).thenReturn(slaveId);
+ when(source.isRecovered()).thenReturn(false);
+ when(source.isSyncReplication()).thenReturn(true);
+ ReplicationPeerConfig config = mock(ReplicationPeerConfig.class);
+ when(config.getRemoteWALDir())
+ .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+ ReplicationPeer peer = mock(ReplicationPeer.class);
+ when(peer.getPeerConfig()).thenReturn(config);
+ when(source.getPeer()).thenReturn(peer);
+ manager.cleanOldLogs(walName, true, source);
+
+ assertFalse(fs.exists(remoteWAL));
+ }
+
/**
* Add a peer and wait for it to initialize
* @param peerId