HBASE-8516 FSUtils.create() fail with ViewFS
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1482386 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
77a08bbe0a
commit
8d3581dc7e
|
@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PositionedReadable;
|
import org.apache.hadoop.fs.PositionedReadable;
|
||||||
import org.apache.hadoop.fs.Seekable;
|
import org.apache.hadoop.fs.Seekable;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
|
* The FileLink is a sort of hardlink, that allows access to a file given a set of locations.
|
||||||
|
@ -107,7 +108,7 @@ public class FileLink {
|
||||||
|
|
||||||
public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
|
public FileLinkInputStream(final FileSystem fs, final FileLink fileLink)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(fs, fileLink, fs.getConf().getInt("io.file.buffer.size", 4096));
|
this(fs, fileLink, FSUtils.getDefaultBufferSize(fs));
|
||||||
}
|
}
|
||||||
|
|
||||||
public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
|
public FileLinkInputStream(final FileSystem fs, final FileLink fileLink, int bufferSize)
|
||||||
|
|
|
@ -328,7 +328,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
|
this.blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
|
||||||
getDefaultBlockSize());
|
FSUtils.getDefaultBlockSize(this.fs, this.dir));
|
||||||
// Roll at 95% of block size.
|
// Roll at 95% of block size.
|
||||||
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
|
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
|
||||||
this.logrollsize = (long)(this.blocksize * multi);
|
this.logrollsize = (long)(this.blocksize * multi);
|
||||||
|
@ -338,7 +338,7 @@ class FSHLog implements HLog, Syncable {
|
||||||
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
|
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
|
||||||
this.minTolerableReplication = conf.getInt(
|
this.minTolerableReplication = conf.getInt(
|
||||||
"hbase.regionserver.hlog.tolerable.lowreplication",
|
"hbase.regionserver.hlog.tolerable.lowreplication",
|
||||||
this.fs.getDefaultReplication());
|
FSUtils.getDefaultReplication(fs, this.dir));
|
||||||
this.lowReplicationRollLimit = conf.getInt(
|
this.lowReplicationRollLimit = conf.getInt(
|
||||||
"hbase.regionserver.hlog.lowreplication.rolllimit", 5);
|
"hbase.regionserver.hlog.lowreplication.rolllimit", 5);
|
||||||
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
|
this.enabled = conf.getBoolean("hbase.regionserver.hlog.enabled", true);
|
||||||
|
@ -389,33 +389,6 @@ class FSHLog implements HLog, Syncable {
|
||||||
this.metrics = new MetricsWAL();
|
this.metrics = new MetricsWAL();
|
||||||
}
|
}
|
||||||
|
|
||||||
// use reflection to search for getDefaultBlockSize(Path f)
|
|
||||||
// if the method doesn't exist, fall back to using getDefaultBlockSize()
|
|
||||||
private long getDefaultBlockSize() throws IOException {
|
|
||||||
Method m = null;
|
|
||||||
Class<? extends FileSystem> cls = this.fs.getClass();
|
|
||||||
try {
|
|
||||||
m = cls.getMethod("getDefaultBlockSize",
|
|
||||||
new Class<?>[] { Path.class });
|
|
||||||
} catch (NoSuchMethodException e) {
|
|
||||||
LOG.info("FileSystem doesn't support getDefaultBlockSize");
|
|
||||||
} catch (SecurityException e) {
|
|
||||||
LOG.info("Doesn't have access to getDefaultBlockSize on "
|
|
||||||
+ "FileSystems", e);
|
|
||||||
m = null; // could happen on setAccessible()
|
|
||||||
}
|
|
||||||
if (null == m) {
|
|
||||||
return this.fs.getDefaultBlockSize();
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
Object ret = m.invoke(this.fs, this.dir);
|
|
||||||
return ((Long)ret).longValue();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new IOException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
|
* Find the 'getNumCurrentReplicas' on the passed <code>os</code> stream.
|
||||||
* @return Method or null.
|
* @return Method or null.
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
import org.apache.hadoop.hbase.codec.Codec;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writer for protobuf-based WAL.
|
* Writer for protobuf-based WAL.
|
||||||
|
@ -63,10 +64,11 @@ public class ProtobufLogWriter implements HLog.Writer {
|
||||||
throw new IOException("Failed to initiate CompressionContext", e);
|
throw new IOException("Failed to initiate CompressionContext", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
|
int bufferSize = FSUtils.getDefaultBufferSize(fs);
|
||||||
short replication = (short)conf.getInt(
|
short replication = (short)conf.getInt(
|
||||||
"hbase.regionserver.hlog.replication", fs.getDefaultReplication());
|
"hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path));
|
||||||
long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", fs.getDefaultBlockSize());
|
long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize",
|
||||||
|
FSUtils.getDefaultBlockSize(fs, path));
|
||||||
output = fs.create(path, true, bufferSize, replication, blockSize);
|
output = fs.create(path, true, bufferSize, replication, blockSize);
|
||||||
output.write(ProtobufLogReader.PB_WAL_MAGIC);
|
output.write(ProtobufLogReader.PB_WAL_MAGIC);
|
||||||
WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);
|
WALHeader.newBuilder().setHasCompression(doCompress).build().writeDelimitedTo(output);
|
||||||
|
|
|
@ -167,6 +167,87 @@ public abstract class FSUtils {
|
||||||
return fs.exists(dir) && fs.delete(dir, true);
|
return fs.exists(dir) && fs.delete(dir, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the number of bytes that large input files should be optimally
|
||||||
|
* be split into to minimize i/o time.
|
||||||
|
*
|
||||||
|
* use reflection to search for getDefaultBlockSize(Path f)
|
||||||
|
* if the method doesn't exist, fall back to using getDefaultBlockSize()
|
||||||
|
*
|
||||||
|
* @param fs filesystem object
|
||||||
|
* @return the default block size for the path's filesystem
|
||||||
|
* @throws IOException e
|
||||||
|
*/
|
||||||
|
public static long getDefaultBlockSize(final FileSystem fs, final Path path) throws IOException {
|
||||||
|
Method m = null;
|
||||||
|
Class<? extends FileSystem> cls = fs.getClass();
|
||||||
|
try {
|
||||||
|
m = cls.getMethod("getDefaultBlockSize", new Class<?>[] { Path.class });
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
LOG.info("FileSystem doesn't support getDefaultBlockSize");
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
LOG.info("Doesn't have access to getDefaultBlockSize on FileSystems", e);
|
||||||
|
m = null; // could happen on setAccessible()
|
||||||
|
}
|
||||||
|
if (m == null) {
|
||||||
|
return fs.getDefaultBlockSize();
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
Object ret = m.invoke(fs, path);
|
||||||
|
return ((Long)ret).longValue();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Get the default replication.
|
||||||
|
*
|
||||||
|
* use reflection to search for getDefaultReplication(Path f)
|
||||||
|
* if the method doesn't exist, fall back to using getDefaultReplication()
|
||||||
|
*
|
||||||
|
* @param fs filesystem object
|
||||||
|
* @param f path of file
|
||||||
|
* @return default replication for the path's filesystem
|
||||||
|
* @throws IOException e
|
||||||
|
*/
|
||||||
|
public static short getDefaultReplication(final FileSystem fs, final Path path) throws IOException {
|
||||||
|
Method m = null;
|
||||||
|
Class<? extends FileSystem> cls = fs.getClass();
|
||||||
|
try {
|
||||||
|
m = cls.getMethod("getDefaultReplication", new Class<?>[] { Path.class });
|
||||||
|
} catch (NoSuchMethodException e) {
|
||||||
|
LOG.info("FileSystem doesn't support getDefaultReplication");
|
||||||
|
} catch (SecurityException e) {
|
||||||
|
LOG.info("Doesn't have access to getDefaultReplication on FileSystems", e);
|
||||||
|
m = null; // could happen on setAccessible()
|
||||||
|
}
|
||||||
|
if (m == null) {
|
||||||
|
return fs.getDefaultReplication();
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
Object ret = m.invoke(fs, path);
|
||||||
|
return ((Number)ret).shortValue();
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the default buffer size to use during writes.
|
||||||
|
*
|
||||||
|
* The size of the buffer should probably be a multiple of hardware
|
||||||
|
* page size (4096 on Intel x86), and it determines how much data is
|
||||||
|
* buffered during read and write operations.
|
||||||
|
*
|
||||||
|
* @param fs filesystem object
|
||||||
|
* @return default buffer size to use during writes
|
||||||
|
*/
|
||||||
|
public static int getDefaultBufferSize(final FileSystem fs) {
|
||||||
|
return fs.getConf().getInt("io.file.buffer.size", 4096);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the specified file on the filesystem. By default, this will:
|
* Create the specified file on the filesystem. By default, this will:
|
||||||
|
@ -211,9 +292,8 @@ public abstract class FSUtils {
|
||||||
FsPermission perm, boolean overwrite) throws IOException {
|
FsPermission perm, boolean overwrite) throws IOException {
|
||||||
LOG.debug("Creating file=" + path + " with permission=" + perm);
|
LOG.debug("Creating file=" + path + " with permission=" + perm);
|
||||||
|
|
||||||
return fs.create(path, perm, overwrite,
|
return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
|
||||||
fs.getConf().getInt("io.file.buffer.size", 4096),
|
getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
|
||||||
fs.getDefaultReplication(), fs.getDefaultBlockSize(), null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
import org.apache.hadoop.io.SequenceFile.CompressionType;
|
||||||
|
@ -116,12 +117,12 @@ public class SequenceFileLogWriter implements HLog.Writer {
|
||||||
Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
|
Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
|
||||||
CompressionType.class, CompressionCodec.class, Metadata.class})
|
CompressionType.class, CompressionCodec.class, Metadata.class})
|
||||||
.invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
|
.invoke(null, new Object[] {fs, conf, path, HLogKey.class, WALEdit.class,
|
||||||
Integer.valueOf(fs.getConf().getInt("io.file.buffer.size", 4096)),
|
Integer.valueOf(FSUtils.getDefaultBufferSize(fs)),
|
||||||
Short.valueOf((short)
|
Short.valueOf((short)
|
||||||
conf.getInt("hbase.regionserver.hlog.replication",
|
conf.getInt("hbase.regionserver.hlog.replication",
|
||||||
fs.getDefaultReplication())),
|
FSUtils.getDefaultReplication(fs, path))),
|
||||||
Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
|
Long.valueOf(conf.getLong("hbase.regionserver.hlog.blocksize",
|
||||||
fs.getDefaultBlockSize())),
|
FSUtils.getDefaultBlockSize(fs, path))),
|
||||||
Boolean.valueOf(false) /*createParent*/,
|
Boolean.valueOf(false) /*createParent*/,
|
||||||
SequenceFile.CompressionType.NONE, new DefaultCodec(),
|
SequenceFile.CompressionType.NONE, new DefaultCodec(),
|
||||||
createMetadata(conf, compress)
|
createMetadata(conf, compress)
|
||||||
|
@ -138,11 +139,11 @@ public class SequenceFileLogWriter implements HLog.Writer {
|
||||||
LOG.debug("new createWriter -- HADOOP-6840 -- not available");
|
LOG.debug("new createWriter -- HADOOP-6840 -- not available");
|
||||||
this.writer = SequenceFile.createWriter(fs, conf, path,
|
this.writer = SequenceFile.createWriter(fs, conf, path,
|
||||||
HLogKey.class, WALEdit.class,
|
HLogKey.class, WALEdit.class,
|
||||||
fs.getConf().getInt("io.file.buffer.size", 4096),
|
FSUtils.getDefaultBufferSize(fs),
|
||||||
(short) conf.getInt("hbase.regionserver.hlog.replication",
|
(short) conf.getInt("hbase.regionserver.hlog.replication",
|
||||||
fs.getDefaultReplication()),
|
FSUtils.getDefaultReplication(fs, path)),
|
||||||
conf.getLong("hbase.regionserver.hlog.blocksize",
|
conf.getLong("hbase.regionserver.hlog.blocksize",
|
||||||
fs.getDefaultBlockSize()),
|
FSUtils.getDefaultBlockSize(fs, path)),
|
||||||
SequenceFile.CompressionType.NONE,
|
SequenceFile.CompressionType.NONE,
|
||||||
new DefaultCodec(),
|
new DefaultCodec(),
|
||||||
null,
|
null,
|
||||||
|
|
Loading…
Reference in New Issue