HBASE-20424 Allow writing WAL to local and remote cluster concurrently

This commit is contained in:
zhangduo 2018-05-24 16:20:28 +08:00
parent 603110719d
commit f67763ffa0
29 changed files with 734 additions and 239 deletions

View File

@ -488,7 +488,7 @@ enum RecoverStandbyState {
RENAME_SYNC_REPLICATION_WALS_DIR = 1; RENAME_SYNC_REPLICATION_WALS_DIR = 1;
INIT_WORKERS = 2; INIT_WORKERS = 2;
DISPATCH_TASKS = 3; DISPATCH_TASKS = 3;
REMOVE_SYNC_REPLICATION_WALS_DIR = 4; SNAPSHOT_SYNC_REPLICATION_WALS_DIR = 4;
} }
message RecoverStandbyStateData { message RecoverStandbyStateData {

View File

@ -46,6 +46,16 @@ public final class ReplicationUtils {
public static final String REMOTE_WAL_DIR_NAME = "remoteWALs"; public static final String REMOTE_WAL_DIR_NAME = "remoteWALs";
public static final String SYNC_WAL_SUFFIX = ".syncrep";
public static final String REMOTE_WAL_REPLAY_SUFFIX = "-replay";
public static final String REMOTE_WAL_SNAPSHOT_SUFFIX = "-snapshot";
// This is used for copying sync replication log from local to remote and overwrite the old one
// since some FileSystem implementation may not support atomic rename.
public static final String RENAME_WAL_SUFFIX = ".ren";
private ReplicationUtils() { private ReplicationUtils() {
} }
@ -187,14 +197,26 @@ public final class ReplicationUtils {
return new Path(remoteWALDir).getFileSystem(conf); return new Path(remoteWALDir).getFileSystem(conf);
} }
public static Path getRemoteWALDirForPeer(String remoteWALDir, String peerId) { public static Path getPeerRemoteWALDir(String remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId); return new Path(remoteWALDir, peerId);
} }
public static Path getRemoteWALDirForPeer(Path remoteWALDir, String peerId) { public static Path getPeerRemoteWALDir(Path remoteWALDir, String peerId) {
return new Path(remoteWALDir, peerId); return new Path(remoteWALDir, peerId);
} }
public static Path getPeerReplayWALDir(Path remoteWALDir, String peerId) {
return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_REPLAY_SUFFIX);
}
public static Path getPeerSnapshotWALDir(String remoteWALDir, String peerId) {
return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
}
public static Path getPeerSnapshotWALDir(Path remoteWALDir, String peerId) {
return getPeerRemoteWALDir(remoteWALDir, peerId).suffix(REMOTE_WAL_SNAPSHOT_SUFFIX);
}
/** /**
* Do the sleeping logic * Do the sleeping logic
* @param msg Why we sleep * @param msg Why we sleep

View File

@ -22,9 +22,9 @@ import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHel
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE; import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.WRITER_IDLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor; import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;

View File

@ -50,7 +50,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
switch (state) { switch (state) {
case RENAME_SYNC_REPLICATION_WALS_DIR: case RENAME_SYNC_REPLICATION_WALS_DIR:
try { try {
replaySyncReplicationWALManager.renamePeerRemoteWALDir(peerId); replaySyncReplicationWALManager.renameToPeerReplayWALDir(peerId);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e); LOG.warn("Failed to rename remote wal dir for peer id={}", peerId, e);
setFailure("master-recover-standby", e); setFailure("master-recover-standby", e);
@ -67,11 +67,11 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
.map(wal -> new ReplaySyncReplicationWALProcedure(peerId, .map(wal -> new ReplaySyncReplicationWALProcedure(peerId,
replaySyncReplicationWALManager.removeWALRootPath(wal))) replaySyncReplicationWALManager.removeWALRootPath(wal)))
.toArray(ReplaySyncReplicationWALProcedure[]::new)); .toArray(ReplaySyncReplicationWALProcedure[]::new));
setNextState(RecoverStandbyState.REMOVE_SYNC_REPLICATION_WALS_DIR); setNextState(RecoverStandbyState.SNAPSHOT_SYNC_REPLICATION_WALS_DIR);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case REMOVE_SYNC_REPLICATION_WALS_DIR: case SNAPSHOT_SYNC_REPLICATION_WALS_DIR:
try { try {
replaySyncReplicationWALManager.removePeerReplayWALDir(peerId); replaySyncReplicationWALManager.renameToPeerSnapshotWALDir(peerId);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e); LOG.warn("Failed to cleanup replay wals dir for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException(); throw new ProcedureYieldException();
@ -85,7 +85,7 @@ public class RecoverStandbyProcedure extends AbstractPeerProcedure<RecoverStandb
private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager) private List<Path> getReplayWALs(ReplaySyncReplicationWALManager replaySyncReplicationWALManager)
throws ProcedureYieldException { throws ProcedureYieldException {
try { try {
return replaySyncReplicationWALManager.getReplayWALs(peerId); return replaySyncReplicationWALManager.getReplayWALsAndCleanUpUnusedFiles(peerId);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e); LOG.warn("Failed to get replay wals for peer id={}, , retry", peerId, e);
throw new ProcedureYieldException(); throw new ProcedureYieldException();

View File

@ -67,10 +67,7 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
} }
private void removeRemoteWALs(MasterProcedureEnv env) throws IOException { private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
ReplaySyncReplicationWALManager remoteWALManager = env.getMasterServices().getReplaySyncReplicationWALManager().removePeerRemoteWALs(peerId);
env.getMasterServices().getReplaySyncReplicationWALManager();
remoteWALManager.removePeerRemoteWALs(peerId);
remoteWALManager.removePeerReplayWALDir(peerId);
} }
@Override @Override

View File

@ -17,6 +17,10 @@
*/ */
package org.apache.hadoop.hbase.master.replication; package org.apache.hadoop.hbase.master.replication;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerRemoteWALDir;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerReplayWALDir;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getPeerSnapshotWALDir;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -25,31 +29,27 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplaySyncReplicationWALManager { public class ReplaySyncReplicationWALManager {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class); LoggerFactory.getLogger(ReplaySyncReplicationWALManager.class);
private static final String REPLAY_SUFFIX = "-replay";
private final MasterServices services; private final MasterServices services;
private final Configuration conf;
private final FileSystem fs; private final FileSystem fs;
private final Path walRootDir; private final Path walRootDir;
@ -60,69 +60,86 @@ public class ReplaySyncReplicationWALManager {
public ReplaySyncReplicationWALManager(MasterServices services) { public ReplaySyncReplicationWALManager(MasterServices services) {
this.services = services; this.services = services;
this.conf = services.getConfiguration();
this.fs = services.getMasterFileSystem().getWALFileSystem(); this.fs = services.getMasterFileSystem().getWALFileSystem();
this.walRootDir = services.getMasterFileSystem().getWALRootDir(); this.walRootDir = services.getMasterFileSystem().getWALRootDir();
this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME); this.remoteWALDir = new Path(this.walRootDir, ReplicationUtils.REMOTE_WAL_DIR_NAME);
} }
public Path getPeerRemoteWALDir(String peerId) {
return new Path(this.remoteWALDir, peerId);
}
private Path getPeerReplayWALDir(String peerId) {
return getPeerRemoteWALDir(peerId).suffix(REPLAY_SUFFIX);
}
public void createPeerRemoteWALDir(String peerId) throws IOException { public void createPeerRemoteWALDir(String peerId) throws IOException {
Path peerRemoteWALDir = getPeerRemoteWALDir(peerId); Path peerRemoteWALDir = getPeerRemoteWALDir(remoteWALDir, peerId);
if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) { if (!fs.exists(peerRemoteWALDir) && !fs.mkdirs(peerRemoteWALDir)) {
throw new IOException("Unable to mkdir " + peerRemoteWALDir); throw new IOException("Unable to mkdir " + peerRemoteWALDir);
} }
} }
public void renamePeerRemoteWALDir(String peerId) throws IOException { private void rename(Path src, Path dst, String peerId) throws IOException {
Path peerRemoteWALDir = getPeerRemoteWALDir(peerId); if (fs.exists(src)) {
Path peerReplayWALDir = peerRemoteWALDir.suffix(REPLAY_SUFFIX); deleteDir(dst, peerId);
if (fs.exists(peerRemoteWALDir)) { if (!fs.rename(src, dst)) {
if (!fs.rename(peerRemoteWALDir, peerReplayWALDir)) { throw new IOException(
throw new IOException("Failed rename remote wal dir from " + peerRemoteWALDir + " to " "Failed to rename dir from " + src + " to " + dst + " for peer id=" + peerId);
+ peerReplayWALDir + " for peer id=" + peerId);
} }
LOG.info("Rename remote wal dir from {} to {} for peer id={}", remoteWALDir, peerReplayWALDir, LOG.info("Renamed dir from {} to {} for peer id={}", src, dst, peerId);
peerId); } else if (!fs.exists(dst)) {
} else if (!fs.exists(peerReplayWALDir)) { throw new IOException(
throw new IOException("Remote wal dir " + peerRemoteWALDir + " and replay wal dir " "Want to rename from " + src + " to " + dst + ", but they both do not exist");
+ peerReplayWALDir + " not exist for peer id=" + peerId);
} }
} }
public List<Path> getReplayWALs(String peerId) throws IOException { public void renameToPeerReplayWALDir(String peerId) throws IOException {
Path peerReplayWALDir = getPeerReplayWALDir(peerId); rename(getPeerRemoteWALDir(remoteWALDir, peerId), getPeerReplayWALDir(remoteWALDir, peerId),
List<Path> replayWals = new ArrayList<>(); peerId);
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(peerReplayWALDir, false);
while (iterator.hasNext()) {
replayWals.add(iterator.next().getPath());
}
return replayWals;
} }
public void removePeerReplayWALDir(String peerId) throws IOException { public void renameToPeerSnapshotWALDir(String peerId) throws IOException {
Path peerReplayWALDir = getPeerReplayWALDir(peerId); rename(getPeerReplayWALDir(remoteWALDir, peerId), getPeerSnapshotWALDir(remoteWALDir, peerId),
peerId);
}
public List<Path> getReplayWALsAndCleanUpUnusedFiles(String peerId) throws IOException {
Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
for (FileStatus status : fs.listStatus(peerReplayWALDir,
p -> p.getName().endsWith(ReplicationUtils.RENAME_WAL_SUFFIX))) {
Path src = status.getPath();
String srcName = src.getName();
String dstName =
srcName.substring(0, srcName.length() - ReplicationUtils.RENAME_WAL_SUFFIX.length());
FSUtils.renameFile(fs, src, new Path(src.getParent(), dstName));
}
List<Path> wals = new ArrayList<>();
for (FileStatus status : fs.listStatus(peerReplayWALDir)) {
Path path = status.getPath();
if (path.getName().endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
wals.add(path);
} else {
if (!fs.delete(path, true)) {
LOG.warn("Can not delete unused file: " + path);
}
}
}
return wals;
}
public void snapshotPeerReplayWALDir(String peerId) throws IOException {
Path peerReplayWALDir = getPeerReplayWALDir(remoteWALDir, peerId);
if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) { if (fs.exists(peerReplayWALDir) && !fs.delete(peerReplayWALDir, true)) {
throw new IOException( throw new IOException(
"Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId); "Failed to remove replay wals dir " + peerReplayWALDir + " for peer id=" + peerId);
} }
} }
public void removePeerRemoteWALs(String peerId) throws IOException { private void deleteDir(Path dir, String peerId) throws IOException {
Path remoteWALDir = getPeerRemoteWALDir(peerId); if (!fs.delete(dir, true) && fs.exists(dir)) {
if (fs.exists(remoteWALDir) && !fs.delete(remoteWALDir, true)) { throw new IOException("Failed to remove dir " + dir + " for peer id=" + peerId);
throw new IOException(
"Failed to remove remote WALs dir " + remoteWALDir + " for peer id=" + peerId);
} }
} }
public void removePeerRemoteWALs(String peerId) throws IOException {
deleteDir(getPeerRemoteWALDir(remoteWALDir, peerId), peerId);
deleteDir(getPeerReplayWALDir(remoteWALDir, peerId), peerId);
deleteDir(getPeerSnapshotWALDir(remoteWALDir, peerId), peerId);
}
public void initPeerWorkers(String peerId) { public void initPeerWorkers(String peerId) {
BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>(); BlockingQueue<ServerName> servers = new LinkedBlockingQueue<>();
services.getServerManager().getOnlineServers().keySet() services.getServerManager().getOnlineServers().keySet()
@ -144,4 +161,9 @@ public class ReplaySyncReplicationWALManager {
// remove the "/" too. // remove the "/" too.
return pathStr.substring(walRootDir.toString().length() + 1); return pathStr.substring(walRootDir.toString().length() + 1);
} }
@VisibleForTesting
public Path getRemoteWALDir() {
return remoteWALDir;
}
} }

