diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 81d91ef5c27..23ec8f8d6cb 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -488,7 +488,7 @@ enum RecoverStandbyState {
RENAME_SYNC_REPLICATION_WALS_DIR = 1;
INIT_WORKERS = 2;
DISPATCH_TASKS = 3;
- REMOVE_SYNC_REPLICATION_WALS_DIR = 4;
+ SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
}
message RecoverStandbyStateData {
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 069db7a4f04..dc4217c3d34 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -46,6 +46,16 @@ public final class ReplicationUtils {
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
+ public static final String SYNC_WAL_SUFFIX = ".syncrep";
+
+ public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
+
+ public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
+
+ // This is used when copying a sync replication wal from local to remote and overwriting the
+ // old one, since some FileSystem implementations may not support atomic rename.
+ public static final String RENAME_WAL_SUFFIX = ".ren";
+
private ReplicationUtils() {
}
@@ -187,14 +197,26 @@ public final class ReplicationUtils {
return new Path(remoteWALDir).getFileSystem(conf);
}
- public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) {
+ public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
- public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) {
+ public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId);
}
+ public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
+ }
+
+ public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+ }
+
+ public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
+ return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
+ }
+
/**
* Do the sleeping logic
* @param msg Why we sleep
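Note for readers unfamiliar with Hadoop's `Path.suffix`: it appends to the final path component, so the replay and snapshot directories produced by the new helpers are siblings of the peer's remote WAL directory. A minimal, self-contained sketch (root and peer id are hypothetical):

```java
import org.apache.hadoop.fs.Path;

public class PeerWALDirLayout {
  public static void main(String[] args) {
    Path remoteWALDir = new Path("/hbase/remoteWALs"); // hypothetical root
    String peerId = "p1";                              // hypothetical peer id
    Path remote = new Path(remoteWALDir, peerId);
    System.out.println(remote);                     // /hbase/remoteWALs/p1
    System.out.println(remote.suffix("-replay"));   // /hbase/remoteWALs/p1-replay
    System.out.println(remote.suffix("-snapshot")); // /hbase/remoteWALs/p1-snapshot
  }
}
```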
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 1645d68be67..7ffd3daeb57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -22,9 +22,9 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
index e9e3a976978..9860774970d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RecoverStandbyProcedure.java
@@ -50,7 +50,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> {
        .map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
replaySyncReplicationWALManager.removeWALRootPath(wal)))
.toArray(ReplaySyncReplicationWALProcedure[]::new));
- setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR);
+ setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
return Flow.HAS_MORE_STATE;
- case REMOVE_SYNC_REPLICATION_WALS_DIR:
+ case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
try {
- replaySyncReplicationWALManager.removePeerReplayWALDir(peerId);
+ replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
} catch (IOException e) {
LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
@@ -85,7 +85,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandbyState> {
 private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
throws ProcedureYieldException {
try {
- return replaySyncReplicationWALManager.getReplayWALs(peerId);
+ return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
} catch (IOException e) {
LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException();
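Both catch blocks in this procedure follow the same procedure-v2 retry idiom: log the failure and throw `ProcedureYieldException` so the framework re-executes the current state later instead of aborting. As a fragment (with a hypothetical `doWork` standing in for the real manager call, and `LOG` the class logger):

```java
// Retry-by-yield idiom, as used above. An IOException does not fail the
// procedure; it reschedules the same state for another attempt.
try {
  return doWork(peerId); // hypothetical stand-in for the manager call
} catch (IOException e) {
  LOG.warn("Failed to get replay wals for peer id={}, retry", peerId, e);
  throw new ProcedureYieldException();
}
```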
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
index 7335fe01341..254448a71ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java
@@ -67,10 +67,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
}
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
- ReplaySyncReplicationWALManager remoteWALManager =
- env.getMasterServices().getReplaySyncReplicationWALManager();
- remoteWALManager.removePeerRemoteWALs(peerId);
- remoteWALManager.removePeerReplayWALDir(peerId);
+ env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
index eac5aa4d579..348c1349d16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplaySyncReplicationWALManager.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.master.replication;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -25,31 +29,27 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
@InterfaceAudience.Private
public class ReplaySyncReplicationWALManager {
private static final Logger LOG =
LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
- private static final String REPLAY_SUFFIX = "-replay";
-
private final MasterServices services;
- private final Configuration conf;
-
private final FileSystem fs;
private final Path walRootDir;
@@ -60,69 +60,86 @@ public class ReplaySyncReplicationWALManager {
public ReplaySyncReplicationWALManager(MasterServices services) {
this.services = services;
- this.conf = services.getConfiguration();
this.fs = services.getMasterFileSystem().getWALFileSystem();
this.walRootDir = services.getMasterFileSystem().getWALRootDir();
this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
}
- public Path getPeerRemoteWALDir(String peerId) {
- return new Path(this.remoteWALDir, peerId);
- }
-
- private Path getPeerReplayWALDir(String peerId) {
- return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX);
- }
-
public void createPeerRemoteWALDir(String peerId) throws IOException {
- Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
+ Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
throw new IOException("Unable to mkdir " + peerRemoteWALDir);
}
}
- public void renamePeerRemoteWALDir(String peerId) throws IOException {
- Path peerRemoteWALDir = getPeerRemoteWALDir(peerId);
- Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX);
- if (fs.exists(peerRemoteWALDir)) {
- if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) {
- throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to "
- + peerReplayWALDir + " for peer id=" + peerId);
+ private void rename(Path src, Path dst, String peerId) throws IOException {
+ if (fs.exists(src)) {
+ deleteDir(dst, peerId);
+ if (!fs.rename(src, dst)) {
+ throw new IOException(
+ "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
}
- LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir,
- peerId);
- } else if (!fs.exists(peerReplayWALDir)) {
- throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir "
- + peerReplayWALDir + " not exist for peer id=" + peerId);
+ LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
+ } else if (!fs.exists(dst)) {
+ throw new IOException(
+ "Want to rename from " + src + " to " + dst + ", but they both do not exist");
}
}
- public List<Path> getReplayWALs(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(peerId);
- List<Path> replayWals = new ArrayList<>();
- RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false);
- while (iterator.hasNext()) {
- replayWals.add(iterator.next().getPath());
- }
- return replayWals;
+ public void renameToPeerReplayWALDir(String peerId) throws IOException {
+ rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
+ peerId);
}
- public void removePeerReplayWALDir(String peerId) throws IOException {
- Path peerReplayWALDir = getPeerReplayWALDir(peerId);
+ public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
+ rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
+ peerId);
+ }
+
+ public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
+ for (FileStatus status : fs.listStatus(peerReplayWALDir,
+ p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
+ Path src = status.getPath();
+ String srcName = src.getName();
+ String dstName =
+ srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
+ FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
+ }
+ List<Path> wals = new ArrayList<>();
+ for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
+ Path path = status.getPath();
+ if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
+ wals.add(path);
+ } else {
+ if (!fs.delete(path, true)) {
+ LOG.warn("Can not delete unused file: " + path);
+ }
+ }
+ }
+ return wals;
+ }
+
+ public void snapshotPeerReplayWALDir(String peerId) throws IOException {
+ Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
throw new IOException(
"Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
}
}
- public void removePeerRemoteWALs(String peerId) throws IOException {
- Path remoteWALDir = getPeerRemoteWALDir(peerId);
- if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) {
- throw new IOException(
- "Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId);
+ private void deleteDir(Path dir, String peerId) throws IOException {
+ if (!fs.delete(dir, true) && fs.exists(dir)) {
+ throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
}
}
+ public void removePeerRemoteWALs(String peerId) throws IOException {
+ deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
+ deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
+ }
+
public void initPeerWorkers(String peerId) {
BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
services.getServerManager().getOnlineServers().keySet()
@@ -144,4 +161,9 @@ public class ReplaySyncReplicationWALManager {
// remove the "/" too.
return pathStr.substring(walRootDir.toString().length() + 1);
}
+
+ @VisibleForTesting
+ public Path getRemoteWALDir() {
+ return remoteWALDir;
+ }
}
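The private `rename` helper above deliberately deletes the destination before renaming: the destination may be a leftover from a previous crashed attempt, and `FileSystem.rename` fails if it already exists. A runnable sketch of the same sequence against the local filesystem (paths hypothetical):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameOverwriteDemo {
  public static void main(String[] args) throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path replay = new Path("/tmp/remoteWALs/p1-replay");     // hypothetical
    Path snapshot = new Path("/tmp/remoteWALs/p1-snapshot"); // hypothetical
    fs.mkdirs(replay);
    // Same checks as deleteDir/rename above: tolerate a missing destination,
    // fail loudly if it exists but cannot be removed.
    if (!fs.delete(snapshot, true) && fs.exists(snapshot)) {
      throw new IOException("Failed to remove " + snapshot);
    }
    if (!fs.rename(replay, snapshot)) {
      throw new IOException("Failed to rename " + replay + " to " + snapshot);
    }
    System.out.println("renamed to " + snapshot);
  }
}
```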
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
index ebe7a933d78..81ee6b623e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java
@@ -118,7 +118,7 @@ public class TransitPeerSyncReplicationStateProcedure
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
if (toState == SyncReplicationState.ACTIVE) {
Path remoteWALDirForPeer =
- ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId);
+ ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
// check whether the remote wal directory is present
if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
.exists(remoteWALDirForPeer)) {
@@ -152,7 +152,7 @@ public class TransitPeerSyncReplicationStateProcedure
throws ProcedureYieldException, IOException {
MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
- Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem();
if (walFs.exists(remoteWALDirForPeer)) {
LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1a71cb71c38..9509ea79323 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1963,8 +1963,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
if (this.csm != null) {
// SplitLogWorker needs csm. If none, don't start this.
- this.splitLogWorker = new SplitLogWorker(this, sinkConf, this,
- this, walFactory);
+ this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
splitLogWorker.start();
} else {
LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
index 4529943b013..09ec47776fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -37,4 +38,9 @@ public interface ReplicationSourceService extends ReplicationService {
* Returns a Handler to handle peer procedures.
*/
PeerProcedureHandler getPeerProcedureHandler();
+
+ /**
+ * Return the replication peers.
+ */
+ ReplicationPeers getReplicationPeers();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index a1c20306b38..4a9712cfba4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -23,22 +23,31 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -67,67 +76,133 @@ public class SplitLogWorker implements Runnable {
Thread worker;
// thread pool which executes recovery work
private SplitLogWorkerCoordination coordination;
- private Configuration conf;
private RegionServerServices server;
public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
TaskExecutor splitTaskExecutor) {
this.server = server;
- this.conf = conf;
this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
coordination.init(server, conf, splitTaskExecutor, this);
}
- public SplitLogWorker(final Server hserver, final Configuration conf,
- final RegionServerServices server, final LastSequenceId sequenceIdChecker,
- final WALFactory factory) {
- this(hserver, conf, server, new TaskExecutor() {
- @Override
- public Status exec(String filename, CancelableProgressable p) {
- Path walDir;
- FileSystem fs;
- try {
- walDir = FSUtils.getWALRootDir(conf);
- fs = walDir.getFileSystem(conf);
- } catch (IOException e) {
- LOG.warn("could not find root dir or fs", e);
- return Status.RESIGNED;
- }
- // TODO have to correctly figure out when log splitting has been
- // interrupted or has encountered a transient error and when it has
- // encountered a bad non-retry-able persistent error.
- try {
- if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)),
- fs, conf, p, sequenceIdChecker,
- server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
- return Status.PREEMPTED;
- }
- } catch (InterruptedIOException iioe) {
- LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
- return Status.RESIGNED;
- } catch (IOException e) {
- if (e instanceof FileNotFoundException) {
- // A wal file may not exist anymore. Nothing can be recovered so move on
- LOG.warn("WAL {} does not exist anymore", filename, e);
- return Status.DONE;
- }
- Throwable cause = e.getCause();
- if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
- || cause instanceof ConnectException
- || cause instanceof SocketTimeoutException)) {
- LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
- + "resigning", e);
- return Status.RESIGNED;
- } else if (cause instanceof InterruptedException) {
- LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
- return Status.RESIGNED;
- }
- LOG.warn("log splitting of " + filename + " failed, returning error", e);
- return Status.ERR;
- }
+ public SplitLogWorker(Configuration conf, RegionServerServices server,
+ LastSequenceId sequenceIdChecker, WALFactory factory) {
+ this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory));
+ }
+
+ // returns whether we need to continue the split work
+ private static boolean processSyncReplicationWAL(String name, Configuration conf,
+ RegionServerServices server, FileSystem fs, Path walDir) throws IOException {
+ Path walFile = new Path(walDir, name);
+ String filename = walFile.getName();
+ Optional<String> optSyncPeerId =
+ SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename);
+ if (!optSyncPeerId.isPresent()) {
+ return true;
+ }
+ String peerId = optSyncPeerId.get();
+ ReplicationPeerImpl peer =
+ server.getReplicationSourceService().getReplicationPeers().getPeer(peerId);
+ if (peer == null || !peer.getPeerConfig().isSyncReplication()) {
+ return true;
+ }
+ Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
+ peer.getSyncReplicationStateAndNewState();
+ if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+ stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) {
+ // copy the file to remote and overwrite the previous one
+ String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
+ Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp");
+ FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+ try (FSDataInputStream in = fs.open(walFile); @SuppressWarnings("deprecation")
+ FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true,
+ FSUtils.getDefaultBufferSize(remoteFs), remoteFs.getDefaultReplication(tmpRemoteWAL),
+ remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) {
+ IOUtils.copy(in, out);
+ }
+ Path toCommitRemoteWAL =
+ new Path(remoteWALDirForPeer, filename + ReplicationUtils.RENAME_WAL_SUFFIX);
+ // Some FileSystem implementations may not support atomic rename so we need to do it in two
+ // phases
+ FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL);
+ FSUtils.renameFile(remoteFs, toCommitRemoteWAL, new Path(remoteWALDirForPeer, filename));
+ } else if ((stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
+ stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) ||
+ stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) {
+ // Check whether we still need to process this file.
+ // We only write wal files whose names end with .syncrep while in state A, and after
+ // transiting to a state other than A we will reopen all the regions, so the data in the wal
+ // will be flushed and the wal file will be archived soon. But a server may still crash while
+ // we are transiting from A to S, so to simplify the logic of the transit procedure, here we
+ // also check the remote snapshot directory in state S. That way we do not need to wait until
+ // all the wal files with the .syncrep suffix are archived before finishing the procedure.
+ String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
+ Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId);
+ FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
+ if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) {
+ // the file has already been replayed when the remote cluster transited from S to DA, and
+ // its content will be replicated back to us, so give up splitting it.
+ LOG.warn("Give up splitting {} since it has been replayed in the remote cluster and " +
+ "the content will be replicated back", filename);
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static Status splitLog(String name, CancelableProgressable p, Configuration conf,
+ RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
+ Path walDir;
+ FileSystem fs;
+ try {
+ walDir = FSUtils.getWALRootDir(conf);
+ fs = walDir.getFileSystem(conf);
+ } catch (IOException e) {
+ LOG.warn("could not find root dir or fs", e);
+ return Status.RESIGNED;
+ }
+ try {
+ if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) {
return Status.DONE;
}
- });
+ } catch (IOException e) {
+ LOG.warn("failed to process sync replication wal {}", name, e);
+ return Status.RESIGNED;
+ }
+ // TODO have to correctly figure out when log splitting has been
+ // interrupted or has encountered a transient error and when it has
+ // encountered a bad non-retry-able persistent error.
+ try {
+ if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf,
+ p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
+ factory)) {
+ return Status.PREEMPTED;
+ }
+ } catch (InterruptedIOException iioe) {
+ LOG.warn("log splitting of " + name + " interrupted, resigning", iioe);
+ return Status.RESIGNED;
+ } catch (IOException e) {
+ if (e instanceof FileNotFoundException) {
+ // A wal file may not exist anymore. Nothing can be recovered so move on
+ LOG.warn("WAL {} does not exist anymore", name, e);
+ return Status.DONE;
+ }
+ Throwable cause = e.getCause();
+ if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException ||
+ cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
+ LOG.warn("log replaying of " + name + " can't connect to the target regionserver, " +
+ "resigning", e);
+ return Status.RESIGNED;
+ } else if (cause instanceof InterruptedException) {
+ LOG.warn("log splitting of " + name + " interrupted, resigning", e);
+ return Status.RESIGNED;
+ }
+ LOG.warn("log splitting of " + name + " failed, returning error", e);
+ return Status.ERR;
+ }
+ return Status.DONE;
}
@Override
@@ -191,6 +266,7 @@ public class SplitLogWorker implements Runnable {
* {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
* SplitLogManager.TaskFinisher
*/
+ @FunctionalInterface
public interface TaskExecutor {
enum Status {
DONE(),
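Marking `TaskExecutor` as a `@FunctionalInterface` is what lets the new constructor pass the splitting logic as the lambda `(f, p) -> splitLog(...)`. A hypothetical stub executor for tests, as a fragment:

```java
// Hypothetical no-op executor: accepts every task without splitting anything.
// The real worker is wired the same way, just with splitLog as the body.
SplitLogWorker.TaskExecutor noop =
  (filename, progressable) -> SplitLogWorker.TaskExecutor.Status.DONE;
```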
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
index 8ecfede9d83..4301ae79359 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CombinedAsyncWriter.java
@@ -32,13 +32,13 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
* An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances.
*/
@InterfaceAudience.Private
-public abstract class CombinedAsyncWriter implements AsyncWriter {
+public final class CombinedAsyncWriter implements AsyncWriter {
private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class);
- protected final ImmutableList<AsyncWriter> writers;
+ private final ImmutableList<AsyncWriter> writers;
- protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
+ private CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
this.writers = writers;
}
@@ -66,69 +66,29 @@ public abstract class CombinedAsyncWriter implements AsyncWriter {
}
}
- protected abstract void doSync(CompletableFuture<Long> future);
-
- @Override
- public CompletableFuture<Long> sync() {
- CompletableFuture<Long> future = new CompletableFuture<>();
- doSync(future);
- return future;
- }
-
@Override
public void append(Entry entry) {
writers.forEach(w -> w.append(entry));
}
- public enum Mode {
- SEQUENTIAL, PARALLEL
+ @Override
+ public CompletableFuture<Long> sync() {
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ AtomicInteger remaining = new AtomicInteger(writers.size());
+ writers.forEach(w -> w.sync().whenComplete((length, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ return;
+ }
+ if (remaining.decrementAndGet() == 0) {
+ future.complete(length);
+ }
+ }));
+ return future;
}
- public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) {
- ImmutableList<AsyncWriter> ws =
- ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build();
- switch (mode) {
- case SEQUENTIAL:
- return new CombinedAsyncWriter(ws) {
-
- private void doSync(CompletableFuture<Long> future, Long length, int index) {
- if (index == writers.size()) {
- future.complete(length);
- return;
- }
- writers.get(index).sync().whenComplete((len, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- doSync(future, len, index + 1);
- });
- }
-
- @Override
- protected void doSync(CompletableFuture<Long> future) {
- doSync(future, null, 0);
- }
- };
- case PARALLEL:
- return new CombinedAsyncWriter(ws) {
-
- @Override
- protected void doSync(CompletableFuture<Long> future) {
- AtomicInteger remaining = new AtomicInteger(writers.size());
- writers.forEach(w -> w.sync().whenComplete((length, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- return;
- }
- if (remaining.decrementAndGet() == 0) {
- future.complete(length);
- }
- }));
- }
- };
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
+ public static CombinedAsyncWriter create(AsyncWriter writer, AsyncWriter... writers) {
+ return new CombinedAsyncWriter(
+ ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build());
}
}
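The collapsed `sync()` keeps only the old PARALLEL behavior: all child syncs run at once, the combined future completes when the last child completes, and it fails as soon as any child fails. The fan-in pattern in isolation, runnable with plain `CompletableFuture` (extra `completeExceptionally` calls after the first are harmless no-ops):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FanInSyncDemo {
  // Complete when all children complete; fail fast on the first error.
  static CompletableFuture<Long> syncAll(List<CompletableFuture<Long>> children) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    AtomicInteger remaining = new AtomicInteger(children.size());
    children.forEach(f -> f.whenComplete((length, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
        return;
      }
      if (remaining.decrementAndGet() == 0) {
        future.complete(length);
      }
    }));
    return future;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(syncAll(Arrays.asList(
      CompletableFuture.completedFuture(128L),
      CompletableFuture.completedFuture(128L))).get()); // prints 128
  }
}
```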
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index a98567a05d8..3967e78b1cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -50,6 +51,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
this.remoteWalDir = remoteWalDir;
}
+ // will be overridden in test cases
+ @VisibleForTesting
+ protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
+ AsyncWriter remoteWriter) {
+ return CombinedAsyncWriter.create(remoteWriter, localWriter);
+ }
+
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path);
@@ -66,8 +74,7 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
closeWriter(localWriter);
}
}
- return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter,
- localWriter);
+ return createCombinedAsyncWriter(localWriter, remoteWriter);
}
// Allow temporarily skipping the creation of remote writer. When failing to write to the remote
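The new protected factory method gives tests a seam for fault injection. A hedged fragment of a hypothetical test subclass (`FailingAsyncWriter` is an invented wrapper; the subclass constructor, not shown, would simply forward DualAsyncFSWAL's argument list):

```java
// Hypothetical override in a test WAL class to break the remote half:
@Override
protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
    AsyncWriter remoteWriter) {
  // FailingAsyncWriter is an invented test wrapper that fails every sync.
  return CombinedAsyncWriter.create(new FailingAsyncWriter(remoteWriter), localWriter);
}
```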
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 2199415b4b8..b04f0cbc656 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -288,4 +288,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
return syncReplicationPeerInfoProvider;
}
+
+ @Override
+ public ReplicationPeers getReplicationPeers() {
+ return replicationPeers;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ebdfd1a15ab..428ec98df93 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -671,7 +671,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection wals)
throws IOException {
- Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId);
+ Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
for (String wal : wals) {
Path walFile = new Path(remoteWALDirForPeer, wal);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
index 75274ea09f5..170441b45c1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SyncReplicationPeerInfoProviderImpl.java
@@ -77,8 +77,7 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
return false;
}
Pair<SyncReplicationState, SyncReplicationState> states =
- peer.getSyncReplicationStateAndNewState();
+ peer.getSyncReplicationStateAndNewState();
return checker.test(states.getFirst(), states.getSecond());
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 8a1f948f27f..5b968db188c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -843,6 +843,15 @@ public abstract class FSUtils extends CommonFSUtils {
return frags;
}
+ public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
+ if (fs.exists(dst) && !fs.delete(dst, false)) {
+ throw new IOException("Can not delete " + dst);
+ }
+ if (!fs.rename(src, dst)) {
+ throw new IOException("Can not rename from " + src + " to " + dst);
+ }
+ }
+
/**
* A {@link PathFilter} that returns only regular files.
*/
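`renameFile` is the building block for the two-phase commit of remote WAL copies described earlier: data is first written to a `.tmp` file, then renamed to the `.ren` name, then to the final name, so a crash at any point leaves either garbage that `getReplayWALsAndCleanUpUnusedFiles` deletes or a `.ren` file it knows how to finish. A sketch of the publish side (method name hypothetical):

```java
// Hypothetical helper showing the two-phase publish built on renameFile.
// A reader only ever trusts the final name; ".tmp" is garbage to be deleted,
// ".ren" (ReplicationUtils.RENAME_WAL_SUFFIX) is a committed copy to finish.
static void publishRemoteWAL(FileSystem remoteFs, Path dir, String name, Path tmp)
    throws IOException {
  Path committed = new Path(dir, name + ReplicationUtils.RENAME_WAL_SUFFIX);
  FSUtils.renameFile(remoteFs, tmp, committed);                 // phase 1: commit
  FSUtils.renameFile(remoteFs, committed, new Path(dir, name)); // phase 2: publish
}
```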
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 8e82d8b8fee..82f8a89ca4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDir
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -51,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -67,8 +70,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
+ // only for injecting errors in test cases, do not use it for any other purpose.
@VisibleForTesting
- public static final String LOG_SUFFIX = ".syncrep";
+ public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl";
private final WALProvider provider;
@@ -126,12 +130,35 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
}
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
- return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf),
- ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
- CommonFSUtils.getWALRootDir(conf),
- ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId),
- getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
- conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass);
+ Class<? extends DualAsyncFSWAL> clazz =
+ conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class);
+ try {
+ Constructor<?> constructor = null;
+ for (Constructor<?> c : clazz.getDeclaredConstructors()) {
+ if (c.getParameterCount() > 0) {
+ constructor = c;
+ break;
+ }
+ }
+ if (constructor == null) {
+ throw new IllegalArgumentException("No valid constructor provided for class " + clazz);
+ }
+ constructor.setAccessible(true);
+ return (DualAsyncFSWAL) constructor.newInstance(
+ CommonFSUtils.getWALFileSystem(conf),
+ ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
+ CommonFSUtils.getWALRootDir(conf),
+ ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId),
+ getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
+ conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX,
+ eventLoopGroup, channelClass);
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getTargetException();
+ Throwables.propagateIfPossible(cause, IOException.class);
+ throw new RuntimeException(cause);
+ }
}
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
@@ -304,7 +331,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
*
*/
public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
- if (!name.endsWith(LOG_SUFFIX)) {
+ if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
// fast path to return earlier if the name is not for a sync replication peer.
return Optional.empty();
}
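A fragment showing how a test would plug in its own implementation through the new knob (`BrokenRemoteAsyncFSWAL` is a hypothetical test class): the provider resolves it with `conf.getClass` and instantiates it reflectively through its first declared constructor that takes parameters, which is why a test subclass only needs to mirror DualAsyncFSWAL's constructor.

```java
// Hypothetical test setup: route WAL creation to a fault-injecting subclass.
Configuration conf = util.getConfiguration(); // util: a hypothetical HBaseTestingUtility
conf.setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
  BrokenRemoteAsyncFSWAL.class, // hypothetical subclass of DualAsyncFSWAL
  DualAsyncFSWAL.class);
```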
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c6ffeea1ad4..64622349d5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -1141,7 +1141,7 @@ public class TestReplicationAdmin {
LOG.info("Expected error:", e);
}
TEST_UTIL.getTestFileSystem()
- .mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND));
+ .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
index 07aa6a85f6a..f73b4f159ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCombinedAsyncWriter.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,23 +37,18 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
-@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class })
public class TestCombinedAsyncWriter {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
+ HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@@ -68,15 +61,6 @@ public class TestCombinedAsyncWriter {
@Rule
public final TestName name = new TestName();
- @Parameter
- public CombinedAsyncWriter.Mode mode;
-
- @Parameters(name = "{index}: mode={0}")
- public static List