HBASE-14247 Separate the old WALs into different regionserver directories

This commit is contained in:
Guanghao Zhang 2017-05-18 17:34:00 +08:00
parent 9a27ac8781
commit 6db1ce1e91
22 changed files with 202 additions and 77 deletions

View File

@ -42,7 +42,7 @@ public class ReplicationQueueInfo {
private final String peerClusterZnode;
private boolean queueRecovered;
// List of all the dead region servers that had this queue (if recovered)
private List<String> deadRegionServers = new ArrayList<>();
private List<ServerName> deadRegionServers = new ArrayList<>();
/**
* The passed znode will be either the id of the peer cluster or
@ -66,7 +66,7 @@ public class ReplicationQueueInfo {
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-&lt;server name>-...
*/
private static void
extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
extractDeadServersFromZNodeString(String deadServerListStr, List<ServerName> result) {
if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
@ -85,7 +85,7 @@ public class ReplicationQueueInfo {
if (i > startIndex) {
String serverName = deadServerListStr.substring(startIndex, i);
if(ServerName.isFullServerName(serverName)){
result.add(serverName);
result.add(ServerName.valueOf(serverName));
} else {
LOG.error("Found invalid server name:" + serverName);
}
@ -103,7 +103,7 @@ public class ReplicationQueueInfo {
if(startIndex < len - 1){
String serverName = deadServerListStr.substring(startIndex, len);
if(ServerName.isFullServerName(serverName)){
result.add(serverName);
result.add(ServerName.valueOf(serverName));
} else {
LOG.error("Found invalid server name at the end:" + serverName);
}
@ -112,7 +112,7 @@ public class ReplicationQueueInfo {
LOG.debug("Found dead servers:" + result);
}
public List<String> getDeadRegionServers() {
public List<ServerName> getDeadRegionServers() {
return Collections.unmodifiableList(this.deadRegionServers);
}

View File

@ -141,7 +141,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
}
}
private void preRunCleaner() {
cleanersChain.forEach(FileCleanerDelegate::preClean);
}
public Boolean runCleaner() {
preRunCleaner();
try {
FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
checkAndDeleteEntries(files);

View File

@ -44,4 +44,10 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
* this method is used to pass some instance into subclass
* */
void init(Map<String, Object> params);
/**
* Used to do some initialize work before every period clean
*/
default void preClean() {
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
@ -58,7 +59,6 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.BlockingWaitStrategy;
@ -797,7 +797,11 @@ public class FSHLog extends AbstractFSWAL<Writer> {
}
final Path baseDir = FSUtils.getWALRootDir(conf);
final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
archiveDir = new Path(archiveDir, p.getName());
}
WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
}
@ -1141,10 +1145,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
System.err.println("Arguments:");
System.err.println(" --dump Dump textual representation of passed one or more files");
System.err.println(" For example: "
+ "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
+ "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
System.err.println(" --split Split the passed directory of WAL logs");
System.err.println(
" For example: " + "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
" For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR");
}
/**

View File

@ -26,22 +26,19 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.zookeeper.KeeperException;
/**
@ -54,23 +51,31 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private ZooKeeperWatcher zkw;
private ReplicationQueuesClient replicationQueues;
private boolean stopped = false;
private Set<String> wals;
private long readZKTimestamp = 0;
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return files;
}
final Set<String> wals;
public void preClean() {
readZKTimestamp = EnvironmentEdgeManager.currentTime();
try {
// The concurrently created new WALs may not be included in the return list,
// but they won't be deleted because they're not in the checking set.
wals = replicationQueues.getAllWALs();
} catch (KeeperException e) {
LOG.warn("Failed to read zookeeper, skipping checking deletable files");
wals = null;
}
}
@Override
public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
// all members of this class are null if replication is disabled,
// so we cannot filter the files
if (this.getConf() == null) {
return files;
}
if (wals == null) {
return Collections.emptyList();
}
return Iterables.filter(files, new Predicate<FileStatus>() {
@ -85,8 +90,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
LOG.debug("Didn't find this log in ZK, deleting: " + wal);
}
}
return !logInReplicationQueue;
}});
return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
}
});
}
@Override

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AtomicLongMap;
/**
@ -356,7 +356,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
StringBuilder sb = new StringBuilder();
List<String> deadServers ;
List<ServerName> deadServers;
sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
sb.append(" Queue znode: " + queueId + "\n");
@ -385,6 +385,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
return sb.toString();
}
/**
* return total size in bytes from a list of WALs
*/

View File

@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
@ -51,10 +52,10 @@ public class RecoveredReplicationSource extends ReplicationSource {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
clusterId, replicationEndpoint, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@ -98,7 +99,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
// Path changed - try to find the right path.
hasPathChanged = true;
if (stopper instanceof ReplicationSyncUp.DummyServer) {
if (server instanceof ReplicationSyncUp.DummyServer) {
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
Path newPath = getReplSyncUpPath(path);
@ -107,12 +108,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
} else {
// See if Path exists in the dead RS folder (there could be a chain of failures
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
final Path walDir = FSUtils.getWALRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
for (ServerName curDeadServerName : deadRegionServers) {
final Path deadRsDirectory =
new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
.getServerName()));
Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
for (Path possibleLogLocation : locs) {
@ -189,4 +191,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
public String getPeerId() {
return this.actualPeerId;
}
@Override
public ServerName getServerWALsBelongTo() {
return this.replicationQueueInfo.getDeadRegionServers().get(0);
}
}

View File

@ -40,6 +40,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@ -59,7 +61,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@ -94,7 +95,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
// The manager of all sources to which we ping back our progress
protected ReplicationSourceManager manager;
// Should we stop everything?
protected Stoppable stopper;
protected Server server;
// How long should we sleep for each retry
private long sleepForRetries;
protected FileSystem fs;
@ -139,7 +140,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param conf configuration to use
* @param fs file system to use
* @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver
* @param server the server for this region server
* @param peerClusterZnode the name of our znode
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
@ -148,10 +149,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.stopper = stopper;
this.server = server;
this.conf = HBaseConfiguration.create(conf);
this.waitOnEndpointSeconds =
this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
@ -330,7 +331,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
stopper.stop("Unexpected exception in " + t.getName());
server.stop("Unexpected exception in " + t.getName());
}
};
}
@ -500,7 +501,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
return !this.server.isStopped() && this.sourceRunning;
}
/**
@ -564,4 +565,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public WALFileLengthProvider getWALFileLengthProvider() {
return walFileLengthProvider;
}
@Override
public ServerName getServerWALsBelongTo() {
return server.getServerName();
}
}

View File

@ -26,7 +26,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
@ -48,13 +49,13 @@ public interface ReplicationSourceInterface {
* @param manager the manager to use
* @param replicationQueues
* @param replicationPeers
* @param stopper the stopper object for this region server
* @param server the server for this region server
* @param peerClusterZnode
* @param clusterId
* @throws IOException
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
@ -163,4 +164,11 @@ public interface ReplicationSourceInterface {
* @param batchSize entries size pushed
*/
void postShipEdits(List<Entry> entries, int batchSize);
/**
* The queue of WALs only belong to one region server. This will return the server name which all
* WALs belong to.
* @return the server name which all WALs belong to
*/
ServerName getServerWALsBelongTo();
}

View File

@ -127,8 +127,10 @@ public class ReplicationSourceWALReader extends Thread {
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
source.getWALFileLengthProvider(), source.getSourceMetrics())) {
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!checkQuota()) {
continue;

View File

@ -32,12 +32,14 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
@ -63,6 +65,8 @@ class WALEntryStream implements Closeable {
private final FileSystem fs;
private final Configuration conf;
private final WALFileLengthProvider walFileLengthProvider;
// which region server the WALs belong to
private final ServerName serverName;
private final MetricsSource metrics;
/**
@ -71,17 +75,19 @@ class WALEntryStream implements Closeable {
* @param fs {@link FileSystem} to use to create {@link Reader} for this stream
* @param conf {@link Configuration} to use to create {@link Reader} for this stream
* @param startPosition the position in the first WAL to start reading at
* @param serverName the server name which all WALs belong to
* @param metrics replication metrics
* @throws IOException
*/
public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics) throws IOException {
this.logQueue = logQueue;
this.fs = fs;
this.conf = conf;
this.currentPosition = startPosition;
this.walFileLengthProvider = walFileLengthProvider;
this.serverName = serverName;
this.metrics = metrics;
}
@ -296,15 +302,27 @@ class WALEntryStream implements Closeable {
private Path getArchivedLog(Path path) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
// Try found the log in old dir
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
Path archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
} else {
LOG.error("Couldn't locate log: " + path);
return path;
}
// Try found the log in the seperate old log dir
oldLogDir =
new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
archivedLogLocation = new Path(oldLogDir, path.getName());
if (fs.exists(archivedLogLocation)) {
LOG.info("Log " + path + " was moved to " + archivedLogLocation);
return archivedLogLocation;
}
LOG.error("Couldn't locate log: " + path);
return path;
}
private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
@ -316,6 +334,7 @@ class WALEntryStream implements Closeable {
throw fnfe;
}
}
private void openReader(Path path) throws IOException {
try {
// Detect if this is a new file, if so get a new reader else

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
@ -59,6 +58,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
private static final Log LOG = LogFactory.getLog(AbstractFSWALProvider.class);
/** Separate old log into different dir by regionserver name **/
public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
// Only public so classes back in regionserver.wal can access
public interface Reader extends WAL.Reader {
/**
@ -272,6 +275,23 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
return dirName.toString();
}
/**
* Construct the directory name for all old WALs on a given server. The default old WALs dir
* looks like: <code>hbase/oldWALs</code>. If you config hbase.separate.oldlogdir.by.regionserver
* to true, it looks like <code>hbase//oldWALs/kalashnikov.att.net,61634,1486865297088</code>.
* @param conf
* @param serverName Server name formatted as described in {@link ServerName}
* @return the relative WAL directory name
*/
public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
dirName.append(Path.SEPARATOR);
dirName.append(serverName);
}
return dirName.toString();
}
/**
* Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
* this method ignores the format of the logfile component. Current format: [base directory for
@ -387,6 +407,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
Path rootDir = FSUtils.getRootDir(conf);
Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
ServerName serverName = getServerNameFromWALDirectoryName(path);
if (serverName == null) {
LOG.error("Couldn't locate log: " + path);
return path;
}
oldLogDir = new Path(oldLogDir, serverName.getServerName());
}
Path archivedLogLocation = new Path(oldLogDir, path.getName());
final FileSystem fs = FSUtils.getCurrentFileSystem(conf);

View File

@ -61,8 +61,9 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
@Override
protected AsyncFSWAL createWAL() throws IOException {
return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
eventLoopGroup.next(), channelClass);
}

View File

@ -76,8 +76,9 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
@Override
protected FSHLog createWAL() throws IOException {
return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
getWALDirectoryName(factory.factoryId),
getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
}
@Override

View File

@ -225,6 +225,7 @@ public class TestLogsCleaner {
"testZooKeeperAbort-faulty", null)) {
faultyZK.init();
cleaner.setConf(conf, faultyZK);
cleaner.preClean();
// should keep all files due to a ConnectionLossException getting the queues znodes
Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
assertFalse(toDelete.iterator().hasNext());
@ -235,6 +236,7 @@ public class TestLogsCleaner {
cleaner = new ReplicationLogCleaner();
try (ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null)) {
cleaner.setConf(conf, zkw);
cleaner.preClean();
Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
Iterator<FileStatus> iter = filesToDelete.iterator();
assertTrue(iter.hasNext());

View File

@ -25,7 +25,8 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
@ -47,7 +48,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.manager = manager;
@ -142,4 +143,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
public WALFileLengthProvider getWALFileLengthProvider() {
return walFileLengthProvider;
}
@Override
public ServerName getServerWALsBelongTo() {
return null;
}
}

View File

@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.replication;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
@ -36,10 +38,13 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/**
* This class is only a base for other integration-level replication tests.
@ -82,6 +87,14 @@ public class TestReplicationBase {
protected static final byte[] row = Bytes.toBytes("row");
protected static final byte[] noRepfamName = Bytes.toBytes("norep");
@Parameter
public static boolean seperateOldWALs;
@Parameters
public static List<Boolean> params() {
return Arrays.asList(false, true);
}
/**
* @throws java.lang.Exception
*/
@ -106,6 +119,9 @@ public class TestReplicationBase {
conf1.setFloat("replication.source.ratio", 1.0f);
conf1.setBoolean("replication.source.eof.autorecovery", true);
// Parameter config
conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, seperateOldWALs);
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();

View File

@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Runs the TestReplicationKillRS test and selects the RS to kill in the master cluster
* Do not add other tests in this class.
*/
@RunWith(Parameterized.class)
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationKillMasterRS extends TestReplicationKillRS {

View File

@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Runs the TestReplicationKillRS test and selects the RS to kill in the slave cluster
* Do not add other tests in this class.
*/
@RunWith(Parameterized.class)
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationKillSlaveRS extends TestReplicationKillRS {

View File

@ -645,7 +645,7 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
throw new IOException("Failing deliberately");

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@ -105,11 +106,11 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
List<String> result = replicationQueueInfo.getDeadRegionServers();
List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
// verify
assertTrue(result.contains(server.getServerName().getServerName()));
assertTrue(result.contains(s1.getServerName().getServerName()));
assertTrue(result.contains(s2.getServerName().getServerName()));
assertTrue(result.contains(server.getServerName()));
assertTrue(result.contains(s1.getServerName()));
assertTrue(result.contains(s2.getServerName()));
server.stop("");
}

View File

@ -148,7 +148,7 @@ public class TestWALEntryStream {
log.rollWriter();
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
@ -175,7 +175,7 @@ public class TestWALEntryStream {
appendToLog();
long oldPos;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next();
@ -193,7 +193,7 @@ public class TestWALEntryStream {
appendToLog();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
log, new MetricsSource("1"))) {
log, null, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@ -207,7 +207,7 @@ public class TestWALEntryStream {
appendToLog();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
log, new MetricsSource("1"))) {
log, null, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
@ -232,7 +232,7 @@ public class TestWALEntryStream {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@ -257,7 +257,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@ -280,7 +280,7 @@ public class TestWALEntryStream {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
@ -288,7 +288,7 @@ public class TestWALEntryStream {
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
@ -306,13 +306,13 @@ public class TestWALEntryStream {
appendEntriesToLog(3);
// read only one element
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
log, new MetricsSource("1"))) {
log, null, new MetricsSource("1"))) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@ -323,7 +323,7 @@ public class TestWALEntryStream {
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext());
}
}
@ -334,7 +334,7 @@ public class TestWALEntryStream {
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
@ -440,7 +440,7 @@ public class TestWALEntryStream {
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
// can not get log 2