View File

@ -118,7 +118,7 @@ public class TransitPeerSyncReplicationStateProcedure
env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState); env.getReplicationPeerManager().preTransitPeerSyncReplicationState(peerId, toState);
if (toState == SyncReplicationState.ACTIVE) { if (toState == SyncReplicationState.ACTIVE) {
Path remoteWALDirForPeer = Path remoteWALDirForPeer =
ReplicationUtils.getRemoteWALDirForPeer(desc.getPeerConfig().getRemoteWALDir(), peerId); ReplicationUtils.getPeerRemoteWALDir(desc.getPeerConfig().getRemoteWALDir(), peerId);
// check whether the remote wal directory is present // check whether the remote wal directory is present
if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration()) if (!remoteWALDirForPeer.getFileSystem(env.getMasterConfiguration())
.exists(remoteWALDirForPeer)) { .exists(remoteWALDirForPeer)) {
@ -152,7 +152,7 @@ public class TransitPeerSyncReplicationStateProcedure
throws ProcedureYieldException, IOException { throws ProcedureYieldException, IOException {
MasterFileSystem mfs = env.getMasterFileSystem(); MasterFileSystem mfs = env.getMasterFileSystem();
Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME); Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId); Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem walFs = mfs.getWALFileSystem(); FileSystem walFs = mfs.getWALFileSystem();
if (walFs.exists(remoteWALDirForPeer)) { if (walFs.exists(remoteWALDirForPeer)) {
LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway", LOG.warn("Wal dir {} already exists, usually this should not happen, continue anyway",

View File

@ -1963,8 +1963,7 @@ public class HRegionServer extends HasThread implements
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
if (this.csm != null) { if (this.csm != null) {
// SplitLogWorker needs csm. If none, don't start this. // SplitLogWorker needs csm. If none, don't start this.
this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory);
this, walFactory);
splitLogWorker.start(); splitLogWorker.start();
} else { } else {
LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null"); LOG.warn("SplitLogWorker Service NOT started; CoordinatedStateManager is null");

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler; import org.apache.hadoop.hbase.replication.regionserver.PeerProcedureHandler;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider; import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -37,4 +38,9 @@ public interface ReplicationSourceService extends ReplicationService {
* Returns a Handler to handle peer procedures. * Returns a Handler to handle peer procedures.
*/ */
PeerProcedureHandler getPeerProcedureHandler(); PeerProcedureHandler getPeerProcedureHandler();
/**
* Return the replication peers.
*/
ReplicationPeers getReplicationPeers();
} }

View File

@ -23,22 +23,31 @@ import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.util.Optional;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status;
import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@ -67,67 +76,133 @@ public class SplitLogWorker implements Runnable {
Thread worker; Thread worker;
// thread pool which executes recovery work // thread pool which executes recovery work
private SplitLogWorkerCoordination coordination; private SplitLogWorkerCoordination coordination;
private Configuration conf;
private RegionServerServices server; private RegionServerServices server;
public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
TaskExecutor splitTaskExecutor) { TaskExecutor splitTaskExecutor) {
this.server = server; this.server = server;
this.conf = conf;
this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination(); this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
coordination.init(server, conf, splitTaskExecutor, this); coordination.init(server, conf, splitTaskExecutor, this);
} }
public SplitLogWorker(final Server hserver, final Configuration conf, public SplitLogWorker(Configuration conf, RegionServerServices server,
final RegionServerServices server, final LastSequenceId sequenceIdChecker, LastSequenceId sequenceIdChecker, WALFactory factory) {
final WALFactory factory) { this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory));
this(hserver, conf, server, new TaskExecutor() { }
@Override
public Status exec(String filename, CancelableProgressable p) { // returns whether we need to continue the split work
Path walDir; private static boolean processSyncReplicationWAL(String name, Configuration conf,
FileSystem fs; RegionServerServices server, FileSystem fs, Path walDir) throws IOException {
try { Path walFile = new Path(walDir, name);
walDir = FSUtils.getWALRootDir(conf); String filename = walFile.getName();
fs = walDir.getFileSystem(conf); Optional<String> optSyncPeerId =
} catch (IOException e) { SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename);
LOG.warn("could not find root dir or fs", e); if (!optSyncPeerId.isPresent()) {
return Status.RESIGNED; return true;
} }
// TODO have to correctly figure out when log splitting has been String peerId = optSyncPeerId.get();
// interrupted or has encountered a transient error and when it has ReplicationPeerImpl peer =
// encountered a bad non-retry-able persistent error. server.getReplicationSourceService().getReplicationPeers().getPeer(peerId);
try { if (peer == null || !peer.getPeerConfig().isSyncReplication()) {
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), return true;
fs, conf, p, sequenceIdChecker, }
server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
return Status.PREEMPTED; peer.getSyncReplicationStateAndNewState();
} if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
} catch (InterruptedIOException iioe) { stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) {
LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); // copy the file to remote and overwrite the previous one
return Status.RESIGNED; String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
} catch (IOException e) { Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
if (e instanceof FileNotFoundException) { Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp");
// A wal file may not exist anymore. Nothing can be recovered so move on FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
LOG.warn("WAL {} does not exist anymore", filename, e); try (FSDataInputStream in = fs.open(walFile); @SuppressWarnings("deprecation")
return Status.DONE; FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true,
} FSUtils.getDefaultBufferSize(remoteFs), remoteFs.getDefaultReplication(tmpRemoteWAL),
Throwable cause = e.getCause(); remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) {
if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException IOUtils.copy(in, out);
|| cause instanceof ConnectException }
|| cause instanceof SocketTimeoutException)) { Path toCommitRemoteWAL =
LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " new Path(remoteWALDirForPeer, filename + ReplicationUtils.RENAME_WAL_SUFFIX);
+ "resigning", e); // Some FileSystem implementations may not support atomic rename so we need to do it in two
return Status.RESIGNED; // phases
} else if (cause instanceof InterruptedException) { FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL);
LOG.warn("log splitting of " + filename + " interrupted, resigning", e); FSUtils.renameFile(remoteFs, toCommitRemoteWAL, new Path(remoteWALDirForPeer, filename));
return Status.RESIGNED; } else if ((stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) &&
} stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) ||
LOG.warn("log splitting of " + filename + " failed, returning error", e); stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) {
return Status.ERR; // check whether we still need to process this file
} // actually we only write wal file which name is ended with .syncrep in A state, and after
// transiting to a state other than A, we will reopen all the regions so the data in the wal
// will be flushed so the wal file will be archived soon. But it is still possible that there
// is a server crash when we are transiting from A to S, to simplify the logic of the transit
// procedure, here we will also check the remote snapshot directory in state S, so that we do
// not need wait until all the wal files with .syncrep suffix to be archived before finishing
// the procedure.
String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId);
FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) {
// the file has been replayed when the remote cluster was transited from S to DA, the
// content will be replicated back to us so give up split it.
LOG.warn("Giveup splitting {} since it has been replayed in the remote cluster and " +
"the content will be replicated back", filename);
return false;
}
}
return true;
}
private static Status splitLog(String name, CancelableProgressable p, Configuration conf,
RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) {
Path walDir;
FileSystem fs;
try {
walDir = FSUtils.getWALRootDir(conf);
fs = walDir.getFileSystem(conf);
} catch (IOException e) {
LOG.warn("could not find root dir or fs", e);
return Status.RESIGNED;
}
try {
if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) {
return Status.DONE; return Status.DONE;
} }
}); } catch (IOException e) {
LOG.warn("failed to process sync replication wal {}", name, e);
return Status.RESIGNED;
}
// TODO have to correctly figure out when log splitting has been
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf,
p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(),
factory)) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
LOG.warn("log splitting of " + name + " interrupted, resigning", iioe);
return Status.RESIGNED;
} catch (IOException e) {
if (e instanceof FileNotFoundException) {
// A wal file may not exist anymore. Nothing can be recovered so move on
LOG.warn("WAL {} does not exist anymore", name, e);
return Status.DONE;
}
Throwable cause = e.getCause();
if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException ||
cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
LOG.warn("log replaying of " + name + " can't connect to the target regionserver, " +
"resigning", e);
return Status.RESIGNED;
} else if (cause instanceof InterruptedException) {
LOG.warn("log splitting of " + name + " interrupted, resigning", e);
return Status.RESIGNED;
}
LOG.warn("log splitting of " + name + " failed, returning error", e);
return Status.ERR;
}
return Status.DONE;
} }
@Override @Override
@ -191,6 +266,7 @@ public class SplitLogWorker implements Runnable {
* {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in * {@link org.apache.hadoop.hbase.master.SplitLogManager} commit the work in
* SplitLogManager.TaskFinisher * SplitLogManager.TaskFinisher
*/ */
@FunctionalInterface
public interface TaskExecutor { public interface TaskExecutor {
enum Status { enum Status {
DONE(), DONE(),

View File

@ -32,13 +32,13 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
* An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances. * An {@link AsyncWriter} wrapper which writes data to a set of {@link AsyncWriter} instances.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public abstract class CombinedAsyncWriter implements AsyncWriter { public final class CombinedAsyncWriter implements AsyncWriter {
private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class); private static final Logger LOG = LoggerFactory.getLogger(CombinedAsyncWriter.class);
protected final ImmutableList<AsyncWriter> writers; private final ImmutableList<AsyncWriter> writers;
protected CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) { private CombinedAsyncWriter(ImmutableList<AsyncWriter> writers) {
this.writers = writers; this.writers = writers;
} }
@ -66,69 +66,29 @@ public abstract class CombinedAsyncWriter implements AsyncWriter {
} }
} }
protected abstract void doSync(CompletableFuture<Long> future);
@Override
public CompletableFuture<Long> sync() {
CompletableFuture<Long> future = new CompletableFuture<>();
doSync(future);
return future;
}
@Override @Override
public void append(Entry entry) { public void append(Entry entry) {
writers.forEach(w -> w.append(entry)); writers.forEach(w -> w.append(entry));
} }
public enum Mode { @Override
SEQUENTIAL, PARALLEL public CompletableFuture<Long> sync() {
CompletableFuture<Long> future = new CompletableFuture<>();
AtomicInteger remaining = new AtomicInteger(writers.size());
writers.forEach(w -> w.sync().whenComplete((length, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (remaining.decrementAndGet() == 0) {
future.complete(length);
}
}));
return future;
} }
public static CombinedAsyncWriter create(Mode mode, AsyncWriter writer, AsyncWriter... writers) { public static CombinedAsyncWriter create(AsyncWriter writer, AsyncWriter... writers) {
ImmutableList<AsyncWriter> ws = return new CombinedAsyncWriter(
ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build(); ImmutableList.<AsyncWriter> builder().add(writer).add(writers).build());
switch (mode) {
case SEQUENTIAL:
return new CombinedAsyncWriter(ws) {
private void doSync(CompletableFuture<Long> future, Long length, int index) {
if (index == writers.size()) {
future.complete(length);
return;
}
writers.get(index).sync().whenComplete((len, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
doSync(future, len, index + 1);
});
}
@Override
protected void doSync(CompletableFuture<Long> future) {
doSync(future, null, 0);
}
};
case PARALLEL:
return new CombinedAsyncWriter(ws) {
@Override
protected void doSync(CompletableFuture<Long> future) {
AtomicInteger remaining = new AtomicInteger(writers.size());
writers.forEach(w -> w.sync().whenComplete((length, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
if (remaining.decrementAndGet() == 0) {
future.complete(length);
}
}));
}
};
default:
throw new IllegalArgumentException("Unknown mode: " + mode);
}
} }
} }

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@ -50,6 +51,13 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
this.remoteWalDir = remoteWalDir; this.remoteWalDir = remoteWalDir;
} }
// will be overridden in testcase
@VisibleForTesting
protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
AsyncWriter remoteWriter) {
return CombinedAsyncWriter.create(remoteWriter, localWriter);
}
@Override @Override
protected AsyncWriter createWriterInstance(Path path) throws IOException { protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path); AsyncWriter localWriter = super.createWriterInstance(path);
@ -66,8 +74,7 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
closeWriter(localWriter); closeWriter(localWriter);
} }
} }
return CombinedAsyncWriter.create(CombinedAsyncWriter.Mode.SEQUENTIAL, remoteWriter, return createCombinedAsyncWriter(localWriter, remoteWriter);
localWriter);
} }
// Allow temporarily skipping the creation of remote writer. When failing to write to the remote // Allow temporarily skipping the creation of remote writer. When failing to write to the remote

