HBASE-6435 Reading WAL files after a recovery leads to time lost in HDFS timeouts when using dead datanodes
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1375451 13f79535-47bb-0310-9956-ffa450edef68
commit f1b53f6631
parent cfbad5ddcc
@@ -301,6 +301,16 @@ public class ServerName implements Comparable<ServerName> {
      new ServerName(str, NON_STARTCODE);
  }

  /**
   * @return true if the String follows the pattern of {@link ServerName#toString()}, false
   *   otherwise.
   */
  public static boolean isFullServerName(final String str) {
    if (str == null || str.isEmpty()) return false;
    return SERVERNAME_PATTERN.matcher(str).matches();
  }

  /**
   * Get a ServerName from the passed in data bytes.
   * @param data Data with a serialize server name in it; can handle the old style
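As a quick illustration (the values below are invented, not part of the commit): the new isFullServerName() only accepts the host,port,startcode form produced by ServerName#toString(), so a hypothetical usage sketch would behave like this:

// Hypothetical usage sketch of the new helper; values are examples only.
import org.apache.hadoop.hbase.ServerName;

public class IsFullServerNameExample {
  public static void main(String[] args) {
    // "host,port,startcode", as produced by ServerName#toString(): expected true.
    System.out.println(ServerName.isFullServerName("example.org,60020,1343316388997"));
    // Not in ServerName#toString() form: expected false.
    System.out.println(ServerName.isFullServerName("example.org:60020"));
    // Null or empty input short-circuits to false.
    System.out.println(ServerName.isFullServerName(""));
  }
}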
@@ -21,16 +21,29 @@
package org.apache.hadoop.hbase.fs;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Proxy;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Methods;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Progressable;
@@ -42,6 +55,7 @@ import org.apache.hadoop.util.Progressable;
 * this is the place to make it happen.
 */
public class HFileSystem extends FilterFileSystem {
  public static final Log LOG = LogFactory.getLog(HFileSystem.class);

  private final FileSystem noChecksumFs;   // read hfile data from storage
  private final boolean useHBaseChecksum;
@@ -78,6 +92,7 @@ public class HFileSystem extends FilterFileSystem {
    } else {
      this.noChecksumFs = fs;
    }
    addLocationsOrderInterceptor(conf);
  }

  /**
@@ -159,9 +174,154 @@ public class HFileSystem extends FilterFileSystem {
    if (fs == null) {
      throw new IOException("No FileSystem for scheme: " + uri.getScheme());
    }
    return fs;
  }

  public static boolean addLocationsOrderInterceptor(Configuration conf) throws IOException {
    return addLocationsOrderInterceptor(conf, new ReorderWALBlocks());
  }

  /**
   * Add an interceptor on the calls to the namenode#getBlockLocations from the DFSClient
   * linked to this FileSystem. See HBASE-6435 for the background.
   * <p/>
   * There should be no reason, except testing, to create a specific ReorderBlocks.
   *
   * @return true if the interceptor was added, false otherwise.
   */
  static boolean addLocationsOrderInterceptor(Configuration conf, final ReorderBlocks lrb) {
    LOG.debug("Starting addLocationsOrderInterceptor with class " + lrb.getClass());

    if (!conf.getBoolean("hbase.filesystem.reorder.blocks", true)) {  // activated by default
      LOG.debug("addLocationsOrderInterceptor configured to false");
      return false;
    }

    FileSystem fs;
    try {
      fs = FileSystem.get(conf);
    } catch (IOException e) {
      LOG.warn("Can't get the file system from the conf.", e);
      return false;
    }

    if (!(fs instanceof DistributedFileSystem)) {
      LOG.warn("The file system is not a DistributedFileSystem. " +
          "Not adding block location reordering");
      return false;
    }

    DistributedFileSystem dfs = (DistributedFileSystem) fs;
    DFSClient dfsc = dfs.getClient();
    if (dfsc == null) {
      LOG.warn("The DistributedFileSystem does not contain a DFSClient. Can't add the location " +
          "block reordering interceptor. Continuing, but this is unexpected.");
      return false;
    }

    try {
      Field nf = DFSClient.class.getDeclaredField("namenode");
      nf.setAccessible(true);
      Field modifiersField = Field.class.getDeclaredField("modifiers");
      modifiersField.setAccessible(true);
      modifiersField.setInt(nf, nf.getModifiers() & ~Modifier.FINAL);

      ClientProtocol namenode = (ClientProtocol) nf.get(dfsc);
      if (namenode == null) {
        LOG.warn("The DFSClient is not linked to a namenode. Can't add the location block" +
            " reordering interceptor. Continuing, but this is unexpected.");
        return false;
      }

      ClientProtocol cp1 = createReorderingProxy(namenode, lrb, conf);
      nf.set(dfsc, cp1);
      LOG.info("Added intercepting call to namenode#getBlockLocations");
    } catch (NoSuchFieldException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    } catch (IllegalAccessException e) {
      LOG.warn("Can't modify the DFSClient#namenode field to add the location reorder.", e);
      return false;
    }

    return true;
  }

  private static ClientProtocol createReorderingProxy(final ClientProtocol cp,
      final ReorderBlocks lrb, final Configuration conf) {
    return (ClientProtocol) Proxy.newProxyInstance(
        cp.getClass().getClassLoader(),
        new Class[]{ClientProtocol.class},
        new InvocationHandler() {
          public Object invoke(Object proxy, Method method,
              Object[] args) throws Throwable {
            Object res = method.invoke(cp, args);
            if (res != null && args.length == 3 && "getBlockLocations".equals(method.getName())
                && res instanceof LocatedBlocks
                && args[0] instanceof String
                && args[0] != null) {
              lrb.reorderBlocks(conf, (LocatedBlocks) res, (String) args[0]);
            }
            return res;
          }
        });
  }

  /**
   * Interface to implement to add a specific reordering logic in hdfs.
   */
  static interface ReorderBlocks {
    /**
     * @param conf - the conf to use
     * @param lbs - the LocatedBlocks to reorder
     * @param src - the file name currently read
     * @throws IOException - if something went wrong
     */
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src) throws IOException;
  }

  /**
   * We're putting at lowest priority the hlog file blocks that are on the same datanode
   * as the original regionserver which created these files. This is because we fear that the
   * datanode is actually dead, so if we use it, it will time out.
   */
  static class ReorderWALBlocks implements ReorderBlocks {
    public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)
        throws IOException {

      ServerName sn = HLog.getServerNameFromHLogDirectoryName(conf, src);
      if (sn == null) {
        // It's not an HLog
        return;
      }

      // Ok, so it's an HLog
      String hostName = sn.getHostname();
      LOG.debug(src + " is an HLog file, so reordering blocks, last hostname will be: " + hostName);

      // Just check all the blocks
      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
        DatanodeInfo[] dnis = lb.getLocations();
        if (dnis != null && dnis.length > 1) {
          boolean found = false;
          for (int i = 0; i < dnis.length - 1 && !found; i++) {
            if (hostName.equals(dnis[i].getHostName())) {
              // advance the other locations by one and put this one at the last place.
              DatanodeInfo toLast = dnis[i];
              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
              dnis[dnis.length - 1] = toLast;
              found = true;
            }
          }
        }
      }
    }
  }

  /**
   * Create a new HFileSystem object, similar to FileSystem.get().
   * This returns a filesystem object that avoids checksum
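Outside the diff, a minimal self-contained sketch of the reordering idea that ReorderWALBlocks applies to each block's DatanodeInfo[] (the hostnames below are invented): the replica living on the suspect region server's datanode is shifted to the last slot, so the DFS client only falls back to it after trying the healthy locations.

// Standalone illustration, not part of the commit; mirrors the System.arraycopy
// shuffle above on plain hostname strings.
public class ReorderExample {
  // Move the first occurrence of suspectHost (if any) to the last slot,
  // shifting the remaining entries up by one.
  static void pushSuspectLast(String[] hosts, String suspectHost) {
    if (hosts == null || hosts.length < 2) {
      return; // nothing to reorder
    }
    for (int i = 0; i < hosts.length - 1; i++) {
      if (suspectHost.equals(hosts[i])) {
        String toLast = hosts[i];
        System.arraycopy(hosts, i + 1, hosts, i, hosts.length - i - 1);
        hosts[hosts.length - 1] = toLast;
        return;
      }
    }
  }

  public static void main(String[] args) {
    String[] locations = {"dn1.example.com", "deadrs.example.com", "dn3.example.com"};
    pushSuspectLast(locations, "deadrs.example.com");
    // Prints [dn1.example.com, dn3.example.com, deadrs.example.com]
    System.out.println(java.util.Arrays.toString(locations));
  }
}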
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,9 +44,9 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
@@ -118,6 +117,7 @@ public class MasterFileSystem {
    // setup the filesystem variable
    // set up the archived logs path
    this.oldLogDir = createInitialFileSystemLayout();
    HFileSystem.addLocationsOrderInterceptor(conf);
  }

  /**
@@ -53,6 +53,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -151,6 +152,7 @@ public class HLog implements Syncable {

  private WALCoprocessorHost coprocessorHost;

  static void resetLogReaderClass() {
    HLog.logReaderClass = null;
  }
@@ -1757,6 +1759,62 @@
    return dirName.toString();
  }

  /**
   * @param path - the path to analyze. Expected format, if it's in hlog directory:
   *     / [base directory for hbase] / hbase / .logs / ServerName / logfile
   * @return null if it's not a log file. Returns the ServerName of the region server that created
   *     this log file otherwise.
   */
  public static ServerName getServerNameFromHLogDirectoryName(Configuration conf, String path)
      throws IOException {
    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
      return null;
    }

    if (conf == null) {
      throw new IllegalArgumentException("parameter conf must be set");
    }

    final String rootDir = conf.get(HConstants.HBASE_DIR);
    if (rootDir == null || rootDir.isEmpty()) {
      throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf.");
    }

    final StringBuilder startPathSB = new StringBuilder(rootDir);
    if (!rootDir.endsWith("/")) startPathSB.append('/');
    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) startPathSB.append('/');
    final String startPath = startPathSB.toString();

    String fullPath;
    try {
      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
    } catch (IllegalArgumentException e) {
      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
      return null;
    }

    if (!fullPath.startsWith(startPath)) {
      return null;
    }

    final String serverNameAndFile = fullPath.substring(startPath.length());

    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
      // Either it's a file (not a directory) or it's not a ServerName format
      return null;
    }

    final String serverName = serverNameAndFile.substring(0, serverNameAndFile.indexOf('/') - 1);

    if (!ServerName.isFullServerName(serverName)) {
      return null;
    }

    return ServerName.parseServerName(serverName);
  }

  /**
   * Get the directory we are making logs in.
   *
@@ -439,6 +439,25 @@ public class HBaseTestingUtility {
    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
      throws Exception {
    createDirsAndSetProperties();
    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
        true, null, racks, hosts, null);

    // Set this just-started cluster as our filesystem.
    FileSystem fs = this.dfsCluster.getFileSystem();
    this.conf.set("fs.defaultFS", fs.getUri().toString());
    // Do old style too just to be safe.
    this.conf.set("fs.default.name", fs.getUri().toString());

    // Wait for the cluster to be totally up
    this.dfsCluster.waitClusterUp();

    return this.dfsCluster;
  }

  public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException {
    createDirsAndSetProperties();
    dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
@@ -637,6 +656,11 @@ public class HBaseTestingUtility {
    return startMiniHBaseCluster(numMasters, numSlaves);
  }

  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
      throws IOException, InterruptedException {
    return startMiniHBaseCluster(numMasters, numSlaves, null, null);
  }

  /**
   * Starts up mini hbase cluster. Usually used after call to
   * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
@@ -649,7 +673,8 @@ public class HBaseTestingUtility {
   * @see {@link #startMiniCluster()}
   */
  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
-     final int numSlaves)
+     final int numSlaves, Class<? extends HMaster> masterClass,
+     Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
      throws IOException, InterruptedException {
    // Now do the mini hbase cluster. Set the hbase.rootdir in config.
    createRootDir();
@@ -660,7 +685,8 @@ public class HBaseTestingUtility {
    conf.setInt("hbase.master.wait.on.regionservers.maxtostart", numSlaves);

    Configuration c = new Configuration(this.conf);
-   this.hbaseCluster = new MiniHBaseCluster(c, numMasters, numSlaves);
+   this.hbaseCluster =
+       new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
    // Don't leave here till we've done a successful scan of the .META.
    HTable t = new HTable(c, HConstants.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
@@ -76,11 +76,20 @@ public class MiniHBaseCluster {
   * @throws IOException
   */
  public MiniHBaseCluster(Configuration conf, int numMasters,
      int numRegionServers)
      throws IOException, InterruptedException {
    this.conf = conf;
    conf.set(HConstants.MASTER_PORT, "0");
-   init(numMasters, numRegionServers);
+   init(numMasters, numRegionServers, null, null);
  }

  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
      Class<? extends HMaster> masterClass,
      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
      throws IOException, InterruptedException {
    this.conf = conf;
    conf.set(HConstants.MASTER_PORT, "0");
    init(numMasters, numRegionServers, masterClass, regionserverClass);
  }

  public Configuration getConfiguration() {
@@ -186,12 +195,21 @@ public class MiniHBaseCluster {
    }
  }

- private void init(final int nMasterNodes, final int nRegionNodes)
+ private void init(final int nMasterNodes, final int nRegionNodes,
+     Class<? extends HMaster> masterClass,
+     Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
      throws IOException, InterruptedException {
    try {
      if (masterClass == null) {
        masterClass = HMaster.class;
      }
      if (regionserverClass == null) {
        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
      }

      // start up a LocalHBaseCluster
      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
-         HMaster.class, MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+         masterClass, regionserverClass);

      // manually add the regionservers as other users
      for (int i = 0; i < nRegionNodes; i++) {
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -720,6 +721,29 @@ public class TestHLog {
    }
  }

  @Test
  public void testGetServerNameFromHLogDirectoryName() throws IOException {
    String hl = conf.get(HConstants.HBASE_DIR) + "/" +
        HLog.getHLogDirectoryName(new ServerName("hn", 450, 1398).toString());

    // Must not throw exception
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, null));
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf,
        conf.get(HConstants.HBASE_DIR) + "/"));
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, ""));
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, " "));
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, hl));
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, hl + "qdf"));
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, "sfqf" + hl + "qdf"));

    Assert.assertNotNull(HLog.getServerNameFromHLogDirectoryName(conf, conf.get(
        HConstants.HBASE_DIR) +
        "/.logs/localhost,32984,1343316388997/localhost%2C32984%2C1343316388997.1343316390417"));
    Assert.assertNotNull(HLog.getServerNameFromHLogDirectoryName(conf, hl + "/qdf"));
  }

  /**
   * A loaded WAL coprocessor won't break existing HLog test cases.
   */
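One practical note, not part of the commit itself: the interceptor is active by default and, as addLocationsOrderInterceptor shows, is gated by the hbase.filesystem.reorder.blocks boolean. A client or test wanting to switch it off could do so roughly like this (a minimal sketch, assuming a standard HBaseConfiguration):

// Sketch only; the configuration key comes from addLocationsOrderInterceptor above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class DisableReorderExample {
  public static void main(String[] args) {
    // HBaseConfiguration.create() loads hbase-default.xml / hbase-site.xml.
    Configuration conf = HBaseConfiguration.create();
    // Defaults to true; setting it to false skips installing the
    // getBlockLocations reordering proxy for this configuration.
    conf.setBoolean("hbase.filesystem.reorder.blocks", false);
  }
}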