HBASE-17538 HDFS.setStoragePolicy() logs errors on local fs Policy is set in a number of places each with its own 'implementation'.
A setStoragePolicy was added by: commit629b04f44f
Author: Yu Li <liyu@apache.org> Date: Fri Jan 6 18:35:38 2017 +0800 HBASE-15172 Support setting storage policy in bulkload ..for hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java And in *FileSystem files and StoreFileWriter by commitf92a14ade6
Author: Yu Li <liyu@apache.org> Date: Mon Jan 9 09:52:58 2017 +0800 HBASE-14061 Support CF-level Storage Policy This patch has all instances call the FSUtils#setStoragePolicy added here: commiteafc07a06d
Author: tedyu <yuzhihong@gmail.com> Date: Thu Jan 15 08:52:30 2015 -0800 HBASE-12848 Utilize Flash storage for WAL It does right thing when local vs distributed calling setStoragePolicy. Fixed bug in the above FSUtils#setStoragePolicy where the check for a distributed filesystem was failing when passed an HFileSystem -- though it was backed by a DistributedFileSystem. Cleanups.
This commit is contained in:
parent
da5722cb48
commit
c725d4d334
|
@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FilterFileSystem;
|
|||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
|
@ -148,20 +149,14 @@ public class HFileSystem extends FilterFileSystem {
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the source path (directory/file) to the specified storage policy. <br>
|
||||
* <i>"LAZY_PERSIST"</i>, <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>,
|
||||
* <i>"COLD"</i> <br>
|
||||
* <br>
|
||||
* See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
|
||||
* Set the source path (directory/file) to the specified storage policy.
|
||||
* @param path The source path (directory/file).
|
||||
* @param policyName The name of the storage policy.
|
||||
* @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
|
||||
* See see hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
|
||||
* 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
|
||||
*/
|
||||
public void setStoragePolicy(Path path, String policyName) {
|
||||
try {
|
||||
ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", path, policyName);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to set storage policy of [" + path + "] to [" + policyName + "]", e);
|
||||
}
|
||||
FSUtils.setStoragePolicy(this.fs, path, policyName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,8 +32,6 @@ import java.util.UUID;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -47,28 +45,30 @@ import org.apache.hadoop.hbase.HTableDescriptor;
|
|||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
|
||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.io.NullWritable;
|
||||
import org.apache.hadoop.io.SequenceFile;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
@ -400,15 +400,7 @@ public class HFileOutputFormat2
|
|||
String policy =
|
||||
conf.get(STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes.toString(family),
|
||||
conf.get(STORAGE_POLICY_PROPERTY));
|
||||
if (null != policy && !policy.trim().isEmpty()) {
|
||||
try {
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
((DistributedFileSystem) fs).setStoragePolicy(cfPath, policy.trim());
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("failed to set block storage policy of [" + cfPath + "] to [" + policy + "]", e);
|
||||
}
|
||||
}
|
||||
FSUtils.setStoragePolicy(fs, cfPath, policy);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -22,8 +22,6 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -38,7 +36,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
@ -56,7 +53,6 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
@ -190,17 +186,12 @@ public class HRegionFileSystem {
|
|||
* <br>
|
||||
* See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
|
||||
* @param familyName The name of column family.
|
||||
* @param policyName The name of the storage policy.
|
||||
* @param policyName The name of the storage policy: 'HOT', 'COLD', etc.
|
||||
* See see hadoop 2.6+ org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
|
||||
* 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
|
||||
*/
|
||||
public void setStoragePolicy(String familyName, String policyName) {
|
||||
Path storeDir = getStoreDir(familyName);
|
||||
try {
|
||||
ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", storeDir, policyName);
|
||||
} catch (Exception e) {
|
||||
if (!(this.fs instanceof LocalFileSystem)) {
|
||||
LOG.warn("Failed to set storage policy of [" + storeDir + "] to [" + policyName + "]", e);
|
||||
}
|
||||
}
|
||||
FSUtils.setStoragePolicy(this.fs, getStoreDir(familyName), policyName);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -228,14 +228,7 @@ public class HStore implements Store {
|
|||
if (null == policyName) {
|
||||
policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY, DEFAULT_BLOCK_STORAGE_POLICY);
|
||||
}
|
||||
if (null != policyName && !policyName.trim().isEmpty()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("set block storage policy of [" + family.getNameAsString() + "] to ["
|
||||
+ policyName + "]");
|
||||
}
|
||||
|
||||
this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
|
||||
}
|
||||
|
||||
this.dataBlockEncoder =
|
||||
new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.BloomContext;
|
|||
import org.apache.hadoop.hbase.util.BloomFilterFactory;
|
||||
import org.apache.hadoop.hbase.util.BloomFilterWriter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.util.RowBloomContext;
|
||||
import org.apache.hadoop.hbase.util.RowColBloomContext;
|
||||
|
@ -476,17 +477,7 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
if (null == policyName) {
|
||||
policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
|
||||
}
|
||||
if (null != policyName && !policyName.trim().isEmpty()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("set block storage policy of [" + dir + "] to [" + policyName + "]");
|
||||
}
|
||||
|
||||
try {
|
||||
ReflectionUtils.invokeMethod(this.fs, "setStoragePolicy", dir, policyName.trim());
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to set storage policy of [" + dir + "] to [" + policyName + "]", e);
|
||||
}
|
||||
}
|
||||
FSUtils.setStoragePolicy(this.fs, dir, policyName);
|
||||
|
||||
if (filePath == null) {
|
||||
filePath = StoreFile.getUniqueFile(fs, dir);
|
||||
|
|
|
@ -131,7 +131,8 @@ public abstract class FSUtils {
|
|||
* @param fs We only do anything if an instance of DistributedFileSystem
|
||||
* @param conf used to look up storage policy with given key; not modified.
|
||||
* @param path the Path whose storage policy is to be set
|
||||
* @param policyKey e.g. HConstants.WAL_STORAGE_POLICY
|
||||
* @param policyKey Key to use pulling a policy from Configuration:
|
||||
* e.g. HConstants.WAL_STORAGE_POLICY (hbase.wal.storage.policy).
|
||||
* @param defaultPolicy usually should be the policy NONE to delegate to HDFS
|
||||
*/
|
||||
public static void setStoragePolicy(final FileSystem fs, final Configuration conf,
|
||||
|
@ -143,27 +144,80 @@ public abstract class FSUtils {
|
|||
}
|
||||
return;
|
||||
}
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
DistributedFileSystem dfs = (DistributedFileSystem)fs;
|
||||
// Once our minimum supported Hadoop version is 2.6.0 we can remove reflection.
|
||||
Class<? extends DistributedFileSystem> dfsClass = dfs.getClass();
|
||||
setStoragePolicy(fs, path, storagePolicy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets storage policy for given path.
|
||||
* If the passed path is a directory, we'll set the storage policy for all files
|
||||
* created in the future in said directory. Note that this change in storage
|
||||
* policy takes place at the HDFS level; it will persist beyond this RS's lifecycle.
|
||||
* If we're running on a version of HDFS that doesn't support the given storage policy
|
||||
* (or storage policies at all), then we'll issue a log message and continue.
|
||||
*
|
||||
* See http://hadoop.apache.org/docs/r2.6.0/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
|
||||
*
|
||||
* @param fs We only do anything if an instance of DistributedFileSystem
|
||||
* @param path the Path whose storage policy is to be set
|
||||
* @param storagePolicy Policy to set on <code>path</code>; see hadoop 2.6+
|
||||
* org.apache.hadoop.hdfs.protocol.HdfsConstants for possible list e.g
|
||||
* 'COLD', 'WARM', 'HOT', 'ONE_SSD', 'ALL_SSD', 'LAZY_PERSIST'.
|
||||
*/
|
||||
public static void setStoragePolicy(final FileSystem fs, final Path path,
|
||||
final String storagePolicy) {
|
||||
if (storagePolicy == null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed a null storagePolicy, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
final String trimmedStoragePolicy = storagePolicy.trim();
|
||||
if (trimmedStoragePolicy.isEmpty()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("We were passed an empty storagePolicy, exiting early.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
boolean distributed = false;
|
||||
try {
|
||||
distributed = isDistributedFileSystem(fs);
|
||||
} catch (IOException ioe) {
|
||||
// This should NEVER happen.
|
||||
LOG.warn("Failed setStoragePolicy=" + trimmedStoragePolicy + " on path=" +
|
||||
path + "; failed isDFS test", ioe);
|
||||
return;
|
||||
}
|
||||
if (distributed) {
|
||||
invokeSetStoragePolicy(fs, path, trimmedStoragePolicy);
|
||||
} else {
|
||||
LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
|
||||
"support setStoragePolicy. Unable to set storagePolicy=" + trimmedStoragePolicy +
|
||||
" on path=" + path);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* All args have been checked and are good. Run the setStoragePolicy invocation.
|
||||
*/
|
||||
private static void invokeSetStoragePolicy(final FileSystem fs, final Path path,
|
||||
final String storagePolicy) {
|
||||
Method m = null;
|
||||
try {
|
||||
m = dfsClass.getDeclaredMethod("setStoragePolicy",
|
||||
m = fs.getClass().getDeclaredMethod("setStoragePolicy",
|
||||
new Class<?>[] { Path.class, String.class });
|
||||
m.setAccessible(true);
|
||||
} catch (NoSuchMethodException e) {
|
||||
LOG.info("FileSystem doesn't support"
|
||||
+ " setStoragePolicy; --HDFS-6584 not available");
|
||||
LOG.info("FileSystem doesn't support setStoragePolicy; HDFS-6584 not available "
|
||||
+ "(hadoop-2.6.0+): " + e.getMessage());
|
||||
} catch (SecurityException e) {
|
||||
LOG.info("Doesn't have access to setStoragePolicy on "
|
||||
+ "FileSystems --HDFS-6584 not available", e);
|
||||
LOG.info("Don't have access to setStoragePolicy on FileSystems; HDFS-6584 not available "
|
||||
+ "(hadoop-2.6.0+): ", e);
|
||||
m = null; // could happen on setAccessible()
|
||||
}
|
||||
if (m != null) {
|
||||
try {
|
||||
m.invoke(dfs, path, storagePolicy);
|
||||
LOG.info("set " + storagePolicy + " for " + path);
|
||||
m.invoke(fs, path, storagePolicy);
|
||||
LOG.info("Set storagePolicy=" + storagePolicy + " for path=" + path);
|
||||
} catch (Exception e) {
|
||||
// check for lack of HDFS-7228
|
||||
boolean probablyBadPolicy = false;
|
||||
|
@ -183,14 +237,24 @@ public abstract class FSUtils {
|
|||
if (!probablyBadPolicy) {
|
||||
// This swallows FNFE, should we be throwing it? seems more likely to indicate dev
|
||||
// misuse than a runtime problem with HDFS.
|
||||
LOG.warn("Unable to set " + storagePolicy + " for " + path, e);
|
||||
LOG.warn("Unable to set storagePolicy=" + storagePolicy + " for path=" + path, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG.info("FileSystem isn't an instance of DistributedFileSystem; presuming it doesn't " +
|
||||
"support setStoragePolicy.");
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True is <code>fs</code> is instance of DistributedFileSystem
|
||||
* @throws IOException
|
||||
*/
|
||||
private static boolean isDistributedFileSystem(final FileSystem fs) throws IOException {
|
||||
FileSystem fileSystem = fs;
|
||||
// If passed an instance of HFileSystem, it fails instanceof DistributedFileSystem.
|
||||
// Check its backing fs for dfs-ness.
|
||||
if (fs instanceof HFileSystem) {
|
||||
fileSystem = ((HFileSystem)fs).getBackingFs();
|
||||
}
|
||||
return fileSystem instanceof DistributedFileSystem;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Reference in New Issue