View File

@ -288,4 +288,9 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() { public SyncReplicationPeerInfoProvider getSyncReplicationPeerInfoProvider() {
return syncReplicationPeerInfoProvider; return syncReplicationPeerInfoProvider;
} }
@Override
public ReplicationPeers getReplicationPeers() {
return replicationPeers;
}
} }

View File

@ -671,7 +671,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals) private void removeRemoteWALs(String peerId, String remoteWALDir, Collection<String> wals)
throws IOException { throws IOException {
Path remoteWALDirForPeer = ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId); Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); FileSystem fs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
for (String wal : wals) { for (String wal : wals) {
Path walFile = new Path(remoteWALDirForPeer, wal); Path walFile = new Path(remoteWALDirForPeer, wal);

View File

@ -77,8 +77,7 @@ class SyncReplicationPeerInfoProviderImpl implements SyncReplicationPeerInfoProv
return false; return false;
} }
Pair<SyncReplicationState, SyncReplicationState> states = Pair<SyncReplicationState, SyncReplicationState> states =
peer.getSyncReplicationStateAndNewState(); peer.getSyncReplicationStateAndNewState();
return checker.test(states.getFirst(), states.getSecond()); return checker.test(states.getFirst(), states.getSecond());
} }
} }

View File

@ -843,6 +843,15 @@ public abstract class FSUtils extends CommonFSUtils {
return frags; return frags;
} }
public static void renameFile(FileSystem fs, Path src, Path dst) throws IOException {
if (fs.exists(dst) && !fs.delete(dst, false)) {
throw new IOException("Can not delete " + dst);
}
if (!fs.rename(src, dst)) {
throw new IOException("Can not rename from " + src + " to " + dst);
}
}
/** /**
* A {@link PathFilter} that returns only regular files. * A {@link PathFilter} that returns only regular files.
*/ */

View File

@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALArchiveDir
import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName; import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.getWALDirectoryName;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -51,6 +53,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams; import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@ -67,8 +70,9 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class); private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
// only for injecting errors for testcase, do not use it for other purpose.
@VisibleForTesting @VisibleForTesting
public static final String LOG_SUFFIX = ".syncrep"; public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl";
private final WALProvider provider; private final WALProvider provider;
@ -126,12 +130,35 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
} }
private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException { private DualAsyncFSWAL createWAL(String peerId, String remoteWALDir) throws IOException {
return new DualAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), Class<? extends DualAsyncFSWAL> clazz =
ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir), conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class);
CommonFSUtils.getWALRootDir(conf), try {
ReplicationUtils.getRemoteWALDirForPeer(remoteWALDir, peerId), Constructor<?> constructor = null;
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), for (Constructor<?> c : clazz.getDeclaredConstructors()) {
conf, listeners, true, getLogPrefix(peerId), LOG_SUFFIX, eventLoopGroup, channelClass); if (c.getParameterCount() > 0) {
constructor = c;
break;
}
}
if (constructor == null) {
throw new IllegalArgumentException("No valid constructor provided for class " + clazz);
}
constructor.setAccessible(true);
return (DualAsyncFSWAL) constructor.newInstance(
CommonFSUtils.getWALFileSystem(conf),
ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir),
CommonFSUtils.getWALRootDir(conf),
ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId),
getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId),
conf, listeners, true, getLogPrefix(peerId), ReplicationUtils.SYNC_WAL_SUFFIX,
eventLoopGroup, channelClass);
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
Throwable cause = e.getTargetException();
Throwables.propagateIfPossible(cause, IOException.class);
throw new RuntimeException(cause);
}
} }
private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException { private DualAsyncFSWAL getWAL(String peerId, String remoteWALDir) throws IOException {
@ -304,7 +331,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
* </p> * </p>
*/ */
public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) { public static Optional<String> getSyncReplicationPeerIdFromWALName(String name) {
if (!name.endsWith(LOG_SUFFIX)) { if (!name.endsWith(ReplicationUtils.SYNC_WAL_SUFFIX)) {
// fast path to return earlier if the name is not for a sync replication peer. // fast path to return earlier if the name is not for a sync replication peer.
return Optional.empty(); return Optional.empty();
} }

View File

@ -1141,7 +1141,7 @@ public class TestReplicationAdmin {
LOG.info("Expected error:", e); LOG.info("Expected error:", e);
} }
TEST_UTIL.getTestFileSystem() TEST_UTIL.getTestFileSystem()
.mkdirs(ReplicationUtils.getRemoteWALDirForPeer(rootDir, ID_SECOND)); .mkdirs(ReplicationUtils.getPeerRemoteWALDir(rootDir, ID_SECOND));
hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE); hbaseAdmin.transitReplicationPeerSyncReplicationState(ID_SECOND, SyncReplicationState.ACTIVE);
assertEquals(SyncReplicationState.ACTIVE, assertEquals(SyncReplicationState.ACTIVE,
hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND)); hbaseAdmin.getReplicationPeerSyncReplicationState(ID_SECOND));

