HBASE-14061 Support CF-level Storage Policy
This commit is contained in:
parent
6fecf55a7e
commit
f92a14ade6
|
@ -18,8 +18,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -34,13 +32,15 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.exceptions.HBaseException;
|
import org.apache.hadoop.hbase.exceptions.HBaseException;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilySchema;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ColumnFamilySchema;
|
||||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.PrettyPrinter;
|
import org.apache.hadoop.hbase.util.PrettyPrinter;
|
||||||
import org.apache.hadoop.hbase.util.PrettyPrinter.Unit;
|
import org.apache.hadoop.hbase.util.PrettyPrinter.Unit;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An HColumnDescriptor contains information about a column family such as the
|
* An HColumnDescriptor contains information about a column family such as the
|
||||||
|
@ -160,6 +160,8 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
|
||||||
public static final String DFS_REPLICATION = "DFS_REPLICATION";
|
public static final String DFS_REPLICATION = "DFS_REPLICATION";
|
||||||
public static final short DEFAULT_DFS_REPLICATION = 0;
|
public static final short DEFAULT_DFS_REPLICATION = 0;
|
||||||
|
|
||||||
|
public static final String STORAGE_POLICY = "STORAGE_POLICY";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default compression type.
|
* Default compression type.
|
||||||
*/
|
*/
|
||||||
|
@ -1273,4 +1275,24 @@ public class HColumnDescriptor implements Comparable<HColumnDescriptor> {
|
||||||
setValue(DFS_REPLICATION, Short.toString(replication));
|
setValue(DFS_REPLICATION, Short.toString(replication));
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the storage policy in use by this family
|
||||||
|
* <p/>
|
||||||
|
* Not using {@code enum} here because HDFS is not using {@code enum} for storage policy, see
|
||||||
|
* org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite for more details
|
||||||
|
*/
|
||||||
|
public String getStoragePolicy() {
|
||||||
|
return getValue(STORAGE_POLICY);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the storage policy for use with this family
|
||||||
|
* @param policy the policy to set, valid setting includes: <i>"LAZY_PERSIST"</i>,
|
||||||
|
* <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>, <i>"COLD"</i>
|
||||||
|
*/
|
||||||
|
public HColumnDescriptor setStoragePolicy(String policy) {
|
||||||
|
setValue(STORAGE_POLICY, policy);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,14 +43,19 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An encapsulation for the FileSystem object that hbase uses to access
|
* An encapsulation for the FileSystem object that hbase uses to access
|
||||||
* data. This class allows the flexibility of using
|
* data. This class allows the flexibility of using
|
||||||
|
@ -141,6 +146,56 @@ public class HFileSystem extends FilterFileSystem {
|
||||||
return fs;
|
return fs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the source path (directory/file) to the specified storage policy. <br>
|
||||||
|
* <i>"LAZY_PERSIST"</i>, <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>,
|
||||||
|
* <i>"COLD"</i> <br>
|
||||||
|
* <br>
|
||||||
|
* See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
|
||||||
|
* @param path The source path (directory/file).
|
||||||
|
* @param policyName The name of the storage policy.
|
||||||
|
*/
|
||||||
|
public void setStoragePolicy(Path path, String policyName) {
|
||||||
|
try {
|
||||||
|
if (this.fs instanceof DistributedFileSystem) {
|
||||||
|
((DistributedFileSystem) this.fs).setStoragePolicy(path, policyName);
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("failed to set block storage policy of [" + path + "] to [" + policyName + "]", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the storage policy of the source path (directory/file).
|
||||||
|
* @param path The source path (directory/file).
|
||||||
|
* @return Storage policy name, or {@code null} if not using {@link DistributedFileSystem} or
|
||||||
|
* exception thrown when trying to get policy
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public String getStoragePolicy(Path path) {
|
||||||
|
try {
|
||||||
|
if (this.fs instanceof DistributedFileSystem) {
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem) this.fs;
|
||||||
|
HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
|
||||||
|
if (null != status) {
|
||||||
|
byte storagePolicyId = status.getStoragePolicy();
|
||||||
|
if (storagePolicyId != BlockStoragePolicySuite.ID_UNSPECIFIED) {
|
||||||
|
BlockStoragePolicy[] policies = dfs.getStoragePolicies();
|
||||||
|
for (BlockStoragePolicy policy : policies) {
|
||||||
|
if (policy.getId() == storagePolicyId) {
|
||||||
|
return policy.getName();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
LOG.warn("failed to get block storage policy of [" + path + "]", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Are we verifying checksums in HBase?
|
* Are we verifying checksums in HBase?
|
||||||
* @return True, if hbase is configured to verify checksums,
|
* @return True, if hbase is configured to verify checksums,
|
||||||
|
|
|
@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
|
import edu.umd.cs.findbugs.annotations.Nullable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* View to an on-disk Region.
|
* View to an on-disk Region.
|
||||||
* Provides the set of methods necessary to interact with the on-disk region data.
|
* Provides the set of methods necessary to interact with the on-disk region data.
|
||||||
|
@ -177,6 +179,38 @@ public class HRegionFileSystem {
|
||||||
return storeDir;
|
return storeDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the directory of CF to the specified storage policy. <br>
|
||||||
|
* <i>"LAZY_PERSIST"</i>, <i>"ALL_SSD"</i>, <i>"ONE_SSD"</i>, <i>"HOT"</i>, <i>"WARM"</i>,
|
||||||
|
* <i>"COLD"</i> <br>
|
||||||
|
* <br>
|
||||||
|
* See {@link org.apache.hadoop.hdfs.protocol.HdfsConstants} for more details.
|
||||||
|
* @param familyName The name of column family.
|
||||||
|
* @param policyName The name of the storage policy.
|
||||||
|
*/
|
||||||
|
public void setStoragePolicy(String familyName, String policyName) {
|
||||||
|
if (this.fs instanceof HFileSystem) {
|
||||||
|
Path storeDir = getStoreDir(familyName);
|
||||||
|
((HFileSystem) this.fs).setStoragePolicy(storeDir, policyName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the storage policy of the directory of CF.
|
||||||
|
* @param familyName The name of column family.
|
||||||
|
* @return Storage policy name, or {@code null} if not using {@link HFileSystem} or exception
|
||||||
|
* thrown when trying to get policy
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
public String getStoragePolicy(String familyName) {
|
||||||
|
if (this.fs instanceof HFileSystem) {
|
||||||
|
Path storeDir = getStoreDir(familyName);
|
||||||
|
return ((HFileSystem) this.fs).getStoragePolicy(storeDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the store files available for the family.
|
* Returns the store files available for the family.
|
||||||
* This methods performs the filtering based on the valid store files.
|
* This methods performs the filtering based on the valid store files.
|
||||||
|
|
|
@ -116,6 +116,7 @@ public class HStore implements Store {
|
||||||
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
|
public static final String COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY =
|
||||||
"hbase.server.compactchecker.interval.multiplier";
|
"hbase.server.compactchecker.interval.multiplier";
|
||||||
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
|
public static final String BLOCKING_STOREFILES_KEY = "hbase.hstore.blockingStoreFiles";
|
||||||
|
public static final String BLOCK_STORAGE_POLICY_KEY = "hbase.hstore.block.storage.policy";
|
||||||
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
|
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
|
||||||
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
|
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 7;
|
||||||
|
|
||||||
|
@ -227,6 +228,20 @@ public class HStore implements Store {
|
||||||
.addBytesMap(family.getValues());
|
.addBytesMap(family.getValues());
|
||||||
this.blocksize = family.getBlocksize();
|
this.blocksize = family.getBlocksize();
|
||||||
|
|
||||||
|
// set block storage policy for store directory
|
||||||
|
String policyName = family.getStoragePolicy();
|
||||||
|
if (null == policyName) {
|
||||||
|
policyName = this.conf.get(BLOCK_STORAGE_POLICY_KEY);
|
||||||
|
}
|
||||||
|
if (null != policyName && !policyName.trim().isEmpty()) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("set block storage policy of [" + family.getNameAsString() + "] to ["
|
||||||
|
+ policyName + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.fs.setStoragePolicy(family.getNameAsString(), policyName.trim());
|
||||||
|
}
|
||||||
|
|
||||||
this.dataBlockEncoder =
|
this.dataBlockEncoder =
|
||||||
new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
|
new HFileDataBlockEncoderImpl(family.getDataBlockEncoding());
|
||||||
|
|
||||||
|
@ -1052,9 +1067,10 @@ public class HStore implements Store {
|
||||||
}
|
}
|
||||||
HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
|
HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
|
||||||
cryptoContext);
|
cryptoContext);
|
||||||
|
Path familyTempDir = new Path(fs.getTempDir(), family.getNameAsString());
|
||||||
StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
|
StoreFileWriter.Builder builder = new StoreFileWriter.Builder(conf, writerCacheConf,
|
||||||
this.getFileSystem())
|
this.getFileSystem())
|
||||||
.withFilePath(fs.createTempName())
|
.withOutputDir(familyTempDir)
|
||||||
.withComparator(comparator)
|
.withComparator(comparator)
|
||||||
.withBloomType(family.getBloomFilterType())
|
.withBloomType(family.getBloomFilterType())
|
||||||
.withMaxKeyCount(maxKeyCount)
|
.withMaxKeyCount(maxKeyCount)
|
||||||
|
|
|
@ -29,9 +29,11 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellComparator;
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||||
|
@ -468,6 +470,21 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
||||||
fs.mkdirs(dir);
|
fs.mkdirs(dir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set block storage policy for temp path
|
||||||
|
String policyName = this.conf.get(HColumnDescriptor.STORAGE_POLICY);
|
||||||
|
if (null == policyName) {
|
||||||
|
policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
|
||||||
|
}
|
||||||
|
if (null != policyName && !policyName.trim().isEmpty()) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("set block storage policy of [" + dir + "] to [" + policyName + "]");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.fs instanceof HFileSystem) {
|
||||||
|
((HFileSystem) this.fs).setStoragePolicy(dir, policyName.trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (filePath == null) {
|
if (filePath == null) {
|
||||||
filePath = StoreFile.getUniqueFile(fs, dir);
|
filePath = StoreFile.getUniqueFile(fs, dir);
|
||||||
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
|
if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
|
||||||
|
|
|
@ -187,9 +187,13 @@ public class TestCompaction {
|
||||||
assertEquals(compactionThreshold, s.getStorefilesCount());
|
assertEquals(compactionThreshold, s.getStorefilesCount());
|
||||||
assertTrue(s.getStorefilesSize() > 15*1000);
|
assertTrue(s.getStorefilesSize() > 15*1000);
|
||||||
// and no new store files persisted past compactStores()
|
// and no new store files persisted past compactStores()
|
||||||
|
// only one empty dir exists in temp dir
|
||||||
FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
|
FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
|
||||||
|
assertEquals(1, ls.length);
|
||||||
|
Path storeTempDir = new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
|
||||||
|
assertTrue(r.getFilesystem().exists(storeTempDir));
|
||||||
|
ls = r.getFilesystem().listStatus(storeTempDir);
|
||||||
assertEquals(0, ls.length);
|
assertEquals(0, ls.length);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
// don't mess up future tests
|
// don't mess up future tests
|
||||||
r.writestate.writesEnabled = true;
|
r.writestate.writesEnabled = true;
|
||||||
|
|
|
@ -912,7 +912,7 @@ public class TestHRegion {
|
||||||
assertEquals(3, region.getStore(family).getStorefilesCount());
|
assertEquals(3, region.getStore(family).getStorefilesCount());
|
||||||
|
|
||||||
// now find the compacted file, and manually add it to the recovered edits
|
// now find the compacted file, and manually add it to the recovered edits
|
||||||
Path tmpDir = region.getRegionFileSystem().getTempDir();
|
Path tmpDir = new Path(region.getRegionFileSystem().getTempDir(), Bytes.toString(family));
|
||||||
FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
|
FileStatus[] files = FSUtils.listStatus(fs, tmpDir);
|
||||||
String errorMsg = "Expected to find 1 file in the region temp directory "
|
String errorMsg = "Expected to find 1 file in the region temp directory "
|
||||||
+ "from the compaction, could not find any";
|
+ "from the compaction, could not find any";
|
||||||
|
|
|
@ -21,11 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -36,14 +39,20 @@ import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.PerformanceEvaluation;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@ -51,6 +60,126 @@ import org.junit.experimental.categories.Category;
|
||||||
public class TestHRegionFileSystem {
|
public class TestHRegionFileSystem {
|
||||||
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
|
private static final Log LOG = LogFactory.getLog(TestHRegionFileSystem.class);
|
||||||
|
private static final byte[][] FAMILIES = {
|
||||||
|
Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-A")),
|
||||||
|
Bytes.add(PerformanceEvaluation.FAMILY_NAME, Bytes.toBytes("-B")) };
|
||||||
|
private static final TableName TABLE_NAME = TableName.valueOf("TestTable");
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockStoragePolicy() throws Exception {
|
||||||
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
TEST_UTIL.startMiniCluster();
|
||||||
|
HTable table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
|
||||||
|
assertEquals("Should start with empty table", 0, TEST_UTIL.countRows(table));
|
||||||
|
HRegionFileSystem regionFs = getHRegionFS(table, conf);
|
||||||
|
// the original block storage policy would be NULL
|
||||||
|
String spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
|
||||||
|
String spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
|
||||||
|
LOG.debug("Storage policy of cf 0: [" + spA + "].");
|
||||||
|
LOG.debug("Storage policy of cf 1: [" + spB + "].");
|
||||||
|
assertNull(spA);
|
||||||
|
assertNull(spB);
|
||||||
|
|
||||||
|
// Recreate table and make sure storage policy could be set through configuration
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
TEST_UTIL.getConfiguration().set(HStore.BLOCK_STORAGE_POLICY_KEY, "HOT");
|
||||||
|
TEST_UTIL.startMiniCluster();
|
||||||
|
table = (HTable) TEST_UTIL.createTable(TABLE_NAME, FAMILIES);
|
||||||
|
regionFs = getHRegionFS(table, conf);
|
||||||
|
|
||||||
|
try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
|
||||||
|
spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
|
||||||
|
spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
|
||||||
|
LOG.debug("Storage policy of cf 0: [" + spA + "].");
|
||||||
|
LOG.debug("Storage policy of cf 1: [" + spB + "].");
|
||||||
|
assertEquals("HOT", spA);
|
||||||
|
assertEquals("HOT", spB);
|
||||||
|
|
||||||
|
// alter table cf schema to change storage policies
|
||||||
|
// and make sure it could override settings in conf
|
||||||
|
HColumnDescriptor hcdA = new HColumnDescriptor(Bytes.toString(FAMILIES[0]));
|
||||||
|
// alter through setting HStore#BLOCK_STORAGE_POLICY_KEY in HColumnDescriptor
|
||||||
|
hcdA.setValue(HStore.BLOCK_STORAGE_POLICY_KEY, "ONE_SSD");
|
||||||
|
admin.modifyColumnFamily(TABLE_NAME, hcdA);
|
||||||
|
while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
||||||
|
.isRegionsInTransition()) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
LOG.debug("Waiting on table to finish schema altering");
|
||||||
|
}
|
||||||
|
// alter through HColumnDescriptor#setStoragePolicy
|
||||||
|
HColumnDescriptor hcdB = new HColumnDescriptor(Bytes.toString(FAMILIES[1]));
|
||||||
|
hcdB.setStoragePolicy("ALL_SSD");
|
||||||
|
admin.modifyColumnFamily(TABLE_NAME, hcdB);
|
||||||
|
while (TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
|
||||||
|
.isRegionsInTransition()) {
|
||||||
|
Thread.sleep(200);
|
||||||
|
LOG.debug("Waiting on table to finish schema altering");
|
||||||
|
}
|
||||||
|
spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
|
||||||
|
spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
|
||||||
|
LOG.debug("Storage policy of cf 0: [" + spA + "].");
|
||||||
|
LOG.debug("Storage policy of cf 1: [" + spB + "].");
|
||||||
|
assertNotNull(spA);
|
||||||
|
assertEquals("ONE_SSD", spA);
|
||||||
|
assertNotNull(spB);
|
||||||
|
assertEquals("ALL_SSD", spB);
|
||||||
|
|
||||||
|
// flush memstore snapshot into 3 files
|
||||||
|
for (long i = 0; i < 3; i++) {
|
||||||
|
Put put = new Put(Bytes.toBytes(i));
|
||||||
|
put.addColumn(FAMILIES[0], Bytes.toBytes(i), Bytes.toBytes(i));
|
||||||
|
table.put(put);
|
||||||
|
admin.flush(TABLE_NAME);
|
||||||
|
}
|
||||||
|
// there should be 3 files in store dir
|
||||||
|
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||||
|
Path storePath = regionFs.getStoreDir(Bytes.toString(FAMILIES[0]));
|
||||||
|
FileStatus[] storeFiles = FSUtils.listStatus(fs, storePath);
|
||||||
|
assertNotNull(storeFiles);
|
||||||
|
assertEquals(3, storeFiles.length);
|
||||||
|
// store temp dir still exists but empty
|
||||||
|
Path storeTempDir = new Path(regionFs.getTempDir(), Bytes.toString(FAMILIES[0]));
|
||||||
|
assertTrue(fs.exists(storeTempDir));
|
||||||
|
FileStatus[] tempFiles = FSUtils.listStatus(fs, storeTempDir);
|
||||||
|
assertNull(tempFiles);
|
||||||
|
// storage policy of cf temp dir and 3 store files should be ONE_SSD
|
||||||
|
assertEquals("ONE_SSD",
|
||||||
|
((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(storeTempDir));
|
||||||
|
for (FileStatus status : storeFiles) {
|
||||||
|
assertEquals("ONE_SSD",
|
||||||
|
((HFileSystem) regionFs.getFileSystem()).getStoragePolicy(status.getPath()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// change storage policies by calling raw api directly
|
||||||
|
regionFs.setStoragePolicy(Bytes.toString(FAMILIES[0]), "ALL_SSD");
|
||||||
|
regionFs.setStoragePolicy(Bytes.toString(FAMILIES[1]), "ONE_SSD");
|
||||||
|
spA = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[0]));
|
||||||
|
spB = regionFs.getStoragePolicy(Bytes.toString(FAMILIES[1]));
|
||||||
|
LOG.debug("Storage policy of cf 0: [" + spA + "].");
|
||||||
|
LOG.debug("Storage policy of cf 1: [" + spB + "].");
|
||||||
|
assertNotNull(spA);
|
||||||
|
assertEquals("ALL_SSD", spA);
|
||||||
|
assertNotNull(spB);
|
||||||
|
assertEquals("ONE_SSD", spB);
|
||||||
|
} finally {
|
||||||
|
table.close();
|
||||||
|
TEST_UTIL.deleteTable(TABLE_NAME);
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private HRegionFileSystem getHRegionFS(HTable table, Configuration conf) throws IOException {
|
||||||
|
FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||||
|
Path tableDir = FSUtils.getTableDir(TEST_UTIL.getDefaultRootDirPath(), table.getName());
|
||||||
|
List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
|
||||||
|
assertEquals(1, regionDirs.size());
|
||||||
|
List<Path> familyDirs = FSUtils.getFamilyDirs(fs, regionDirs.get(0));
|
||||||
|
assertEquals(2, familyDirs.size());
|
||||||
|
HRegionInfo hri = table.getRegionLocator().getAllRegionLocations().get(0).getRegionInfo();
|
||||||
|
HRegionFileSystem regionFs = new HRegionFileSystem(conf, new HFileSystem(fs), tableDir, hri);
|
||||||
|
return regionFs;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOnDiskRegionCreation() throws IOException {
|
public void testOnDiskRegionCreation() throws IOException {
|
||||||
|
|
|
@ -854,6 +854,10 @@ module Hbase
|
||||||
algorithm))
|
algorithm))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY)
|
||||||
|
storage_policy = arg.delete(org.apache.hadoop.hbase.HColumnDescriptor::STORAGE_POLICY).upcase
|
||||||
|
family.setStoragePolicy(storage_policy)
|
||||||
|
end
|
||||||
|
|
||||||
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
|
set_user_metadata(family, arg.delete(METADATA)) if arg[METADATA]
|
||||||
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
|
set_descriptor_config(family, arg.delete(CONFIGURATION)) if arg[CONFIGURATION]
|
||||||
|
|
Loading…
Reference in New Issue