HBASE-24000 Simplify CommonFSUtils after upgrading to hadoop 2.10.0 (#1335)
Signed-off-by: stack <stack@apache.org> Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
parent
76e3db6720
commit
f3ee9b8aa3
|
@ -27,11 +27,11 @@ import java.net.URISyntaxException;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
|
@ -490,26 +490,19 @@ public abstract class CommonFSUtils {
|
|||
}
|
||||
String trimmedStoragePolicy = storagePolicy.trim();
|
||||
if (trimmedStoragePolicy.isEmpty()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed an empty storagePolicy, exiting early.");
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
trimmedStoragePolicy = trimmedStoragePolicy.toUpperCase(Locale.ROOT);
|
||||
}
|
||||
if (trimmedStoragePolicy.equals(HConstants.DEFER_TO_HDFS_STORAGE_POLICY)) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.",
|
||||
trimmedStoragePolicy);
|
||||
}
|
||||
LOG.trace("We were passed the defer-to-hdfs policy {}, exiting early.", trimmedStoragePolicy);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
|
||||
} catch (IOException e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Failed to invoke set storage policy API on FS", e);
|
||||
}
|
||||
if (throwException) {
|
||||
throw e;
|
||||
}
|
||||
|
@ -525,10 +518,7 @@ public abstract class CommonFSUtils {
|
|||
|
||||
try {
|
||||
fs.setStoragePolicy(path, storagePolicy);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Set storagePolicy={} for path={}", storagePolicy, path);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
toThrow = e;
|
||||
// This swallows FNFE, should we be throwing it? seems more likely to indicate dev
|
||||
|
@ -541,19 +531,9 @@ public abstract class CommonFSUtils {
|
|||
LOG.debug("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
|
||||
}
|
||||
|
||||
// check for lack of HDFS-7228
|
||||
if (e instanceof RemoteException &&
|
||||
HadoopIllegalArgumentException.class.getName().equals(
|
||||
((RemoteException)e).getClassName())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Given storage policy, '" +storagePolicy +"', was rejected and probably " +
|
||||
"isn't a valid policy for the version of Hadoop you're running. I.e. if you're " +
|
||||
"trying to use SSD related policies then you're likely missing HDFS-7228. For " +
|
||||
"more information see the 'ArchivalStorage' docs for your Hadoop release.");
|
||||
}
|
||||
// Hadoop 2.8+, 3.0-a1+ added FileSystem.setStoragePolicy with a default implementation
|
||||
// that throws UnsupportedOperationException
|
||||
} else if (e instanceof UnsupportedOperationException) {
|
||||
if (e instanceof UnsupportedOperationException) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("The underlying FileSystem implementation doesn't support " +
|
||||
"setStoragePolicy. This is probably intentional on their part, since HDFS-9345 " +
|
||||
|
@ -759,200 +739,75 @@ public abstract class CommonFSUtils {
|
|||
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
|
||||
}
|
||||
|
||||
private static class DfsBuilderUtility {
|
||||
static Class<?> dfsClass = null;
|
||||
static Method createMethod;
|
||||
static Method overwriteMethod;
|
||||
static Method bufferSizeMethod;
|
||||
static Method blockSizeMethod;
|
||||
static Method recursiveMethod;
|
||||
static Method replicateMethod;
|
||||
static Method replicationMethod;
|
||||
static Method buildMethod;
|
||||
static boolean allMethodsPresent = false;
|
||||
private static final class DfsBuilderUtility {
|
||||
private static final Class<?> BUILDER;
|
||||
private static final Method REPLICATE;
|
||||
|
||||
static {
|
||||
String dfsName = "org.apache.hadoop.hdfs.DistributedFileSystem";
|
||||
String builderName = dfsName + "$HdfsDataOutputStreamBuilder";
|
||||
String builderName = "org.apache.hadoop.hdfs.DistributedFileSystem$HdfsDataOutputStreamBuilder";
|
||||
Class<?> builderClass = null;
|
||||
|
||||
try {
|
||||
dfsClass = Class.forName(dfsName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.debug("{} not available, will not use builder API for file creation.", dfsName);
|
||||
}
|
||||
try {
|
||||
builderClass = Class.forName(builderName);
|
||||
} catch (ClassNotFoundException e) {
|
||||
LOG.debug("{} not available, will not use builder API for file creation.", builderName);
|
||||
LOG.debug("{} not available, will not set replicate when creating output stream", builderName);
|
||||
}
|
||||
|
||||
if (dfsClass != null && builderClass != null) {
|
||||
Method replicateMethod = null;
|
||||
if (builderClass != null) {
|
||||
try {
|
||||
createMethod = dfsClass.getMethod("createFile", Path.class);
|
||||
overwriteMethod = builderClass.getMethod("overwrite", boolean.class);
|
||||
bufferSizeMethod = builderClass.getMethod("bufferSize", int.class);
|
||||
blockSizeMethod = builderClass.getMethod("blockSize", long.class);
|
||||
recursiveMethod = builderClass.getMethod("recursive");
|
||||
replicateMethod = builderClass.getMethod("replicate");
|
||||
replicationMethod = builderClass.getMethod("replication", short.class);
|
||||
buildMethod = builderClass.getMethod("build");
|
||||
|
||||
allMethodsPresent = true;
|
||||
LOG.debug("Using builder API via reflection for DFS file creation.");
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.debug("Could not find method on builder; will use old DFS API for file creation {}",
|
||||
e.getMessage());
|
||||
LOG.debug("Could not find replicate method on builder; will not set replicate when" +
|
||||
" creating output stream", e);
|
||||
}
|
||||
}
|
||||
BUILDER = builderClass;
|
||||
REPLICATE = replicateMethod;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to use builder API via reflection to create a file with the given parameters and
|
||||
* replication enabled.
|
||||
* Attempt to use builder API via reflection to call the replicate method on the given builder.
|
||||
*/
|
||||
static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable,
|
||||
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
|
||||
if (allMethodsPresent && dfsClass.isInstance(fs)) {
|
||||
static void replicate(FSDataOutputStreamBuilder<?, ?> builder) {
|
||||
if (BUILDER != null && REPLICATE != null && BUILDER.isAssignableFrom(builder.getClass())) {
|
||||
try {
|
||||
Object builder;
|
||||
|
||||
builder = createMethod.invoke(fs, path);
|
||||
builder = overwriteMethod.invoke(builder, overwritable);
|
||||
builder = bufferSizeMethod.invoke(builder, bufferSize);
|
||||
builder = blockSizeMethod.invoke(builder, blockSize);
|
||||
if (isRecursive) {
|
||||
builder = recursiveMethod.invoke(builder);
|
||||
}
|
||||
builder = replicateMethod.invoke(builder);
|
||||
builder = replicationMethod.invoke(builder, replication);
|
||||
return (FSDataOutputStream) buildMethod.invoke(builder);
|
||||
REPLICATE.invoke(builder);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
// Should have caught this failure during initialization, so log full trace here
|
||||
LOG.warn("Couldn't use reflection with builder API", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (isRecursive) {
|
||||
return fs.create(path, overwritable, bufferSize, replication, blockSize, null);
|
||||
}
|
||||
return fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to use builder API via reflection to create a file with the given parameters and
|
||||
* replication enabled.
|
||||
*/
|
||||
static FSDataOutputStream createHelper(FileSystem fs, Path path, boolean overwritable)
|
||||
throws IOException {
|
||||
if (allMethodsPresent && dfsClass.isInstance(fs)) {
|
||||
try {
|
||||
Object builder;
|
||||
|
||||
builder = createMethod.invoke(fs, path);
|
||||
builder = overwriteMethod.invoke(builder, overwritable);
|
||||
builder = replicateMethod.invoke(builder);
|
||||
return (FSDataOutputStream) buildMethod.invoke(builder);
|
||||
} catch (IllegalAccessException | InvocationTargetException e) {
|
||||
// Should have caught this failure during initialization, so log full trace here
|
||||
LOG.warn("Couldn't use reflection with builder API", e);
|
||||
}
|
||||
}
|
||||
|
||||
return fs.create(path, overwritable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to use builder API via reflection to create a file with the given parameters and
|
||||
* replication enabled.
|
||||
* <p>
|
||||
* <p/>
|
||||
* Will not attempt to enable replication when passed an HFileSystem.
|
||||
*/
|
||||
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable)
|
||||
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite)
|
||||
throws IOException {
|
||||
return DfsBuilderUtility.createHelper(fs, path, overwritable);
|
||||
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite);
|
||||
DfsBuilderUtility.replicate(builder);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to use builder API via reflection to create a file with the given parameters and
|
||||
* replication enabled.
|
||||
* <p>
|
||||
* <p/>
|
||||
* Will not attempt to enable replication when passed an HFileSystem.
|
||||
*/
|
||||
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwritable,
|
||||
public static FSDataOutputStream createForWal(FileSystem fs, Path path, boolean overwrite,
|
||||
int bufferSize, short replication, long blockSize, boolean isRecursive) throws IOException {
|
||||
return DfsBuilderUtility.createHelper(fs, path, overwritable, bufferSize, replication,
|
||||
blockSize, isRecursive);
|
||||
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwrite)
|
||||
.bufferSize(bufferSize).replication(replication).blockSize(blockSize);
|
||||
if (isRecursive) {
|
||||
builder.recursive();
|
||||
}
|
||||
|
||||
// Holder singleton idiom. JVM spec ensures this will be run at most once per Classloader, and
|
||||
// not until we attempt to reference it.
|
||||
private static class StreamCapabilities {
|
||||
public static final boolean PRESENT;
|
||||
public static final Class<?> CLASS;
|
||||
public static final Method METHOD;
|
||||
static {
|
||||
boolean tmp = false;
|
||||
Class<?> clazz = null;
|
||||
Method method = null;
|
||||
try {
|
||||
clazz = Class.forName("org.apache.hadoop.fs.StreamCapabilities");
|
||||
method = clazz.getMethod("hasCapability", String.class);
|
||||
tmp = true;
|
||||
} catch(ClassNotFoundException|NoSuchMethodException|SecurityException exception) {
|
||||
LOG.warn("Your Hadoop installation does not include the StreamCapabilities class from " +
|
||||
"HDFS-11644, so we will skip checking if any FSDataOutputStreams actually " +
|
||||
"support hflush/hsync. If you are running on top of HDFS this probably just " +
|
||||
"means you have an older version and this can be ignored. If you are running on " +
|
||||
"top of an alternate FileSystem implementation you should manually verify that " +
|
||||
"hflush and hsync are implemented; otherwise you risk data loss and hard to " +
|
||||
"diagnose errors when our assumptions are violated.");
|
||||
LOG.debug("The first request to check for StreamCapabilities came from this stacktrace.",
|
||||
exception);
|
||||
} finally {
|
||||
PRESENT = tmp;
|
||||
CLASS = clazz;
|
||||
METHOD = method;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If our FileSystem version includes the StreamCapabilities class, check if the given stream has
|
||||
* a particular capability.
|
||||
* @param stream capabilities are per-stream instance, so check this one specifically. must not be
|
||||
* null
|
||||
* @param capability what to look for, per Hadoop Common's FileSystem docs
|
||||
* @return true if there are no StreamCapabilities. false if there are, but this stream doesn't
|
||||
* implement it. return result of asking the stream otherwise.
|
||||
* @throws NullPointerException if {@code stream} is {@code null}
|
||||
*/
|
||||
public static boolean hasCapability(FSDataOutputStream stream, String capability) {
|
||||
// be consistent whether or not StreamCapabilities is present
|
||||
Objects.requireNonNull(stream, "stream cannot be null");
|
||||
// If o.a.h.fs.StreamCapabilities doesn't exist, assume everyone does everything
|
||||
// otherwise old versions of Hadoop will break.
|
||||
boolean result = true;
|
||||
if (StreamCapabilities.PRESENT) {
|
||||
// if StreamCapabilities is present, but the stream doesn't implement it
|
||||
// or we run into a problem invoking the method,
|
||||
// we treat that as equivalent to not declaring anything
|
||||
result = false;
|
||||
if (StreamCapabilities.CLASS.isAssignableFrom(stream.getClass())) {
|
||||
try {
|
||||
result = ((Boolean)StreamCapabilities.METHOD.invoke(stream, capability)).booleanValue();
|
||||
} catch (IllegalAccessException|IllegalArgumentException|InvocationTargetException
|
||||
exception) {
|
||||
LOG.warn("Your Hadoop installation's StreamCapabilities implementation doesn't match " +
|
||||
"our understanding of how it's supposed to work. Please file a JIRA and include " +
|
||||
"the following stack trace. In the mean time we're interpreting this behavior " +
|
||||
"difference as a lack of capability support, which will probably cause a failure.",
|
||||
exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
DfsBuilderUtility.replicate(builder);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,9 +19,8 @@ package org.apache.hadoop.hbase.util;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
|
@ -35,8 +34,6 @@ import org.junit.Before;
|
|||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Test {@link CommonFSUtils}.
|
||||
|
@ -48,8 +45,6 @@ public class TestCommonFSUtils {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestCommonFSUtils.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestCommonFSUtils.class);
|
||||
|
||||
private HBaseCommonTestingUtility htu;
|
||||
private Configuration conf;
|
||||
|
||||
|
@ -131,38 +126,4 @@ public class TestCommonFSUtils {
|
|||
Path logFile = new Path(CommonFSUtils.getWALRootDir(conf), "test/testlog");
|
||||
assertEquals("test/testlog", CommonFSUtils.removeWALRootPath(logFile, conf));
|
||||
}
|
||||
|
||||
@Test(expected=NullPointerException.class)
|
||||
public void streamCapabilitiesDoesNotAllowNullStream() {
|
||||
CommonFSUtils.hasCapability(null, "hopefully any string");
|
||||
}
|
||||
|
||||
private static final boolean STREAM_CAPABILITIES_IS_PRESENT;
|
||||
static {
|
||||
boolean tmp = false;
|
||||
try {
|
||||
Class.forName("org.apache.hadoop.fs.StreamCapabilities");
|
||||
tmp = true;
|
||||
LOG.debug("Test thought StreamCapabilities class was present.");
|
||||
} catch (ClassNotFoundException exception) {
|
||||
LOG.debug("Test didn't think StreamCapabilities class was present.");
|
||||
} finally {
|
||||
STREAM_CAPABILITIES_IS_PRESENT = tmp;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkStreamCapabilitiesOnKnownNoopStream() throws IOException {
|
||||
FSDataOutputStream stream = new FSDataOutputStream(new ByteArrayOutputStream(), null);
|
||||
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
|
||||
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
|
||||
CommonFSUtils.hasCapability(stream, "hsync"));
|
||||
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
|
||||
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
|
||||
CommonFSUtils.hasCapability(stream, "hflush"));
|
||||
assertNotEquals("We expect our dummy FSDOS to claim capabilities iff the StreamCapabilities " +
|
||||
"class is not defined.", STREAM_CAPABILITIES_IS_PRESENT,
|
||||
CommonFSUtils.hasCapability(stream, "a capability that hopefully no filesystem will " +
|
||||
"implement."));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,11 +35,13 @@ import java.util.concurrent.locks.Condition;
|
|||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSError;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
|
@ -1084,8 +1086,8 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
// After we create the stream but before we attempt to use it at all
|
||||
// ensure that we can provide the level of data safety we're configured
|
||||
// to provide.
|
||||
final String durability = useHsync ? "hsync" : "hflush";
|
||||
if (enforceStreamCapability && !(CommonFSUtils.hasCapability(newStream, durability))) {
|
||||
final String durability = useHsync ? StreamCapabilities.HSYNC : StreamCapabilities.HFLUSH;
|
||||
if (enforceStreamCapability && !newStream.hasCapability(durability)) {
|
||||
throw new IllegalStateException("The procedure WAL relies on the ability to " + durability +
|
||||
" for proper operation during component failures, but the underlying filesystem does " +
|
||||
"not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY +
|
||||
|
@ -1151,12 +1153,12 @@ public class WALProcedureStore extends ProcedureStoreBase {
|
|||
log.addToSize(trailerSize);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | FSError e) {
|
||||
LOG.warn("Unable to write the trailer", e);
|
||||
}
|
||||
try {
|
||||
stream.close();
|
||||
} catch (IOException e) {
|
||||
} catch (IOException | FSError e) {
|
||||
LOG.error("Unable to close the stream", e);
|
||||
}
|
||||
stream = null;
|
||||
|
|
|
@ -655,7 +655,7 @@ public class TestWALProcedureStore {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testLogFileAleadExists() throws IOException {
|
||||
public void testLogFileAlreadyExists() throws IOException {
|
||||
final boolean[] tested = {false};
|
||||
WALProcedureStore mStore = Mockito.spy(procStore);
|
||||
|
||||
|
|
|
@ -18,15 +18,17 @@
|
|||
package org.apache.hadoop.hbase.io.asyncfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
|
||||
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
|
||||
|
||||
|
@ -63,11 +65,15 @@ public final class AsyncFSOutputHelper {
|
|||
// After we create the stream but before we attempt to use it at all
|
||||
// ensure that we can provide the level of data safety we're configured
|
||||
// to provide.
|
||||
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true) &&
|
||||
!(CommonFSUtils.hasCapability(out, "hflush") &&
|
||||
CommonFSUtils.hasCapability(out, "hsync"))) {
|
||||
out.close();
|
||||
throw new CommonFSUtils.StreamLacksCapabilityException("hflush and hsync");
|
||||
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
|
||||
if (!out.hasCapability(StreamCapabilities.HFLUSH)) {
|
||||
Closeables.close(out, true);
|
||||
throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
|
||||
}
|
||||
if (!out.hasCapability(StreamCapabilities.HSYNC)) {
|
||||
Closeables.close(out, true);
|
||||
throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);
|
||||
}
|
||||
}
|
||||
return new WrapperAsyncFSOutput(f, out);
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.OutputStream;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
|
||||
|
@ -90,18 +91,17 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
|
|||
return this.output;
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Override
|
||||
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
|
||||
short replication, long blockSize) throws IOException, StreamLacksCapabilityException {
|
||||
this.output = CommonFSUtils.createForWal(fs, path, overwritable, bufferSize, replication,
|
||||
blockSize, false);
|
||||
if (fs.getConf().getBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true)) {
|
||||
if (!CommonFSUtils.hasCapability(output, "hflush")) {
|
||||
throw new StreamLacksCapabilityException("hflush");
|
||||
if (!output.hasCapability(StreamCapabilities.HFLUSH)) {
|
||||
throw new StreamLacksCapabilityException(StreamCapabilities.HFLUSH);
|
||||
}
|
||||
if (!CommonFSUtils.hasCapability(output, "hsync")) {
|
||||
throw new StreamLacksCapabilityException("hsync");
|
||||
if (!output.hasCapability(StreamCapabilities.HSYNC)) {
|
||||
throw new StreamLacksCapabilityException(StreamCapabilities.HSYNC);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -55,8 +55,6 @@ public class TestHBaseWalOnEC {
|
|||
|
||||
private static final HBaseTestingUtility util = new HBaseTestingUtility();
|
||||
|
||||
private static final String HFLUSH = "hflush";
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
try {
|
||||
|
@ -75,7 +73,7 @@ public class TestHBaseWalOnEC {
|
|||
try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
|
||||
// If this comes back as having hflush then some test setup assumption is wrong.
|
||||
// Fail the test so that a developer has to look and triage
|
||||
assertFalse("Did not enable EC!", CommonFSUtils.hasCapability(out, HFLUSH));
|
||||
assertFalse("Did not enable EC!", out.hasCapability(StreamCapabilities.HFLUSH));
|
||||
}
|
||||
} catch (NoSuchMethodException e) {
|
||||
// We're not testing anything interesting if EC is not available, so skip the rest of the test
|
||||
|
@ -95,7 +93,7 @@ public class TestHBaseWalOnEC {
|
|||
public void testStreamCreate() throws IOException {
|
||||
try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
|
||||
new Path("/testStreamCreate"), true)) {
|
||||
assertTrue(CommonFSUtils.hasCapability(out, HFLUSH));
|
||||
assertTrue(out.hasCapability(StreamCapabilities.HFLUSH));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StreamCapabilities;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -661,15 +662,11 @@ public class TestFSUtils {
|
|||
MiniDFSCluster cluster = htu.startMiniDFSCluster(1);
|
||||
try (FileSystem filesystem = cluster.getFileSystem()) {
|
||||
FSDataOutputStream stream = filesystem.create(new Path("/tmp/foobar"));
|
||||
assertTrue(FSUtils.hasCapability(stream, "hsync"));
|
||||
assertTrue(FSUtils.hasCapability(stream, "hflush"));
|
||||
assertNotEquals("We expect HdfsDataOutputStream to say it has a dummy capability iff the " +
|
||||
"StreamCapabilities class is not defined.",
|
||||
STREAM_CAPABILITIES_IS_PRESENT,
|
||||
FSUtils.hasCapability(stream, "a capability that hopefully HDFS doesn't add."));
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
|
||||
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
|
||||
assertFalse(stream.hasCapability("a capability that hopefully HDFS doesn't add."));
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue