HBASE-24000 Simplify CommonFSUtils after upgrading to hadoop 2.10.0 (#1335)

Signed-off-by: stack <stack@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
Duo Zhang 2020-03-26 17:05:31 +08:00
parent 224b706c6e
commit 35f7d050ce
8 changed files with 68 additions and 249 deletions

View File

@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@ -490,26 +491,19 @@ public abstract class CommonFSUtils {
String trimmedStoragePolicy = storagePolicy.trim();
if (trimmedStoragePolicy.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("We were passed an empty storagePolicy, exiting early.");
LOG.trace("We were passed an empty storagePolicy, exiting early.");
} else {
trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
if (LOG.isTraceEnabled()) {
LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
try {
invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
} catch (IOException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Failed to invoke set storage policy API on FS", e);
LOG.trace("Failed to invoke set storage policy API on FS", e);
if (throwException) {
throw e;
@ -525,10 +519,7 @@ public abstract class CommonFSUtils {
try {
fs.setStoragePolicy(path, storagePolicy);
if (LOG.isDebugEnabled()) {
LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
} catch (Exception e) {
toThrow = e;
// This swallows FNFE, should we be throwing it? seems more likely to indicate dev
@ -541,19 +532,9 @@ public abstract class CommonFSUtils {
LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
// check for lack of HDFS-7228
if (e instanceof RemoteException &&
((RemoteException)e).getClassName())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
"isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
"trying to use SSD related policies then you're likely missing HDFS-7228. For " +
"more information see the 'ArchivalStorage' docs for your Hadoop release.");
// Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
// that throws UnsupportedOperationException
} else if (e instanceof UnsupportedOperationException) {
if (e instanceof UnsupportedOperationException) {
if (LOG.isDebugEnabled()) {
LOG.debug("The underlying FileSystem implementation doesn't support " +
"setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
@ -759,201 +740,75 @@ public abstract class CommonFSUtils {
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
private static class DfsBuilderUtility {
static Class<?> dfsClass = null;
static Method createMethod;
static Method overwriteMethod;
static Method bufferSizeMethod;
static Method blockSizeMethod;
static Method recursiveMethod;
static Method replicateMethod;
static Method replicationMethod;
static Method buildMethod;
static boolean allMethodsPresent = false;
private static final class DfsBuilderUtility {
private static final Class<?> BUILDER;
private static final Method REPLICATE;
static {
String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
Class<?> builderClass = null;
try {
dfsClass = Class.forName(dfsName);
} catch (ClassNotFoundException e) {
LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
try {
builderClass = Class.forName(builderName);
} catch (ClassNotFoundException e) {
LOG.debug("{} not available, will not use builder API for file creation.", builderName);
LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
if (dfsClass != null && builderClass != null) {
Method replicateMethod = null;
if (builderClass != null) {
try {
createMethod = dfsClass.getMethod("createFile", Path.class);
overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
blockSizeMethod = builderClass.getMethod("blockSize", long.class);
recursiveMethod = builderClass.getMethod("recursive");
replicateMethod = builderClass.getMethod("replicate");
replicationMethod = builderClass.getMethod("replication", short.class);
buildMethod = builderClass.getMethod("build");
allMethodsPresent = true;
LOG.debug("Using builder API via reflection for DFS file creation.");
} catch (NoSuchMethodException e) {
LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
LOG.debug("Could not find replicate method on builder; will not set replicate when" +
" creating output stream", e);
BUILDER = builderClass;
REPLICATE = replicateMethod;
* Attempt to use builder API via reflection to create a file with the given parameters and
* replication enabled.
* Attempt to use builder API via reflection to call the replicate method on the given builder.
static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
if (allMethodsPresent && dfsClass.isInstance(fs)) {
static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
try {
Object builder;
builder = createMethod.invoke(fs, path);
builder = overwriteMethod.invoke(builder, overwritable);
builder = bufferSizeMethod.invoke(builder, bufferSize);
builder = blockSizeMethod.invoke(builder, blockSize);
if (isRecursive) {
builder = recursiveMethod.invoke(builder);
builder = replicateMethod.invoke(builder);
builder = replicationMethod.invoke(builder, replication);
return (FSDataOutputStream) buildMethod.invoke(builder);
} catch (IllegalAccessException | InvocationTargetException e) {
// Should have caught this failure during initialization, so log full trace here
LOG.warn("Couldn't use reflection with builder API", e);
if (isRecursive) {
return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
* Attempt to use builder API via reflection to create a file with the given parameters and
* replication enabled.
static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
throws IOException {
if (allMethodsPresent && dfsClass.isInstance(fs)) {
try {
Object builder;
builder = createMethod.invoke(fs, path);
builder = overwriteMethod.invoke(builder, overwritable);
builder = replicateMethod.invoke(builder);
return (FSDataOutputStream) buildMethod.invoke(builder);
} catch (IllegalAccessException | InvocationTargetException e) {
// Should have caught this failure during initialization, so log full trace here
LOG.warn("Couldn't use reflection with builder API", e);
return fs.create(path, overwritable);
* Attempt to use builder API via reflection to create a file with the given parameters and
* replication enabled.
* <p>
* <p/>
* Will not attempt to enable replication when passed an HFileSystem.
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
throws IOException {
return DfsBuilderUtility.createHelper(fs, path, overwritable);
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
throws IOException {
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
return builder.build();
* Attempt to use builder API via reflection to create a file with the given parameters and
* replication enabled.
* <p>
* <p/>
* Will not attempt to enable replication when passed an HFileSystem.
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
blockSize, isRecursive);
// Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
// not until we attempt to reference it.
private static class StreamCapabilities {
public static final boolean PRESENT;
public static final Class<?> CLASS;
public static final Method METHOD;
static {
boolean tmp = false;
Class<?> clazz = null;
Method method = null;
try {
clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
method = clazz.getMethod("hasCapability", String.class);
tmp = true;
} catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
"HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
"support hflush/hsync. If you are running on top of HDFS this probably just " +
"means you have an older version and this can be ignored. If you are running on " +
"top of an alternate FileSystem implementation you should manually verify that " +
"hflush and hsync are implemented; otherwise you risk data loss and hard to " +
"diagnose errors when our assumptions are violated.");
LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
} finally {
PRESENT = tmp;
CLASS = clazz;
METHOD = method;
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
if (isRecursive) {
* If our FileSystem version includes the StreamCapabilities class, check if
* the given stream has a particular capability.
* @param stream capabilities are per-stream instance, so check this one specifically. must not be
* null
* @param capability what to look for, per Hadoop Common's FileSystem docs
* @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
* implement it. return result of asking the stream otherwise.
public static boolean hasCapability(FSDataOutputStream stream, String capability) {
// be consistent whether or not StreamCapabilities is present
if (stream == null) {
throw new NullPointerException("stream parameter must not be null.");
// If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
// otherwise old versions of Hadoop will break.
boolean result = true;
if (StreamCapabilities.PRESENT) {
// if StreamCapabilities is present, but the stream doesn't implement it
// or we run into a problem invoking the method,
// we treat that as equivalent to not declaring anything
result = false;
if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
try {
result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
} catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
exception) {
LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
"our understanding of how it's supposed to work. Please file a JIRA and include " +
"the following stack trace. In the mean time we're interpreting this behavior " +
"difference as a lack of capability support, which will probably cause a failure.",
return result;
return builder.build();

View File

@ -19,9 +19,8 @@ package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -35,8 +34,6 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Test {@link CommonFSUtils}.
@ -48,8 +45,6 @@ public class TestCommonFSUtils {
public static final HBaseClassTestRule CLASS_RULE =
private static final Logger LOG = LoggerFactory.getLogger(TestCommonFSUtils.class);
private HBaseCommonTestingUtility htu;
private Configuration conf;
@ -131,38 +126,4 @@ public class TestCommonFSUtils {
Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
assertEquals("test/testlog", CommonFSUtils.removeWALRootPath(logFile, conf));
public void streamCapabilitiesDoesNotAllowNullStream() {
CommonFSUtils.hasCapability(null, "hopefully any string");
private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
static {
boolean tmp = false;
try {
tmp = true;
LOG.debug("Test thought StreamCapabilities class was present.");
} catch (ClassNotFoundException exception) {
LOG.debug("Test didn't think StreamCapabilities class was present.");
} finally {
public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
CommonFSUtils.hasCapability(stream, "hsync"));
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
CommonFSUtils.hasCapability(stream, "hflush"));
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +

View File

@ -35,11 +35,13 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
@ -1084,8 +1086,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
// After we create the stream but before we attempt to use it at all
// ensure that we can provide the level of data safety we're configured
// to provide.
final String durability = useHsync ? "hsync" : "hflush";
if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) {
final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH;
if (enforceStreamCapability && !newStream.hasCapability(durability)) {
throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
" for proper operation during component failures, but the underlying filesystem does " +
"not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
@ -1151,12 +1153,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
} catch (IOException e) {
} catch (IOException | FSError e) {
LOG.warn("Unable to write the trailer", e);
try {
} catch (IOException e) {
} catch (IOException | FSError e) {
LOG.error("Unable to close the stream", e);
stream = null;

View File

@ -655,7 +655,7 @@ public class TestWALProcedureStore {
public void testLogFileAleadExists() throws IOException {
public void testLogFileAlreadyExists() throws IOException {
final boolean[] tested = {false};
WALProcedureStore mStore = Mockito.spy(procStore);

View File

@ -18,15 +18,17 @@
package org.apache.hadoop.hbase.io.asyncfs;
import java.io.IOException;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@ -63,11 +65,15 @@ public final class AsyncFSOutputHelper {
// After we create the stream but before we attempt to use it at all
// ensure that we can provide the level of data safety we're configured
// to provide.
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
!(CommonFSUtils.hasCapability(out, "hflush") &&
CommonFSUtils.hasCapability(out, "hsync"))) {
throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
Closeables.close(out, true);
throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
if (!out.hasCapability(StreamCapabilities.HSYNC)) {
Closeables.close(out, true);
throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);
return new WrapperAsyncFSOutput(f, out);

View File

@ -22,6 +22,7 @@ import java.io.OutputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
@ -90,18 +91,17 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
return this.output;
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
blockSize, false);
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
if (!CommonFSUtils.hasCapability(output, "hflush")) {
throw new StreamLacksCapabilityException("hflush");
if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
if (!CommonFSUtils.hasCapability(output, "hsync")) {
throw new StreamLacksCapabilityException("hsync");
if (!output.hasCapability(StreamCapabilities.HSYNC)) {
throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);

View File

@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.lang.reflect.Method;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
@ -55,8 +55,6 @@ public class TestHBaseWalOnEC {
private static final HBaseTestingUtility util = new HBaseTestingUtility();
private static final String HFLUSH = "hflush";
public static void setup() throws Exception {
try {
@ -75,7 +73,7 @@ public class TestHBaseWalOnEC {
try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
// If this comes back as having hflush then some test setup assumption is wrong.
// Fail the test so that a developer has to look and triage
assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH));
} catch (NoSuchMethodException e) {
// We're not testing anything interesting if EC is not available, so skip the rest of the test
@ -95,7 +93,7 @@ public class TestHBaseWalOnEC {
public void testStreamCreate() throws IOException {
try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
new Path("/testStreamCreate"), true)) {
assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -661,15 +662,11 @@ public class TestFSUtils {
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
try (FileSystem filesystem = cluster.getFileSystem()) {
FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
assertTrue(FSUtils.hasCapability(stream, "hsync"));
assertTrue(FSUtils.hasCapability(stream, "hflush"));
assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " +
"StreamCapabilities class is not defined.",
FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add."));
assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
} finally {