View File

@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -39,23 +37,18 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, MediumTests.class }) @Category({ RegionServerTests.class, MediumTests.class })
public class TestCombinedAsyncWriter { public class TestCombinedAsyncWriter {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class); HBaseClassTestRule.forClass(TestCombinedAsyncWriter.class);
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
@ -68,15 +61,6 @@ public class TestCombinedAsyncWriter {
@Rule @Rule
public final TestName name = new TestName(); public final TestName name = new TestName();
@Parameter
public CombinedAsyncWriter.Mode mode;
@Parameters(name = "{index}: mode={0}")
public static List<Object[]> params() {
return Arrays.asList(new Object[] { CombinedAsyncWriter.Mode.SEQUENTIAL },
new Object[] { CombinedAsyncWriter.Mode.PARALLEL });
}
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
EVENT_LOOP_GROUP = new NioEventLoopGroup(); EVENT_LOOP_GROUP = new NioEventLoopGroup();
@ -125,7 +109,7 @@ public class TestCombinedAsyncWriter {
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false, AsyncWriter writer2 = AsyncFSWALProvider.createAsyncWriter(conf, fs, path2, false,
EVENT_LOOP_GROUP.next(), CHANNEL_CLASS); EVENT_LOOP_GROUP.next(), CHANNEL_CLASS);
CombinedAsyncWriter writer = CombinedAsyncWriter.create(mode, writer1, writer2)) { CombinedAsyncWriter writer = CombinedAsyncWriter.create(writer1, writer2)) {
ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName, ProtobufLogTestHelper.doWrite(new WriterOverAsyncWriter(writer), withTrailer, tableName,
columnCount, recordCount, row, timestamp); columnCount, recordCount, row, timestamp);
try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) { try (ProtobufLogReader reader = (ProtobufLogReader) WALS.createReader(fs, path1)) {

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
class DualAsyncFSWALForTest extends DualAsyncFSWAL {
private boolean localBroken;
private boolean remoteBroken;
private CountDownLatch arrive;
private CountDownLatch resume;
private final class MyCombinedAsyncWriter implements AsyncWriter {
private final AsyncWriter localWriter;
private final AsyncWriter remoteWriter;
public MyCombinedAsyncWriter(AsyncWriter localWriter, AsyncWriter remoteWriter) {
this.localWriter = localWriter;
this.remoteWriter = remoteWriter;
}
@Override
public long getLength() {
return localWriter.getLength();
}
@Override
public void close() throws IOException {
Closeables.close(localWriter, true);
Closeables.close(remoteWriter, true);
}
@Override
public CompletableFuture<Long> sync() {
CompletableFuture<Long> localFuture;
CompletableFuture<Long> remoteFuture;
if (!localBroken) {
localFuture = localWriter.sync();
} else {
localFuture = new CompletableFuture<>();
localFuture.completeExceptionally(new IOException("Inject error"));
}
if (!remoteBroken) {
remoteFuture = remoteWriter.sync();
} else {
remoteFuture = new CompletableFuture<>();
remoteFuture.completeExceptionally(new IOException("Inject error"));
}
return CompletableFuture.allOf(localFuture, remoteFuture).thenApply(v -> {
return localFuture.getNow(0L);
});
}
@Override
public void append(Entry entry) {
if (!localBroken) {
localWriter.append(entry);
}
if (!remoteBroken) {
remoteWriter.append(entry);
}
}
}
public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists,
prefix, suffix, eventLoopGroup, channelClass);
}
@Override
protected AsyncWriter createCombinedAsyncWriter(AsyncWriter localWriter,
AsyncWriter remoteWriter) {
return new MyCombinedAsyncWriter(localWriter, remoteWriter);
}
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
if (arrive != null) {
arrive.countDown();
try {
resume.await();
} catch (InterruptedException e) {
}
}
if (localBroken || remoteBroken) {
throw new IOException("WAL broken");
}
return super.createWriterInstance(path);
}
public void setLocalBroken() {
this.localBroken = true;
}
public void setRemoteBroken() {
this.remoteBroken = true;
}
public void suspendLogRoll() {
arrive = new CountDownLatch(1);
resume = new CountDownLatch(1);
}
public void waitUntilArrive() throws InterruptedException {
arrive.await();
}
public void resumeLogRoll() {
resume.countDown();
}
}

View File

@ -72,9 +72,9 @@ public class SyncReplicationTestBase {
protected static String PEER_ID = "1"; protected static String PEER_ID = "1";
protected static Path remoteWALDir1; protected static Path REMOTE_WAL_DIR1;
protected static Path remoteWALDir2; protected static Path REMOTE_WAL_DIR2;
private static void initTestingUtility(HBaseTestingUtility util, String zkParent) { private static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
util.setZkCluster(ZK_UTIL.getZkCluster()); util.setZkCluster(ZK_UTIL.getZkCluster());
@ -109,22 +109,22 @@ public class SyncReplicationTestBase {
UTIL2.getAdmin().createTable(td); UTIL2.getAdmin().createTable(td);
FileSystem fs1 = UTIL1.getTestFileSystem(); FileSystem fs1 = UTIL1.getTestFileSystem();
FileSystem fs2 = UTIL2.getTestFileSystem(); FileSystem fs2 = UTIL2.getTestFileSystem();
remoteWALDir1 = REMOTE_WAL_DIR1 =
new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), new Path(UTIL1.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
"remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory()); "remoteWALs").makeQualified(fs1.getUri(), fs1.getWorkingDirectory());
remoteWALDir2 = REMOTE_WAL_DIR2 =
new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(), new Path(UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir(),
"remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory()); "remoteWALs").makeQualified(fs2.getUri(), fs2.getWorkingDirectory());
UTIL1.getAdmin().addReplicationPeer(PEER_ID, UTIL1.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()) ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
.setReplicateAllUserTables(false) .setReplicateAllUserTables(false)
.setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
.setRemoteWALDir(remoteWALDir2.toUri().toString()).build()); .setRemoteWALDir(REMOTE_WAL_DIR2.toUri().toString()).build());
UTIL2.getAdmin().addReplicationPeer(PEER_ID, UTIL2.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey()) ReplicationPeerConfig.newBuilder().setClusterKey(UTIL1.getClusterKey())
.setReplicateAllUserTables(false) .setReplicateAllUserTables(false)
.setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>())) .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
.setRemoteWALDir(remoteWALDir1.toUri().toString()).build()); .setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
} }
@AfterClass @AfterClass

