diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index c38c50f7d44..68b7ebeec96 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -42,7 +42,7 @@ public class ReplicationQueueInfo {
   private final String peerClusterZnode;
   private boolean queueRecovered;
   // List of all the dead region servers that had this queue (if recovered)
-  private List<String> deadRegionServers = new ArrayList<>();
+  private List<ServerName> deadRegionServers = new ArrayList<>();
 
   /**
    * The passed znode will be either the id of the peer cluster or
@@ -66,7 +66,7 @@ public class ReplicationQueueInfo {
    * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
    */
   private static void
-      extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
+      extractDeadServersFromZNodeString(String deadServerListStr, List<ServerName> result) {
 
     if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
 
@@ -85,7 +85,7 @@ public class ReplicationQueueInfo {
         if (i > startIndex) {
           String serverName = deadServerListStr.substring(startIndex, i);
           if(ServerName.isFullServerName(serverName)){
-            result.add(serverName);
+            result.add(ServerName.valueOf(serverName));
           } else {
             LOG.error("Found invalid server name:" + serverName);
           }
@@ -103,7 +103,7 @@ public class ReplicationQueueInfo {
     if(startIndex < len - 1){
       String serverName = deadServerListStr.substring(startIndex, len);
       if(ServerName.isFullServerName(serverName)){
-        result.add(serverName);
+        result.add(ServerName.valueOf(serverName));
       } else {
         LOG.error("Found invalid server name at the end:" + serverName);
       }
@@ -112,7 +112,7 @@ public class ReplicationQueueInfo {
     LOG.debug("Found dead servers:" + result);
   }
 
-  public List<String> getDeadRegionServers() {
+  public List<ServerName> getDeadRegionServers() {
     return Collections.unmodifiableList(this.deadRegionServers);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index eac064299c6..b8ca1ecfc2e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -141,7 +141,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
     }
   }
 
+  private void preRunCleaner() {
+    cleanersChain.forEach(FileCleanerDelegate::preClean);
+  }
+
   public Boolean runCleaner() {
+    preRunCleaner();
     try {
       FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
       checkAndDeleteEntries(files);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index fdeb4045824..9c611f5d42b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -44,4 +44,10 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
    * this method is used to pass some instance into subclass
    * */
   void init(Map<String, Object> params);
-}
+
+  /**
+   * Used to do some initialization work before each periodic clean.
+   */
+  default void preClean() {
+  }
+}
\ No newline at end of file
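The new preClean() hook gives every delegate a once-per-run setup point: CleanerChore.runCleaner() now invokes it before any file is examined. A minimal sketch of how a delegate might use it (the class and helper below are hypothetical, not part of this patch):

```java
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;

// Hypothetical delegate: fetch expensive state once per chore run in
// preClean(), then consult it cheaply for every candidate file.
public class CachedStateLogCleaner extends BaseLogCleanerDelegate {
  private Set<String> namesToRetain = Collections.emptySet();

  @Override
  public void preClean() {
    this.namesToRetain = fetchNamesToRetain(); // one lookup per run, not per file
  }

  @Override
  public boolean isLogDeletable(FileStatus fStat) {
    return !namesToRetain.contains(fStat.getPath().getName());
  }

  private Set<String> fetchNamesToRetain() {
    return Collections.emptySet(); // placeholder for e.g. a ZooKeeper read
  }
}
```

This is exactly the pattern ReplicationLogCleaner adopts later in this patch.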
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 649e4503f7a..cc9601b5b28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -58,7 +59,6 @@ import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
 import org.apache.yetus.audience.InterfaceAudience;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
@@ -797,7 +797,11 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
     final Path baseDir = FSUtils.getWALRootDir(conf);
-    final Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
+        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
+      archiveDir = new Path(archiveDir, p.getName());
+    }
     WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
   }
 
@@ -1141,10 +1145,10 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     System.err.println("Arguments:");
     System.err.println(" --dump  Dump textual representation of passed one or more files");
     System.err.println("         For example: "
-        + "FSHLog --dump hdfs://example.com:9000/hbase/.logs/MACHINE/LOGFILE");
+        + "FSHLog --dump hdfs://example.com:9000/hbase/WALs/MACHINE/LOGFILE");
     System.err.println(" --split Split the passed directory of WAL logs");
     System.err.println(
-        "         For example: " + "FSHLog --split hdfs://example.com:9000/hbase/.logs/DIR");
+        "         For example: " + "FSHLog --split hdfs://example.com:9000/hbase/WALs/DIR");
   }
 
   /**
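The change above means a split's archived logs can now land in a per-server subdirectory of oldWALs instead of the flat archive. A rough sketch of the resulting path choice (paths and server name illustrative; assumes the constants added to AbstractFSWALProvider later in this patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class ArchiveDirSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);

    Path baseDir = new Path("/hbase"); // stand-in for FSUtils.getWALRootDir(conf)
    Path walDir = new Path(baseDir, "WALs/s1.example.org,16020,1490000000000");
    Path archiveDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
    if (conf.getBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR,
        AbstractFSWALProvider.DEFAULT_SEPARATE_OLDLOGDIR)) {
      archiveDir = new Path(archiveDir, walDir.getName());
    }
    // flag off: /hbase/oldWALs
    // flag on:  /hbase/oldWALs/s1.example.org,16020,1490000000000
    System.out.println(archiveDir);
  }
}
```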
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 5d6e1ef8c24..3dcb332b1f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -26,22 +26,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
-import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -54,23 +51,31 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
   private ZooKeeperWatcher zkw;
   private ReplicationQueuesClient replicationQueues;
   private boolean stopped = false;
-
+  private Set<String> wals;
+  private long readZKTimestamp = 0;
 
   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
-    // all members of this class are null if replication is disabled,
-    // so we cannot filter the files
-    if (this.getConf() == null) {
-      return files;
-    }
-
-    final Set<String> wals;
+  public void preClean() {
+    readZKTimestamp = EnvironmentEdgeManager.currentTime();
     try {
       // The concurrently created new WALs may not be included in the return list,
       // but they won't be deleted because they're not in the checking set.
       wals = replicationQueues.getAllWALs();
     } catch (KeeperException e) {
       LOG.warn("Failed to read zookeeper, skipping checking deletable files");
+      wals = null;
+    }
+  }
+
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    // all members of this class are null if replication is disabled,
+    // so we cannot filter the files
+    if (this.getConf() == null) {
+      return files;
+    }
+
+    if (wals == null) {
       return Collections.emptyList();
     }
     return Iterables.filter(files, new Predicate<FileStatus>() {
@@ -85,8 +90,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
           LOG.debug("Didn't find this log in ZK, deleting: " + wal);
         }
       }
-      return !logInReplicationQueue;
-    }});
+        return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
+      }
+    });
   }
 
   @Override
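The cleaner now reads ZooKeeper once per run (in preClean()) and remembers when it did so; getDeletableFiles() then refuses to delete any WAL whose modification time is at or after that snapshot, since such a file may belong to a queue created after the read. A standalone restatement of the new predicate (class and method names here are illustrative only, not part of the patch):

```java
import java.util.Set;
import org.apache.hadoop.fs.FileStatus;

// Mirrors the patch's wals/readZKTimestamp fields: a WAL is deletable only
// if it is absent from the ZK snapshot AND strictly older than the moment
// the snapshot was taken.
final class DeletableWalCheck {
  static boolean deletable(FileStatus file, Set<String> walsSnapshot, long snapshotTs) {
    boolean inReplicationQueue = walsSnapshot.contains(file.getPath().getName());
    return !inReplicationQueue && file.getModificationTime() < snapshotTs;
  }
}
```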
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 45b40c52e56..9d38026592d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.zookeeper.KeeperException;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.AtomicLongMap;
 
 /**
@@ -356,7 +356,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
 
     StringBuilder sb = new StringBuilder();
-    List<String> deadServers ;
+    List<ServerName> deadServers;
 
     sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
     sb.append("    Queue znode: " + queueId + "\n");
@@ -385,6 +385,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
     }
     return sb.toString();
   }
+
   /**
    * return total size in bytes from a list of WALs
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 248a52a3093..cabf85a7f83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
@@ -51,10 +52,10 @@ public class RecoveredReplicationSource extends ReplicationSource {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
-    super.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerClusterZnode,
+    super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
       clusterId, replicationEndpoint, walFileLengthProvider, metrics);
     this.actualPeerId = this.replicationQueueInfo.getPeerId();
   }
@@ -98,7 +99,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
       }
       // Path changed - try to find the right path.
       hasPathChanged = true;
-      if (stopper instanceof ReplicationSyncUp.DummyServer) {
+      if (server instanceof ReplicationSyncUp.DummyServer) {
        // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
        // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
         Path newPath = getReplSyncUpPath(path);
@@ -107,12 +108,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
       } else {
         // See if Path exists in the dead RS folder (there could be a chain of failures
         // to look at)
-        List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
         LOG.info("NB dead servers : " + deadRegionServers.size());
         final Path walDir = FSUtils.getWALRootDir(conf);
-        for (String curDeadServerName : deadRegionServers) {
+        for (ServerName curDeadServerName : deadRegionServers) {
           final Path deadRsDirectory =
-              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName));
+              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
+                  .getServerName()));
           Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
               deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
           for (Path possibleLogLocation : locs) {
@@ -189,4 +191,9 @@ public class RecoveredReplicationSource extends ReplicationSource {
   public String getPeerId() {
     return this.actualPeerId;
   }
+
+  @Override
+  public ServerName getServerWALsBelongTo() {
+    return this.replicationQueueInfo.getDeadRegionServers().get(0);
+  }
 }
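For context: a recovered queue's znode id is the peer id followed by the chain of dead servers that owned the queue, and the WAL files themselves were written by the first server in that chain, which is why the override above returns element 0. An illustrative walk-through (server names invented):

```java
import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;

public class RecoveredQueueSketch {
  public static void main(String[] args) {
    // peer "1"; srv1 wrote the WALs and died first, then srv2 (which had
    // claimed the queue) died as well
    ReplicationQueueInfo info = new ReplicationQueueInfo(
        "1-srv1.example.org,16020,1421822672998-srv2.example.org,16020,1421822673000");
    List<ServerName> dead = info.getDeadRegionServers(); // now ServerName, not String
    ServerName walsOwner = dead.get(0); // srv1.example.org,16020,1421822672998
    System.out.println(walsOwner);
  }
}
```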
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index d16a68fa2f6..ea6c6d44aff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -59,7 +61,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 
@@ -94,7 +95,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
   // The manager of all sources to which we ping back our progress
   protected ReplicationSourceManager manager;
   // Should we stop everything?
-  protected Stoppable stopper;
+  protected Server server;
   // How long should we sleep for each retry
   private long sleepForRetries;
   protected FileSystem fs;
@@ -139,7 +140,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
    * @param conf configuration to use
    * @param fs file system to use
    * @param manager replication manager to ping to
-   * @param stopper the atomic boolean to use to stop the regionserver
+   * @param server the server for this region server
    * @param peerClusterZnode the name of our znode
    * @param clusterId unique UUID for the cluster
    * @param replicationEndpoint the replication endpoint implementation
@@ -148,10 +149,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
    */
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
-    this.stopper = stopper;
+    this.server = server;
     this.conf = HBaseConfiguration.create(conf);
     this.waitOnEndpointSeconds =
       this.conf.getInt(WAIT_ON_ENDPOINT_SECONDS, DEFAULT_WAIT_ON_ENDPOINT_SECONDS);
@@ -330,7 +331,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
       public void uncaughtException(final Thread t, final Throwable e) {
         RSRpcServices.exitIfOOME(e);
         LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
-        stopper.stop("Unexpected exception in " + t.getName());
+        server.stop("Unexpected exception in " + t.getName());
       }
     };
   }
@@ -500,7 +501,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
 
   @Override
   public boolean isSourceActive() {
-    return !this.stopper.isStopped() && this.sourceRunning;
+    return !this.server.isStopped() && this.sourceRunning;
   }
 
   /**
@@ -564,4 +565,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterface {
   public WALFileLengthProvider getWALFileLengthProvider() {
     return walFileLengthProvider;
   }
+
+  @Override
+  public ServerName getServerWALsBelongTo() {
+    return server.getServerName();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 066f7996c4d..b6cf54de854 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -26,7 +26,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -48,13 +49,13 @@ public interface ReplicationSourceInterface {
    * @param manager the manager to use
    * @param replicationQueues
    * @param replicationPeers
-   * @param stopper the stopper object for this region server
+   * @param server the server for this region server
    * @param peerClusterZnode
    * @param clusterId
    * @throws IOException
    */
   void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper,
+      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
       String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
 
@@ -163,4 +164,11 @@ public interface ReplicationSourceInterface {
    * @param batchSize entries size pushed
    */
   void postShipEdits(List<Entry> entries, int batchSize);
+
+  /**
+   * The queue of WALs belongs to only one region server. This returns the name of the server
+   * that all the WALs belong to.
+   * @return the server name which all WALs belong to
+   */
+  ServerName getServerWALsBelongTo();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bb993c657dc..bbcaaa4d73b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -127,8 +127,10 @@ public class ReplicationSourceWALReader extends Thread {
   public void run() {
     int sleepMultiplier = 1;
     while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
-      try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf, currentPosition,
-        source.getWALFileLengthProvider(), source.getSourceMetrics())) {
+      try (WALEntryStream entryStream =
+          new WALEntryStream(logQueue, fs, conf, currentPosition,
+              source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
+              source.getSourceMetrics())) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!checkQuota()) {
             continue;
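The reader thread is the main consumer of the new method: it threads the owning server into WALEntryStream so the stream can probe that server's archive directory (see getArchivedLog() below). Condensed from the hunk above, with the surrounding fields assumed to be those of ReplicationSourceWALReader:

```java
try (WALEntryStream entryStream = new WALEntryStream(logQueue, fs, conf,
    currentPosition, source.getWALFileLengthProvider(),
    source.getServerWALsBelongTo(), // new: whose archive dir to probe
    source.getSourceMetrics())) {
  while (entryStream.hasNext()) {
    WAL.Entry entry = entryStream.next();
    // filter, batch and ship as before ...
  }
}
```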
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index 3be4ca46faa..6277d24c5bc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -32,12 +32,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -63,6 +65,8 @@ class WALEntryStream implements Closeable {
   private final FileSystem fs;
   private final Configuration conf;
   private final WALFileLengthProvider walFileLengthProvider;
+  // which region server the WALs belong to
+  private final ServerName serverName;
   private final MetricsSource metrics;
 
   /**
@@ -71,17 +75,19 @@ class WALEntryStream implements Closeable {
    * @param fs {@link FileSystem} to use to create {@link Reader} for this stream
    * @param conf {@link Configuration} to use to create {@link Reader} for this stream
    * @param startPosition the position in the first WAL to start reading at
+   * @param serverName the server name which all WALs belong to
    * @param metrics replication metrics
    * @throws IOException
    */
   public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf,
-      long startPosition, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
-      throws IOException {
+      long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
+      MetricsSource metrics) throws IOException {
     this.logQueue = logQueue;
     this.fs = fs;
     this.conf = conf;
     this.currentPosition = startPosition;
     this.walFileLengthProvider = walFileLengthProvider;
+    this.serverName = serverName;
     this.metrics = metrics;
   }
 
@@ -296,15 +302,27 @@ class WALEntryStream implements Closeable {
 
   private Path getArchivedLog(Path path) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
+
+    // Try to find the log in the old dir
     Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
     Path archivedLogLocation = new Path(oldLogDir, path.getName());
     if (fs.exists(archivedLogLocation)) {
       LOG.info("Log " + path + " was moved to " + archivedLogLocation);
       return archivedLogLocation;
-    } else {
-      LOG.error("Couldn't locate log: " + path);
-      return path;
     }
+
+    // Try to find the log in the separate old log dir
+    oldLogDir =
+        new Path(rootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
+            .append(Path.SEPARATOR).append(serverName.getServerName()).toString());
+    archivedLogLocation = new Path(oldLogDir, path.getName());
+    if (fs.exists(archivedLogLocation)) {
+      LOG.info("Log " + path + " was moved to " + archivedLogLocation);
+      return archivedLogLocation;
+    }
+
+    LOG.error("Couldn't locate log: " + path);
+    return path;
   }
 
   private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
@@ -316,6 +334,7 @@ class WALEntryStream implements Closeable {
       throw fnfe;
     }
   }
+
   private void openReader(Path path) throws IOException {
     try {
       // Detect if this is a new file, if so get a new reader else
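With the fix above, a WAL that disappeared from the queue's directory is now probed in two archive locations rather than one. An illustrative helper (not in the patch) spelling out the candidate paths in probe order:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;

// With rootDir=/hbase and server s1.example.org,16020,1490000000000 this yields
//   /hbase/oldWALs/<wal>                                     (flat archive)
//   /hbase/oldWALs/s1.example.org,16020,1490000000000/<wal>  (separate archive)
final class ArchiveCandidates {
  static List<Path> candidateLocations(Path rootDir, ServerName server, String walName) {
    Path flatArchive = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
    Path perServerArchive = new Path(flatArchive, server.getServerName());
    return Arrays.asList(new Path(flatArchive, walName),
        new Path(perServerArchive, walName));
  }
}
```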
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 1a81b17d716..aba13c658fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
-
 import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -59,6 +58,10 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
 
   private static final Log LOG = LogFactory.getLog(AbstractFSWALProvider.class);
 
+  /** Separate old log into different dir by regionserver name **/
+  public static final String SEPARATE_OLDLOGDIR = "hbase.separate.oldlogdir.by.regionserver";
+  public static final boolean DEFAULT_SEPARATE_OLDLOGDIR = false;
+
   // Only public so classes back in regionserver.wal can access
   public interface Reader extends WAL.Reader {
     /**
@@ -272,6 +275,23 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
     return dirName.toString();
   }
 
+  /**
+   * Construct the directory name for all old WALs on a given server. The default old WALs dir
+   * looks like: hbase/oldWALs. If you set hbase.separate.oldlogdir.by.regionserver
+   * to true, it looks like hbase/oldWALs/kalashnikov.att.net,61634,1486865297088.
+   * @param conf configuration to check for the separate-old-log-dir flag
+   * @param serverName Server name formatted as described in {@link ServerName}
+   * @return the relative WAL directory name
+   */
+  public static String getWALArchiveDirectoryName(Configuration conf, final String serverName) {
+    StringBuilder dirName = new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME);
+    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
+      dirName.append(Path.SEPARATOR);
+      dirName.append(serverName);
+    }
+    return dirName.toString();
+  }
+
   /**
    * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts,
    * this method ignores the format of the logfile component. Current format: [base directory for
@@ -387,6 +407,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider {
   public static Path getArchivedLogPath(Path path, Configuration conf) throws IOException {
     Path rootDir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    if (conf.getBoolean(SEPARATE_OLDLOGDIR, DEFAULT_SEPARATE_OLDLOGDIR)) {
+      ServerName serverName = getServerNameFromWALDirectoryName(path);
+      if (serverName == null) {
+        LOG.error("Couldn't locate log: " + path);
+        return path;
+      }
+      oldLogDir = new Path(oldLogDir, serverName.getServerName());
+    }
     Path archivedLogLocation = new Path(oldLogDir, path.getName());
     final FileSystem fs = FSUtils.getCurrentFileSystem(conf);
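A quick sketch of what the new getWALArchiveDirectoryName() returns under each setting (server name illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class WALArchiveDirNameSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    String server = "s1.example.org,16020,1490000000000";

    // default: the flat archive dir -> "oldWALs"
    System.out.println(AbstractFSWALProvider.getWALArchiveDirectoryName(conf, server));

    // flag on: per-server archive dir -> "oldWALs/s1.example.org,16020,1490000000000"
    conf.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
    System.out.println(AbstractFSWALProvider.getWALArchiveDirectoryName(conf, server));
  }
}
```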
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index 921b08ff3a1..8880ca51c4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -61,8 +61,9 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
   @Override
   protected AsyncFSWAL createWAL() throws IOException {
     return new AsyncFSWAL(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
-        getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
-        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
+        getWALDirectoryName(factory.factoryId),
+        getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
         eventLoopGroup.next(), channelClass);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
index c8a285fcc76..459485c69a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java
@@ -76,8 +76,9 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
   @Override
   protected FSHLog createWAL() throws IOException {
     return new FSHLog(FSUtils.getWALFileSystem(conf), FSUtils.getWALRootDir(conf),
-        getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
-        true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
+        getWALDirectoryName(factory.factoryId),
+        getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
+        META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null);
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index f7ee37df5b8..547f72e9bcd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -225,6 +225,7 @@ public class TestLogsCleaner {
         "testZooKeeperAbort-faulty", null)) {
       faultyZK.init();
       cleaner.setConf(conf, faultyZK);
+      cleaner.preClean();
       // should keep all files due to a ConnectionLossException getting the queues znodes
       Iterable<FileStatus> toDelete = cleaner.getDeletableFiles(dummyFiles);
       assertFalse(toDelete.iterator().hasNext());
@@ -235,6 +236,7 @@ public class TestLogsCleaner {
     cleaner = new ReplicationLogCleaner();
     try (ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null)) {
       cleaner.setConf(conf, zkw);
+      cleaner.preClean();
       Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
       Iterator<FileStatus> iter = filesToDelete.iterator();
       assertTrue(iter.hasNext());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index bfe17b5a021..a12cebd15cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -25,7 +25,8 @@ import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
@@ -47,7 +48,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
 
   @Override
   public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-      ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
+      ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
       UUID clusterId, ReplicationEndpoint replicationEndpoint,
       WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
     this.manager = manager;
@@ -142,4 +143,9 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public WALFileLengthProvider getWALFileLengthProvider() {
     return walFileLengthProvider;
   }
+
+  @Override
+  public ServerName getServerWALsBelongTo() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 206b5003649..58b97b9c5b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 
@@ -36,10 +38,13 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * This class is only a base for other integration-level replication tests.
@@ -82,6 +87,14 @@ public class TestReplicationBase {
   protected static final byte[] row = Bytes.toBytes("row");
   protected static final byte[] noRepfamName = Bytes.toBytes("norep");
 
+  @Parameter
+  public static boolean separateOldWALs;
+
+  @Parameters
+  public static List<Boolean> params() {
+    return Arrays.asList(false, true);
+  }
+
   /**
    * @throws java.lang.Exception
    */
@@ -106,6 +119,9 @@ public class TestReplicationBase {
     conf1.setFloat("replication.source.ratio", 1.0f);
     conf1.setBoolean("replication.source.eof.autorecovery", true);
 
+    // Parameterized config: run with and without separate old WAL dirs
+    conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, separateOldWALs);
+
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
index 51a39a6f0cd..5487c048576 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Runs the TestReplicationKillRS test and selects the RS to kill in the master cluster
 * Do not add other tests in this class.
 */
+@RunWith(Parameterized.class)
 @Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationKillMasterRS extends TestReplicationKillRS {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
index 07e18b2b3e9..6a824d0f01c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
@@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Runs the TestReplicationKillRS test and selects the RS to kill in the slave cluster
 * Do not add other tests in this class.
 */
+@RunWith(Parameterized.class)
 @Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationKillSlaveRS extends TestReplicationKillRS {
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 3934e05e972..5712146e4f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -645,7 +645,7 @@ public abstract class TestReplicationSourceManager {
 
     @Override
     public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
-        ReplicationQueues rq, ReplicationPeers rp, Stoppable stopper, String peerClusterId,
+        ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
         UUID clusterId, ReplicationEndpoint replicationEndpoint,
         WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
       throw new IOException("Failing deliberately");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 945d9f4e8ca..b47a8d3f628 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -105,11 +106,11 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
     String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
     rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
-    List<String> result = replicationQueueInfo.getDeadRegionServers();
+    List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
     // verify
-    assertTrue(result.contains(server.getServerName().getServerName()));
-    assertTrue(result.contains(s1.getServerName().getServerName()));
-    assertTrue(result.contains(s2.getServerName().getServerName()));
+    assertTrue(result.contains(server.getServerName()));
+    assertTrue(result.contains(s1.getServerName()));
+    assertTrue(result.contains(s2.getServerName()));
 
     server.stop("");
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index d65054c505a..28ee1018c12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -148,7 +148,7 @@ public class TestWALEntryStream {
     log.rollWriter();
 
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       int i = 0;
       while (entryStream.hasNext()) {
         assertNotNull(entryStream.next());
@@ -175,7 +175,7 @@ public class TestWALEntryStream {
     appendToLog();
     long oldPos;
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       // There's one edit in the log, read it. Reading past it needs to throw exception
       assertTrue(entryStream.hasNext());
       WAL.Entry entry = entryStream.next();
@@ -193,7 +193,7 @@ public class TestWALEntryStream {
     appendToLog();
     try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
-        log, new MetricsSource("1"))) {
+        log, null, new MetricsSource("1"))) {
       // Read the newly added entry, make sure we made progress
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
@@ -207,7 +207,7 @@ public class TestWALEntryStream {
     appendToLog();
     try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
-        log, new MetricsSource("1"))) {
+        log, null, new MetricsSource("1"))) {
       WAL.Entry entry = entryStream.next();
       assertNotEquals(oldPos, entryStream.getPosition());
       assertNotNull(entry);
@@ -232,7 +232,7 @@ public class TestWALEntryStream {
     appendToLog("1");
     appendToLog("2");// 2
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       assertEquals("1", getRow(entryStream.next()));
 
       appendToLog("3"); // 3 - comes in after reader opened
@@ -257,7 +257,7 @@ public class TestWALEntryStream {
   public void testNewEntriesWhileStreaming() throws Exception {
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
+        new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) {
       entryStream.next(); // we've hit the end of the stream at this point
 
       // some new entries come in while we're streaming
@@ -280,7 +280,7 @@ public class TestWALEntryStream {
     long lastPosition = 0;
     appendToLog("1");
     try (WALEntryStream entryStream =
-        new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
MetricsSource("1"))) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); @@ -288,7 +288,7 @@ public class TestWALEntryStream { } // next stream should picks up where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -306,13 +306,13 @@ public class TestWALEntryStream { appendEntriesToLog(3); // read only one element try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition, - log, new MetricsSource("1"))) { + log, null, new MetricsSource("1"))) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, conf, lastPosition, log, null, new MetricsSource("1"))) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); @@ -323,7 +323,7 @@ public class TestWALEntryStream { @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { assertFalse(entryStream.hasNext()); } } @@ -334,7 +334,7 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) { + new WALEntryStream(walQueue, fs, conf, 0, log, null, new MetricsSource("1"))) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -440,7 +440,7 @@ public class TestWALEntryStream { long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong(); AtomicLong fileLength = new AtomicLong(size - 1); try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0, - p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) { + p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) { assertTrue(entryStream.hasNext()); assertNotNull(entryStream.next()); // can not get log 2