HBASE-13720 - Mob files are not encrypting in mob compaction and Sweeper
(Jingcheng Du)

parent 132f65ea1f
commit 5428c9fdd3
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.mob;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.security.Key;
|
||||
import java.security.KeyException;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
@@ -41,11 +43,21 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||
import org.apache.hadoop.hbase.io.crypto.Cipher;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileContext;
|
||||
@@ -57,7 +69,13 @@ import org.apache.hadoop.hbase.mob.filecompactions.PartitionedMobFileCompactor;
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.util.*;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.ChecksumType;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
/**
|
||||
* The mob utilities
|
||||
@@ -439,17 +457,19 @@ public class MobUtils {
* @param compression The compression algorithm.
|
||||
* @param startKey The hex string of the start key.
|
||||
* @param cacheConfig The current cache config.
|
||||
* @param cryptoContext The encryption context.
|
||||
* @return The writer for the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
|
||||
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
|
||||
Compression.Algorithm compression, String startKey, CacheConfig cacheConfig)
|
||||
Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
|
||||
Encryption.Context cryptoContext)
|
||||
throws IOException {
|
||||
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
|
||||
.replaceAll("-", ""));
|
||||
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
|
||||
cacheConfig);
|
||||
cacheConfig, cryptoContext);
|
||||
}
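Editorial note (not part of the commit): the widened createWriter overloads now take an Encryption.Context as their last argument; a caller that does not use encryption can pass Encryption.Context.NONE and keep the previous behaviour. A minimal, hypothetical fragment, with the argument names assumed to be in scope:

StoreFile.Writer writer = MobUtils.createWriter(conf, fs, family, date, basePath,
    maxKeyCount, compression, startKey, cacheConfig, Encryption.Context.NONE);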
|
||||
|
||||
/**
|
||||
@@ -460,17 +480,20 @@ public class MobUtils {
* @param basePath The basic path for a temp directory.
|
||||
* @param maxKeyCount The key count.
|
||||
* @param cacheConfig The current cache config.
|
||||
* @param cryptoContext The encryption context.
|
||||
* @return The writer for the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static StoreFile.Writer createRefFileWriter(Configuration conf, FileSystem fs,
|
||||
HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig)
|
||||
HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
|
||||
Encryption.Context cryptoContext)
|
||||
throws IOException {
|
||||
HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
|
||||
.withIncludesTags(true).withCompression(family.getCompactionCompression())
|
||||
.withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf))
|
||||
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
|
||||
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
|
||||
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
|
||||
.withEncryptionContext(cryptoContext).build();
|
||||
Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
|
||||
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs).withFilePath(tempPath)
|
||||
.withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType())
|
||||
@@ -489,17 +512,19 @@ public class MobUtils {
* @param compression The compression algorithm.
|
||||
* @param startKey The start key.
|
||||
* @param cacheConfig The current cache config.
|
||||
* @param cryptoContext The encryption context.
|
||||
* @return The writer for the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
|
||||
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
|
||||
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
|
||||
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
|
||||
Encryption.Context cryptoContext)
|
||||
throws IOException {
|
||||
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString()
|
||||
.replaceAll("-", ""));
|
||||
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
|
||||
cacheConfig);
|
||||
cacheConfig, cryptoContext);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -513,18 +538,20 @@ public class MobUtils {
* @param compression The compression algorithm.
|
||||
* @param startKey The start key.
|
||||
* @param cacheConfig The current cache config.
|
||||
* @param cryptoContext The encryption context.
|
||||
* @return The writer for the del file.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static StoreFile.Writer createDelFileWriter(Configuration conf, FileSystem fs,
|
||||
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
|
||||
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig)
|
||||
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
|
||||
Encryption.Context cryptoContext)
|
||||
throws IOException {
|
||||
String suffix = UUID
|
||||
.randomUUID().toString().replaceAll("-", "") + "_del";
|
||||
MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
|
||||
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
|
||||
cacheConfig);
|
||||
cacheConfig, cryptoContext);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -537,17 +564,20 @@ public class MobUtils {
* @param maxKeyCount The key count.
|
||||
* @param compression The compression algorithm.
|
||||
* @param cacheConfig The current cache config.
|
||||
* @param cryptoContext The encryption context.
|
||||
* @return The writer for the mob file.
|
||||
* @throws IOException
|
||||
*/
|
||||
private static StoreFile.Writer createWriter(Configuration conf, FileSystem fs,
|
||||
HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
|
||||
Compression.Algorithm compression, CacheConfig cacheConfig) throws IOException {
|
||||
Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext)
|
||||
throws IOException {
|
||||
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
|
||||
.withIncludesMvcc(true).withIncludesTags(true)
|
||||
.withChecksumType(ChecksumType.getDefaultChecksumType())
|
||||
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM).withBlockSize(family.getBlocksize())
|
||||
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()).build();
|
||||
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
|
||||
.withEncryptionContext(cryptoContext).build();
|
||||
|
||||
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, cacheConfig, fs)
|
||||
.withFilePath(new Path(basePath, mobFileName.getFileName()))
|
||||
@@ -739,4 +769,72 @@ public class MobUtils {
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
|
||||
return pool;
|
||||
}
|
||||
|
||||
/**
|
||||
   * Creates the encryption context.
|
||||
* @param conf The current configuration.
|
||||
* @param family The current column descriptor.
|
||||
* @return The encryption context.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static Encryption.Context createEncryptionContext(Configuration conf,
|
||||
HColumnDescriptor family) throws IOException {
|
||||
// TODO the code is repeated, and needs to be unified.
|
||||
Encryption.Context cryptoContext = Encryption.Context.NONE;
|
||||
String cipherName = family.getEncryptionType();
|
||||
if (cipherName != null) {
|
||||
Cipher cipher;
|
||||
Key key;
|
||||
byte[] keyBytes = family.getEncryptionKey();
|
||||
if (keyBytes != null) {
|
||||
// Family provides specific key material
|
||||
String masterKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User
|
||||
.getCurrent().getShortName());
|
||||
try {
|
||||
// First try the master key
|
||||
key = EncryptionUtil.unwrapKey(conf, masterKeyName, keyBytes);
|
||||
} catch (KeyException e) {
|
||||
// If the current master key fails to unwrap, try the alternate, if
|
||||
// one is configured
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Unable to unwrap key with current master key '" + masterKeyName + "'");
|
||||
}
|
||||
String alternateKeyName = conf.get(HConstants.CRYPTO_MASTERKEY_ALTERNATE_NAME_CONF_KEY);
|
||||
if (alternateKeyName != null) {
|
||||
try {
|
||||
key = EncryptionUtil.unwrapKey(conf, alternateKeyName, keyBytes);
|
||||
} catch (KeyException ex) {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
} else {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
// Use the algorithm the key wants
|
||||
cipher = Encryption.getCipher(conf, key.getAlgorithm());
|
||||
if (cipher == null) {
|
||||
throw new RuntimeException("Cipher '" + key.getAlgorithm() + "' is not available");
|
||||
}
|
||||
// Fail if misconfigured
|
||||
// We use the encryption type specified in the column schema as a sanity check on
|
||||
// what the wrapped key is telling us
|
||||
if (!cipher.getName().equalsIgnoreCase(cipherName)) {
|
||||
throw new RuntimeException("Encryption for family '" + family.getNameAsString()
|
||||
+ "' configured with type '" + cipherName + "' but key specifies algorithm '"
|
||||
+ cipher.getName() + "'");
|
||||
}
|
||||
} else {
|
||||
// Family does not provide key material, create a random key
|
||||
cipher = Encryption.getCipher(conf, cipherName);
|
||||
if (cipher == null) {
|
||||
throw new RuntimeException("Cipher '" + cipherName + "' is not available");
|
||||
}
|
||||
key = cipher.getRandomKey();
|
||||
}
|
||||
cryptoContext = Encryption.newContext(conf);
|
||||
cryptoContext.setCipher(cipher);
|
||||
cryptoContext.setKey(key);
|
||||
}
|
||||
return cryptoContext;
|
||||
}
|
||||
}
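Editorial sketch (not part of the commit) of the pattern this change introduces in MobUtils: the encryption context is derived once from the column descriptor and then handed to every mob writer, so newly written mob files honour the family's encryption settings. Variable names (conf, fs, family, date, basePath, maxKeyCount, startKey, cacheConfig) are assumed to be in scope.

Encryption.Context cryptoContext = MobUtils.createEncryptionContext(conf, family);
StoreFile.Writer writer = MobUtils.createWriter(conf, fs, family, date, basePath,
    maxKeyCount, family.getCompactionCompression(), startKey, cacheConfig, cryptoContext);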
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
@@ -72,9 +73,10 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
private Path bulkloadPath;
|
||||
private CacheConfig compactionCacheConfig;
|
||||
private Tag tableNameTag;
|
||||
private Encryption.Context cryptoContext = Encryption.Context.NONE;
|
||||
|
||||
public PartitionedMobFileCompactor(Configuration conf, FileSystem fs, TableName tableName,
|
||||
HColumnDescriptor column, ExecutorService pool) {
|
||||
HColumnDescriptor column, ExecutorService pool) throws IOException {
|
||||
super(conf, fs, tableName, column, pool);
|
||||
mergeableSize = conf.getLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD,
|
||||
MobConstants.DEFAULT_MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD);
|
||||
@@ -92,6 +94,7 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
copyOfConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0f);
|
||||
compactionCacheConfig = new CacheConfig(copyOfConf);
|
||||
tableNameTag = new Tag(TagType.MOB_TABLE_NAME_TAG_TYPE, tableName.getName());
|
||||
cryptoContext = MobUtils.createEncryptionContext(copyOfConf, column);
|
||||
}
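Editorial sketch (not part of the commit): because the constructor now calls MobUtils.createEncryptionContext, it declares IOException, so instantiation can fail if the family's key material cannot be unwrapped. A hypothetical caller, assuming LOG is a logger in scope:

try {
  MobFileCompactor compactor =
      new PartitionedMobFileCompactor(conf, fs, tableName, column, pool);
  compactor.compact();
} catch (IOException e) {
  // e.g. the configured master key cannot unwrap the family's encryption key
  LOG.error("Mob file compaction failed for " + tableName, e);
}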
|
||||
|
||||
@Override
|
||||
@@ -347,12 +350,12 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
try {
|
||||
writer = MobUtils.createWriter(conf, fs, column, partition.getPartitionId().getDate(),
|
||||
tempPath, Long.MAX_VALUE, column.getCompactionCompression(), partition.getPartitionId()
|
||||
.getStartKey(), compactionCacheConfig);
|
||||
.getStartKey(), compactionCacheConfig, cryptoContext);
|
||||
filePath = writer.getPath();
|
||||
byte[] fileName = Bytes.toBytes(filePath.getName());
|
||||
// create a temp file and open a writer for it in the bulkloadPath
|
||||
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
|
||||
.getSecond().longValue(), compactionCacheConfig);
|
||||
.getSecond().longValue(), compactionCacheConfig, cryptoContext);
|
||||
refFilePath = refFileWriter.getPath();
|
||||
List<Cell> cells = new ArrayList<Cell>();
|
||||
boolean hasMore = false;
|
||||
@@ -457,7 +460,8 @@ public class PartitionedMobFileCompactor extends MobFileCompactor {
try {
|
||||
writer = MobUtils.createDelFileWriter(conf, fs, column,
|
||||
MobUtils.formatDate(new Date(request.selectionTime)), tempPath, Long.MAX_VALUE,
|
||||
column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig);
|
||||
column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
|
||||
cryptoContext);
|
||||
filePath = writer.getPath();
|
||||
List<Cell> cells = new ArrayList<Cell>();
|
||||
boolean hasMore = false;
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.io.crypto.Encryption;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
@@ -75,6 +76,7 @@ public class MemStoreWrapper {
private Path mobFamilyDir;
|
||||
private FileSystem fs;
|
||||
private CacheConfig cacheConfig;
|
||||
private Encryption.Context cryptoContext = Encryption.Context.NONE;
|
||||
|
||||
public MemStoreWrapper(Context context, FileSystem fs, BufferedMutator table, HColumnDescriptor hcd,
|
||||
MemStore memstore, CacheConfig cacheConfig) throws IOException {
|
||||
@@ -88,6 +90,7 @@ public class MemStoreWrapper {
flushSize = this.conf.getLong(MobConstants.MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE,
|
||||
MobConstants.DEFAULT_MOB_SWEEP_TOOL_COMPACTION_MEMSTORE_FLUSH_SIZE);
|
||||
mobFamilyDir = MobUtils.getMobFamilyPath(conf, table.getName(), hcd.getNameAsString());
|
||||
cryptoContext = MobUtils.createEncryptionContext(conf, hcd);
|
||||
}
|
||||
|
||||
public void setPartitionId(SweepPartitionId partitionId) {
|
||||
@@ -127,9 +130,9 @@ public class MemStoreWrapper {
}
|
||||
// generate the files into a temp directory.
|
||||
String tempPathString = context.getConfiguration().get(SweepJob.WORKING_FILES_DIR_KEY);
|
||||
StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd,
|
||||
partitionId.getDate(), new Path(tempPathString), snapshot.getCellsCount(),
|
||||
hcd.getCompactionCompression(), partitionId.getStartKey(), cacheConfig);
|
||||
StoreFile.Writer mobFileWriter = MobUtils.createWriter(conf, fs, hcd, partitionId.getDate(),
|
||||
new Path(tempPathString), snapshot.getCellsCount(), hcd.getCompactionCompression(),
|
||||
partitionId.getStartKey(), cacheConfig, cryptoContext);
|
||||
|
||||
String relativePath = mobFileWriter.getPath().getName();
|
||||
LOG.info("Create files under a temp directory " + mobFileWriter.getPath().toString());
@@ -33,9 +33,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Tag;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.FilterList;
|
||||
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.mob.filecompactions;
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.Key;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -32,6 +34,8 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
@@ -40,22 +44,42 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.BufferedMutator;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.io.HFileLink;
|
||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||
import org.apache.hadoop.hbase.io.crypto.aes.AES;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
@@ -91,7 +115,11 @@ public class TestMobFileCompactor {
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
|
||||
TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000);
|
||||
TEST_UTIL.getConfiguration()
|
||||
.setLong(MobConstants.MOB_FILE_COMPACTION_MERGEABLE_THRESHOLD, 5000);
|
||||
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
|
||||
KeyProviderForTesting.class.getName());
|
||||
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||
TEST_UTIL.startMiniCluster(1);
|
||||
pool = createThreadPool(TEST_UTIL.getConfiguration());
|
||||
conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool);
|
||||
@@ -210,6 +238,57 @@ public class TestMobFileCompactor {
assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionWithoutDelFilesAndWithEncryption() throws Exception {
|
||||
resetConf();
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
SecureRandom rng = new SecureRandom();
|
||||
byte[] keyBytes = new byte[AES.KEY_LENGTH];
|
||||
rng.nextBytes(keyBytes);
|
||||
String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
|
||||
Key cfKey = new SecretKeySpec(keyBytes, algorithm);
|
||||
byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
|
||||
conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey);
|
||||
String tableNameAsString = "testCompactionWithoutDelFilesAndWithEncryption";
|
||||
TableName tableName = TableName.valueOf(tableNameAsString);
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family1);
|
||||
hcd.setMobEnabled(true);
|
||||
hcd.setMobThreshold(0);
|
||||
hcd.setMaxVersions(4);
|
||||
hcd.setEncryptionType(algorithm);
|
||||
hcd.setEncryptionKey(encryptionKey);
|
||||
HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
|
||||
hcd2.setMobEnabled(true);
|
||||
hcd2.setMobThreshold(0);
|
||||
hcd2.setMaxVersions(4);
|
||||
desc.addFamily(hcd);
|
||||
desc.addFamily(hcd2);
|
||||
admin.createTable(desc, getSplitKeys());
|
||||
Table hTable = conn.getTable(tableName);
|
||||
BufferedMutator bufMut = conn.getBufferedMutator(tableName);
|
||||
int count = 4;
|
||||
// generate mob files
|
||||
loadData(admin, bufMut, tableName, count, rowNumPerFile);
|
||||
int rowNumPerRegion = count*rowNumPerFile;
|
||||
|
||||
assertEquals("Before compaction: mob rows count", regionNum*rowNumPerRegion,
|
||||
countMobRows(hTable));
|
||||
assertEquals("Before compaction: mob file count", regionNum * count,
|
||||
countFiles(tableName, true, family1));
|
||||
assertEquals("Before compaction: del file count", 0, countFiles(tableName, false, family1));
|
||||
|
||||
MobFileCompactor compactor = new PartitionedMobFileCompactor(conf, fs, tableName, hcd, pool);
|
||||
compactor.compact();
|
||||
|
||||
assertEquals("After compaction: mob rows count", regionNum*rowNumPerRegion,
|
||||
countMobRows(hTable));
|
||||
assertEquals("After compaction: mob file count", regionNum,
|
||||
countFiles(tableName, true, family1));
|
||||
assertEquals("After compaction: del file count", 0, countFiles(tableName, false, family1));
|
||||
Assert.assertTrue(verifyEncryption(tableName, family1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionWithDelFiles() throws Exception {
|
||||
resetConf();
|
||||
@@ -632,6 +711,27 @@ public class TestMobFileCompactor {
return count;
|
||||
}
|
||||
|
||||
private boolean verifyEncryption(TableName tableName, String familyName) throws IOException {
|
||||
Path mobDirPath = MobUtils.getMobFamilyPath(MobUtils.getMobRegionPath(conf, tableName),
|
||||
familyName);
|
||||
boolean hasFiles = false;
|
||||
if (fs.exists(mobDirPath)) {
|
||||
FileStatus[] files = fs.listStatus(mobDirPath);
|
||||
hasFiles = files != null && files.length > 0;
|
||||
Assert.assertTrue(hasFiles);
|
||||
Path path = files[0].getPath();
|
||||
CacheConfig cacheConf = new CacheConfig(conf);
|
||||
StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf,
|
||||
BloomType.NONE);
|
||||
HFile.Reader reader = sf.createReader().getHFileReader();
|
||||
byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
|
||||
Assert.assertTrue(null != encryptionKey);
|
||||
Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
|
||||
.equals(HConstants.CIPHER_AES));
|
||||
}
|
||||
return hasFiles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the number of HFileLink in the mob path.
|
||||
* @param familyName the family name
@@ -20,26 +20,40 @@ package org.apache.hadoop.hbase.mob.mapreduce;
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.Key;
|
||||
import java.security.SecureRandom;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import javax.crypto.spec.SecretKeySpec;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.*;
|
||||
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
|
||||
import org.apache.hadoop.hbase.io.crypto.aes.AES;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.mob.MobConstants;
|
||||
import org.apache.hadoop.hbase.mob.MobUtils;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.security.EncryptionUtil;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
@@ -48,7 +62,7 @@ import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
|
||||
public class TestMobSweeper {
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private String tableName;
|
||||
private TableName tableName;
|
||||
private final static String row = "row_";
|
||||
private final static String family = "family";
|
||||
private final static String column = "column";
|
||||
@@ -61,9 +75,14 @@ public class TestMobSweeper {
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", 0);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.regionserver.info.port.auto", true);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 15); // avoid major compactions
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.max", 30); // avoid major compactions
|
||||
// avoid major compactions
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.min", 15);
|
||||
// avoid major compactions
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compaction.max", 30);
|
||||
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
|
||||
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY,
|
||||
KeyProviderForTesting.class.getName());
|
||||
TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase");
|
||||
|
||||
TEST_UTIL.startMiniCluster();
|
||||
|
||||
@@ -76,36 +95,34 @@ public class TestMobSweeper {
TEST_UTIL.shutdownMiniMapReduceCluster();
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
long tid = System.currentTimeMillis();
|
||||
tableName = "testSweeper" + tid;
|
||||
tableName = TableName.valueOf("testSweeper" + tid);
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setMobEnabled(true);
|
||||
hcd.setMobThreshold(3L);
|
||||
hcd.setMobThreshold(0);
|
||||
hcd.setMaxVersions(4);
|
||||
desc.addFamily(hcd);
|
||||
|
||||
admin = TEST_UTIL.getHBaseAdmin();
|
||||
admin.createTable(desc);
|
||||
Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
TableName tn = TableName.valueOf(tableName);
|
||||
table = c.getTable(tn);
|
||||
bufMut = c.getBufferedMutator(tn);
|
||||
table = c.getTable(tableName);
|
||||
bufMut = c.getBufferedMutator(tableName);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
admin.disableTable(TableName.valueOf(tableName));
|
||||
admin.deleteTable(TableName.valueOf(tableName));
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
admin.close();
|
||||
}
|
||||
|
||||
private Path getMobFamilyPath(Configuration conf, String tableNameStr,
|
||||
private Path getMobFamilyPath(Configuration conf, TableName tableName,
|
||||
String familyName) {
|
||||
Path p = new Path(MobUtils.getMobRegionPath(conf, TableName.valueOf(tableNameStr)),
|
||||
Path p = new Path(MobUtils.getMobRegionPath(conf, tableName),
|
||||
familyName);
|
||||
return p;
|
||||
}
|
||||
@@ -117,7 +134,7 @@ public class TestMobSweeper {
return sb.toString();
|
||||
}
|
||||
|
||||
private void generateMobTable(Admin admin, BufferedMutator table, String tableName, int count,
|
||||
private void generateMobTable(Admin admin, BufferedMutator table, TableName tableName, int count,
|
||||
int flushStep) throws IOException, InterruptedException {
|
||||
if (count <= 0 || flushStep <= 0)
|
||||
return;
|
||||
@@ -131,11 +148,11 @@ public class TestMobSweeper {
table.mutate(put);
|
||||
if (index++ % flushStep == 0) {
|
||||
table.flush();
|
||||
admin.flush(TableName.valueOf(tableName));
|
||||
admin.flush(tableName);
|
||||
}
|
||||
}
|
||||
table.flush();
|
||||
admin.flush(TableName.valueOf(tableName));
|
||||
admin.flush(tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -173,7 +190,7 @@ public class TestMobSweeper {
conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 24 * 60 * 60 * 1000);
|
||||
|
||||
String[] args = new String[2];
|
||||
args[0] = tableName;
|
||||
args[0] = tableName.getNameAsString();
|
||||
args[1] = family;
|
||||
assertEquals(0, ToolRunner.run(conf, new Sweeper(), args));
|
||||
|
||||
@@ -208,8 +225,8 @@ public class TestMobSweeper {
.equalsIgnoreCase(mobFilesSet.iterator().next()));
|
||||
}
|
||||
|
||||
private void testCompactionDelaySweeperInternal(Table table, BufferedMutator bufMut, String tableName)
|
||||
throws Exception {
|
||||
private void testCompactionDelaySweeperInternal(Table table, BufferedMutator bufMut,
|
||||
TableName tableName, boolean encrypted) throws Exception {
|
||||
int count = 10;
|
||||
//create table and generate 10 mob files
|
||||
generateMobTable(admin, bufMut, tableName, count, 1);
|
||||
@@ -242,7 +259,7 @@ public class TestMobSweeper {
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
conf.setLong(SweepJob.MOB_SWEEP_JOB_DELAY, 0);
|
||||
String[] args = new String[2];
|
||||
args[0] = tableName;
|
||||
args[0] = tableName.getNameAsString();
|
||||
args[1] = family;
|
||||
assertEquals(0, ToolRunner.run(conf, new Sweeper(), args));
|
||||
|
||||
@@ -268,18 +285,61 @@ public class TestMobSweeper {
assertEquals(1, mobFilesScannedAfterJob.size());
|
||||
|
||||
fileStatuses = TEST_UTIL.getTestFileSystem().listStatus(mobFamilyPath);
|
||||
Path lastFilePath = null;
|
||||
mobFilesSet = new TreeSet<String>();
|
||||
for (FileStatus status : fileStatuses) {
|
||||
mobFilesSet.add(status.getPath().getName());
|
||||
lastFilePath = status.getPath();
|
||||
}
|
||||
assertEquals(1, mobFilesSet.size());
|
||||
assertEquals(true, mobFilesScannedAfterJob.iterator().next()
|
||||
.equalsIgnoreCase(mobFilesSet.iterator().next()));
|
||||
if (encrypted) {
|
||||
// assert the encryption context
|
||||
CacheConfig cacheConf = new CacheConfig(conf);
|
||||
StoreFile sf = new StoreFile(TEST_UTIL.getTestFileSystem(), lastFilePath, conf, cacheConf,
|
||||
BloomType.NONE);
|
||||
HFile.Reader reader = sf.createReader().getHFileReader();
|
||||
byte[] encryptionKey = reader.getTrailer().getEncryptionKey();
|
||||
Assert.assertTrue(null != encryptionKey);
|
||||
Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName()
|
||||
.equals(HConstants.CIPHER_AES));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionDelaySweeperWithEncryption() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
SecureRandom rng = new SecureRandom();
|
||||
byte[] keyBytes = new byte[AES.KEY_LENGTH];
|
||||
rng.nextBytes(keyBytes);
|
||||
String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES);
|
||||
Key cfKey = new SecretKeySpec(keyBytes, algorithm);
|
||||
byte[] encryptionKey = EncryptionUtil.wrapKey(conf,
|
||||
conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey);
|
||||
String tableNameAsString = "testCompactionDelaySweeperWithEncryption";
|
||||
TableName tableName = TableName.valueOf(tableNameAsString);
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setMobEnabled(true);
|
||||
hcd.setMobThreshold(0);
|
||||
hcd.setMaxVersions(4);
|
||||
hcd.setEncryptionType(algorithm);
|
||||
hcd.setEncryptionKey(encryptionKey);
|
||||
desc.addFamily(hcd);
|
||||
admin.createTable(desc);
|
||||
Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
BufferedMutator bufMut = c.getBufferedMutator(tableName);
|
||||
Table table = c.getTable(tableName);
|
||||
testCompactionDelaySweeperInternal(table, bufMut, tableName, true);
|
||||
table.close();
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompactionDelaySweeper() throws Exception {
|
||||
testCompactionDelaySweeperInternal(table, bufMut, tableName);
|
||||
testCompactionDelaySweeperInternal(table, bufMut, tableName, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -292,14 +352,14 @@ public class TestMobSweeper {
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor hcd = new HColumnDescriptor(family);
|
||||
hcd.setMobEnabled(true);
|
||||
hcd.setMobThreshold(3L);
|
||||
hcd.setMobThreshold(0);
|
||||
hcd.setMaxVersions(4);
|
||||
desc.addFamily(hcd);
|
||||
admin.createTable(desc);
|
||||
Connection c = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
BufferedMutator bufMut = c.getBufferedMutator(tableName);
|
||||
Table table = c.getTable(tableName);
|
||||
testCompactionDelaySweeperInternal(table, bufMut, tableNameAsString);
|
||||
testCompactionDelaySweeperInternal(table, bufMut, tableName, false);
|
||||
table.close();
|
||||
admin.disableTable(tableName);
|
||||
admin.deleteTable(tableName);
|