diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java index 621d6df42c4..258bb18a0f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hbase.util.FSUtils; /** * The FileLink is a sort of hardlink, that allows access to a file given a set of locations. @@ -107,7 +108,7 @@ public class FileLink { public FileLinkInputStream(final FileSystem fs, final FileLink fileLink) throws IOException { - this(fs, fileLink, fs.getConf().getInt("io.file.buffer.size", 4096)); + this(fs, fileLink, FSUtils.getDefaultBufferSize(fs)); } public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 3fecd8098dd..11c000fb12c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -328,7 +328,7 @@ class FSHLog implements HLog, Syncable { } this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize", - getDefaultBlockSize()); + FSUtils.getDefaultBlockSize(this.fs, this.dir)); // Roll at 95% of block size. 
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f); this.logrollsize = (long)(this.blocksize * multi); @@ -338,7 +338,7 @@ class FSHLog implements HLog, Syncable { this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication", - this.fs.getDefaultReplication()); + FSUtils.getDefaultReplication(fs, this.dir)); this.lowReplicationRollLimit = conf.getInt( "hbase.regionserver.hlog.lowreplication.rolllimit", 5); this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true); @@ -389,33 +389,6 @@ class FSHLog implements HLog, Syncable { this.metrics = new MetricsWAL(); } - // use reflection to search for getDefaultBlockSize(Path f) - // if the method doesn't exist, fall back to using getDefaultBlockSize() - private long getDefaultBlockSize() throws IOException { - Method m = null; - Class cls = this.fs.getClass(); - try { - m = cls.getMethod("getDefaultBlockSize", - new Class[] { Path.class }); - } catch (NoSuchMethodException e) { - LOG.info("FileSystem doesn't support getDefaultBlockSize"); - } catch (SecurityException e) { - LOG.info("Doesn't have access to getDefaultBlockSize on " - + "FileSystems", e); - m = null; // could happen on setAccessible() - } - if (null == m) { - return this.fs.getDefaultBlockSize(); - } else { - try { - Object ret = m.invoke(this.fs, this.dir); - return ((Long)ret).longValue(); - } catch (Exception e) { - throw new IOException(e); - } - } - } - /** * Find the 'getNumCurrentReplicas' on the passed os stream. * @return Method or null. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 3d81cc455d7..aba09f7f947 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; +import org.apache.hadoop.hbase.util.FSUtils; /** * Writer for protobuf-based WAL. @@ -63,10 +64,11 @@ public class ProtobufLogWriter implements HLog.Writer { throw new IOException("Failed to initiate CompressionContext", e); } } - int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096); + int bufferSize = FSUtils.getDefaultBufferSize(fs); short replication = (short)conf.getInt( - "hbase.regionserver.hlog.replication", fs.getDefaultReplication()); - long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", fs.getDefaultBlockSize()); + "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); + long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", + FSUtils.getDefaultBlockSize(fs, path)); output = fs.create(path, true, bufferSize, replication, blockSize); output.write(ProtobufLogReader.PB_WAL_MAGIC); WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 4c6235390d1..5eea89dd498 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -167,6 +167,87 @@ public abstract class FSUtils { return 
fs.exists(dir) && fs.delete(dir, true); } + /** + * Return the number of bytes that large input files should be optimally + * split into to minimize i/o time. + * + * use reflection to search for getDefaultBlockSize(Path f) + * if the method doesn't exist, fall back to using getDefaultBlockSize() + * + * @param fs filesystem object + * @return the default block size for the path's filesystem + * @throws IOException e + */ + public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultBlockSize", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultBlockSize"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultBlockSize(); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Long)ret).longValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /** + * Get the default replication. 
+ * + * use reflection to search for getDefaultReplication(Path f) + * if the method doesn't exist, fall back to using getDefaultReplication() + * + * @param fs filesystem object + * @param f path of file + * @return default replication for the path's filesystem + * @throws IOException e + */ + public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException { + Method m = null; + Class cls = fs.getClass(); + try { + m = cls.getMethod("getDefaultReplication", new Class[] { Path.class }); + } catch (NoSuchMethodException e) { + LOG.info("FileSystem doesn't support getDefaultReplication"); + } catch (SecurityException e) { + LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e); + m = null; // could happen on setAccessible() + } + if (m == null) { + return fs.getDefaultReplication(); + } else { + try { + Object ret = m.invoke(fs, path); + return ((Number)ret).shortValue(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /** + * Returns the default buffer size to use during writes. + * + * The size of the buffer should probably be a multiple of hardware + * page size (4096 on Intel x86), and it determines how much data is + * buffered during read and write operations. + * + * @param fs filesystem object + * @return default buffer size to use during writes + */ + public static int getDefaultBufferSize(final FileSystem fs) { + return fs.getConf().getInt("io.file.buffer.size", 4096); + } /** * Create the specified file on the filesystem. 
By default, this will: @@ -211,9 +292,8 @@ public abstract class FSUtils { FsPermission perm, boolean overwrite) throws IOException { LOG.debug("Creating file=" + path + " with permission=" + perm); - return fs.create(path, perm, overwrite, - fs.getConf().getInt("io.file.buffer.size", 4096), - fs.getDefaultReplication(), fs.getDefaultBlockSize(), null); + return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), + getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 61a358a1198..4bf67f6d245 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.CompressionType; @@ -116,12 +117,12 @@ public class SequenceFileLogWriter implements HLog.Writer { Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE, CompressionType.class, CompressionCodec.class, Metadata.class}) .invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class, - Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)), + Integer.valueOf(FSUtils.getDefaultBufferSize(fs)), Short.valueOf((short) conf.getInt("hbase.regionserver.hlog.replication", - fs.getDefaultReplication())), + FSUtils.getDefaultReplication(fs, path))), Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize", - fs.getDefaultBlockSize())), + FSUtils.getDefaultBlockSize(fs, path))), 
Boolean.valueOf(false) /*createParent*/, SequenceFile.CompressionType.NONE, new DefaultCodec(), createMetadata(conf, compress) @@ -138,11 +139,11 @@ public class SequenceFileLogWriter implements HLog.Writer { LOG.debug("new createWriter -- HADOOP-6840 -- not available"); this.writer = SequenceFile.createWriter(fs, conf, path, HLogKey.class, WALEdit.class, - fs.getConf().getInt("io.file.buffer.size", 4096), + FSUtils.getDefaultBufferSize(fs), (short) conf.getInt("hbase.regionserver.hlog.replication", - fs.getDefaultReplication()), + FSUtils.getDefaultReplication(fs, path)), conf.getLong("hbase.regionserver.hlog.blocksize", - fs.getDefaultBlockSize()), + FSUtils.getDefaultBlockSize(fs, path)), SequenceFile.CompressionType.NONE, new DefaultCodec(), null,