View File

@ -37,8 +37,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationActive.class); HBaseClassTestRule.forClass(TestSyncReplicationActive.class);
@Test @Test
public void testActive() throws Exception { public void testActive() throws Exception {
@ -58,7 +57,7 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
verifyNotReplicatedThroughRegion(UTIL2, 0, 100); verifyNotReplicatedThroughRegion(UTIL2, 0, 100);
// Ensure that there's no cluster id in remote log entries. // Ensure that there's no cluster id in remote log entries.
verifyNoClusterIdInRemoteLog(UTIL2, remoteWALDir2, PEER_ID); verifyNoClusterIdInRemoteLog(UTIL2, REMOTE_WAL_DIR2, PEER_ID);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE); SyncReplicationState.DOWNGRADE_ACTIVE);

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationMoreLogsInLocalCopyToRemote extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalCopyToRemote.class);
@BeforeClass
public static void setUp() throws Exception {
UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
SyncReplicationTestBase.setUp();
}
@Test
public void testSplitLog() throws Exception {
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
DualAsyncFSWALForTest wal =
(DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
wal.setRemoteBroken();
try (AsyncConnection conn =
ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
try {
table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0))).get();
fail("Should fail since the rs will crash and we will not retry");
} catch (ExecutionException e) {
// expected
LOG.info("Expected error:", e);
}
}
UTIL1.waitFor(60000, new ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
return table.exists(new Get(Bytes.toBytes(0)));
}
}
@Override
public String explainFailure() throws Exception {
return "The row is still not available";
}
});
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
// We should have copied the local log to remote, so we should be able to get the value
try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
}
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestSyncReplicationMoreLogsInLocalGiveUpSplitting extends SyncReplicationTestBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestSyncReplicationMoreLogsInLocalGiveUpSplitting.class);
@BeforeClass
public static void setUp() throws Exception {
UTIL1.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
UTIL2.getConfiguration().setClass(SyncReplicationWALProvider.DUAL_WAL_IMPL,
DualAsyncFSWALForTest.class, DualAsyncFSWAL.class);
SyncReplicationTestBase.setUp();
}
@Test
public void testSplitLog() throws Exception {
UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
UTIL2.getAdmin().disableReplicationPeer(PEER_ID);
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
table.put(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, Bytes.toBytes(0)));
}
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
DualAsyncFSWALForTest wal =
(DualAsyncFSWALForTest) rs.getWAL(RegionInfoBuilder.newBuilder(TABLE_NAME).build());
wal.setRemoteBroken();
wal.suspendLogRoll();
try (AsyncConnection conn =
ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1)
.setWriteRpcTimeout(5, TimeUnit.SECONDS).build();
try {
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1))).get();
fail("Should fail since the rs will hang and we will get a rpc timeout");
} catch (ExecutionException e) {
// expected
LOG.info("Expected error:", e);
}
}
wal.waitUntilArrive();
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE);
wal.resumeLogRoll();
try (Table table = UTIL2.getConnection().getTable(TABLE_NAME)) {
assertEquals(0, Bytes.toInt(table.get(new Get(Bytes.toBytes(0))).getValue(CF, CQ)));
// we failed to write this entry to remote so it should not exist
assertFalse(table.exists(new Get(Bytes.toBytes(1))));
}
UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.STANDBY);
// make sure that the region is online. We can not use waitTableAvailable since the table in
// stand by state can not be read from client.
try (Table table = UTIL1.getConnection().getTable(TABLE_NAME)) {
try {
table.exists(new Get(Bytes.toBytes(0)));
} catch (DoNotRetryIOException | RetriesExhaustedException e) {
// expected
assertThat(e.getMessage(), containsString("STANDBY"));
}
}
HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
// we give up splitting the whole wal file so this record will also be gone.
assertTrue(region.get(new Get(Bytes.toBytes(0))).isEmpty());
UTIL2.getAdmin().enableReplicationPeer(PEER_ID);
// finally it should be replicated back
waitUntilReplicationDone(UTIL1, 1);
}
}

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -66,12 +65,12 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
SyncReplicationState.ACTIVE); SyncReplicationState.ACTIVE);
MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem(); MasterFileSystem mfs = UTIL2.getMiniHBaseCluster().getMaster().getMasterFileSystem();
Path remoteWALDir = ReplicationUtils.getRemoteWALDirForPeer( Path remoteWALDir = ReplicationUtils.getPeerRemoteWALDir(
new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID); new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME), PEER_ID);
FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir); FileStatus[] remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length); assertEquals(1, remoteWALStatus.length);
Path remoteWAL = remoteWALStatus[0].getPath(); Path remoteWAL = remoteWALStatus[0].getPath();
assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX)); assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100); writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME); HRegionServer rs = UTIL1.getRSForFirstRegionInTable(TABLE_NAME);
@ -81,7 +80,7 @@ public class TestSyncReplicationRemoveRemoteWAL extends SyncReplicationTestBase
remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir); remoteWALStatus = mfs.getWALFileSystem().listStatus(remoteWALDir);
assertEquals(1, remoteWALStatus.length); assertEquals(1, remoteWALStatus.length);
remoteWAL = remoteWALStatus[0].getPath(); remoteWAL = remoteWALStatus[0].getPath();
assertThat(remoteWAL.getName(), endsWith(SyncReplicationWALProvider.LOG_SUFFIX)); assertThat(remoteWAL.getName(), endsWith(ReplicationUtils.SYNC_WAL_SUFFIX));
UTIL1.getAdmin().disableReplicationPeer(PEER_ID); UTIL1.getAdmin().disableReplicationPeer(PEER_ID);
write(UTIL1, 100, 200); write(UTIL1, 100, 200);

View File

@ -97,25 +97,25 @@ public class TestSyncReplicationStandBy extends SyncReplicationTestBase {
writeAndVerifyReplication(UTIL1, UTIL2, 0, 100); writeAndVerifyReplication(UTIL1, UTIL2, 0, 100);
// Remove the peers in ACTIVE & STANDBY cluster. // Remove the peers in ACTIVE & STANDBY cluster.
FileSystem fs2 = remoteWALDir2.getFileSystem(UTIL2.getConfiguration()); FileSystem fs2 = REMOTE_WAL_DIR2.getFileSystem(UTIL2.getConfiguration());
Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID, UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.DOWNGRADE_ACTIVE); SyncReplicationState.DOWNGRADE_ACTIVE);
Assert.assertFalse(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); Assert.assertFalse(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
Assert.assertFalse(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID))); Assert.assertFalse(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID)));
UTIL1.getAdmin().removeReplicationPeer(PEER_ID); UTIL1.getAdmin().removeReplicationPeer(PEER_ID);
verifyRemovedPeer(PEER_ID, remoteWALDir1, UTIL1); verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR1, UTIL1);
// Peer remoteWAL dir will be renamed to replay WAL dir when transit from S to DA, and the // Peer remoteWAL dir will be renamed to replay WAL dir when transit from S to DA, and the
// replay WAL dir will be removed after replaying all WALs, so create a emtpy dir here to test // replay WAL dir will be removed after replaying all WALs, so create a emtpy dir here to test
// whether the removeReplicationPeer would remove the remoteWAL dir. // whether the removeReplicationPeer would remove the remoteWAL dir.
fs2.create(getRemoteWALDir(remoteWALDir2, PEER_ID)); fs2.create(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID));
fs2.create(getReplayRemoteWALs(remoteWALDir2, PEER_ID)); fs2.create(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID));
Assert.assertTrue(fs2.exists(getRemoteWALDir(remoteWALDir2, PEER_ID))); Assert.assertTrue(fs2.exists(getRemoteWALDir(REMOTE_WAL_DIR2, PEER_ID)));
Assert.assertTrue(fs2.exists(getReplayRemoteWALs(remoteWALDir2, PEER_ID))); Assert.assertTrue(fs2.exists(getReplayRemoteWALs(REMOTE_WAL_DIR2, PEER_ID)));
UTIL2.getAdmin().removeReplicationPeer(PEER_ID); UTIL2.getAdmin().removeReplicationPeer(PEER_ID);
verifyRemovedPeer(PEER_ID, remoteWALDir2, UTIL2); verifyRemovedPeer(PEER_ID, REMOTE_WAL_DIR2, UTIL2);
} }
} }

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -151,7 +152,8 @@ public class TestRecoverStandbyProcedure {
} }
private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException { private void setupSyncReplicationWALs() throws IOException, StreamLacksCapabilityException {
Path peerRemoteWALDir = replaySyncReplicationWALManager.getPeerRemoteWALDir(PEER_ID); Path peerRemoteWALDir = ReplicationUtils
.getPeerRemoteWALDir(replaySyncReplicationWALManager.getRemoteWALDir(), PEER_ID);
if (!fs.exists(peerRemoteWALDir)) { if (!fs.exists(peerRemoteWALDir)) {
fs.mkdirs(peerRemoteWALDir); fs.mkdirs(peerRemoteWALDir);
} }

View File

@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
@ -618,7 +617,7 @@ public abstract class TestReplicationSourceManager {
try { try {
// make sure that we can deal with files which does not exist // make sure that we can deal with files which does not exist
String walNameNotExists = String walNameNotExists =
"remoteWAL-12345-" + slaveId + ".12345" + SyncReplicationWALProvider.LOG_SUFFIX; "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX;
Path wal = new Path(logDir, walNameNotExists); Path wal = new Path(logDir, walNameNotExists);
manager.preLogRoll(wal); manager.preLogRoll(wal);
manager.postLogRoll(wal); manager.postLogRoll(wal);
@ -626,7 +625,7 @@ public abstract class TestReplicationSourceManager {
Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId); Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId);
fs.mkdirs(remoteLogDirForPeer); fs.mkdirs(remoteLogDirForPeer);
String walName = String walName =
"remoteWAL-12345-" + slaveId + ".23456" + SyncReplicationWALProvider.LOG_SUFFIX; "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX;
Path remoteWAL = Path remoteWAL =
new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
fs.create(remoteWAL).close(); fs.create(remoteWAL).close();

View File

@ -86,7 +86,6 @@ public class TestSyncReplicationWALProvider {
@Override @Override
public boolean checkState(TableName table, public boolean checkState(TableName table,
BiPredicate<SyncReplicationState, SyncReplicationState> checker) { BiPredicate<SyncReplicationState, SyncReplicationState> checker) {
// TODO Implement SyncReplicationPeerInfoProvider.isInState
return false; return false;
} }
} }