HBASE-17690 Clean up MOB code

Jingcheng Du 2017-02-28 09:57:34 +08:00
parent be1cdc7376
commit 5a8f1e8aaa
14 changed files with 250 additions and 174 deletions

HBaseAdmin.java

@@ -1206,8 +1206,8 @@ public class HBaseAdmin implements Admin {
* @param tableName table or region to compact
* @param columnFamily column family within a table or region
* @param major True if we are to do a major compaction.
* @param compactType {@link org.apache.hadoop.hbase.client.CompactType}
* @throws IOException if a remote or network exception occurs
* @throws InterruptedException
*/
private void compact(final TableName tableName, final byte[] columnFamily, final boolean major,
CompactType compactType) throws IOException {

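For context: this private overload is reached through the public Admin compaction API. A minimal sketch of requesting a MOB-scope major compaction from client code follows; the table and family names are placeholders, and it assumes the CompactType overloads available on Admin in this branch.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.CompactType;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class MobCompactionRequestSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      // Request a major compaction of the MOB data of family "f" in table "t1"
      // (placeholder names).
      admin.majorCompact(TableName.valueOf("t1"), Bytes.toBytes("f"), CompactType.MOB);
    }
  }
}
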
DefaultMobStoreCompactor.java

@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -36,17 +37,21 @@ import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Compact the passed set of files in the mob-enabled column family.
@@ -164,12 +169,20 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
boolean major, int numofFilesToCompact) throws IOException {
int bytesWritten = 0;
long bytesWrittenProgressForCloseCheck = 0;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<Cell> cells = new ArrayList<Cell>();
// Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
int closeCheckInterval = HStore.getCloseCheckInterval();
int closeCheckSizeLimit = HStore.getCloseCheckInterval();
long lastMillis = 0;
if (LOG.isDebugEnabled()) {
lastMillis = EnvironmentEdgeManager.currentTime();
}
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0;
boolean hasMore;
Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
byte[] fileName = null;
@@ -177,25 +190,41 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
long mobCells = 0, deleteMarkersCount = 0;
long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
boolean finished = false;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null;
long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getFamily().getBlocksize();
try {
try {
// If the mob file writer could not be created, directly write the cell to the store file.
mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
compactionCompression, store.getRegionInfo().getStartKey(), true);
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
} catch (IOException e) {
LOG.error("Failed to create mob writer, "
LOG.warn("Failed to create mob writer, "
+ "we will continue the compaction by writing MOB cells directly in store files", e);
}
delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
if (major) {
try {
delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs),
fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey());
} catch (IOException e) {
LOG.warn(
"Failed to create del writer, "
+ "we will continue the compaction by writing delete markers directly in store files",
e);
}
}
do {
hasMore = scanner.next(cells, scannerContext);
if (LOG.isDebugEnabled()) {
now = EnvironmentEdgeManager.currentTime();
}
for (Cell c : cells) {
if (major && CellUtil.isDelete(c)) {
if (MobUtils.isMobReferenceCell(c)) {
if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) {
// Directly write it to a store file
writer.append(c);
} else {
@@ -254,56 +283,83 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
cellsCountCompactedToMob++;
cellsSizeCompactedToMob += c.getValueLength();
}
int len = KeyValueUtil.length(c);
++progress.currentCompactedKVs;
progress.totalCompactedSize += len;
bytesWrittenProgressForShippedCall += len;
if (LOG.isDebugEnabled()) {
bytesWrittenProgressForLog += len;
}
throughputController.control(compactionName, len);
// check periodically to see if a system stop is requested
if (closeCheckInterval > 0) {
bytesWritten += KeyValueUtil.length(c);
if (bytesWritten > closeCheckInterval) {
bytesWritten = 0;
if (closeCheckSizeLimit > 0) {
bytesWrittenProgressForCloseCheck += len;
if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
bytesWrittenProgressForCloseCheck = 0;
if (!store.areWritesEnabled()) {
progress.cancel();
return false;
}
}
}
if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
((ShipperListener)writer).beforeShipped();
kvs.shipped();
bytesWrittenProgressForShippedCall = 0;
}
}
// Log the progress of long-running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
LOG.debug("Compaction progress: "
+ compactionName
+ " "
+ progress
+ String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0)
/ ((now - lastMillis) / 1000.0)) + ", throughputController is "
+ throughputController);
lastMillis = now;
bytesWrittenProgressForLog = 0;
}
}
cells.clear();
} while (hasMore);
finished = true;
} catch (InterruptedException e) {
progress.cancel();
throw new InterruptedIOException(
"Interrupted while control throughput of compacting " + compactionName);
} finally {
if (mobFileWriter != null) {
mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
mobFileWriter.close();
throughputController.finish(compactionName);
if (!finished && mobFileWriter != null) {
abortWriter(mobFileWriter);
}
if (delFileWriter != null) {
delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
delFileWriter.close();
}
}
if (mobFileWriter != null) {
if (mobCells > 0) {
// If the mob file is not empty, commit it.
mobStore.commitFile(mobFileWriter.getPath(), path);
} else {
try {
// If the mob file is empty, delete it instead of committing.
store.getFileSystem().delete(mobFileWriter.getPath(), true);
} catch (IOException e) {
LOG.error("Failed to delete the temp mob file", e);
}
if (!finished && delFileWriter != null) {
abortWriter(delFileWriter);
}
}
if (delFileWriter != null) {
if (deleteMarkersCount > 0) {
// If the del file is not empty, commit it.
// If the commit fails, the compaction is performed again.
delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount);
delFileWriter.close();
mobStore.commitFile(delFileWriter.getPath(), path);
} else {
try {
// If the del file is empty, delete it instead of committing.
store.getFileSystem().delete(delFileWriter.getPath(), true);
} catch (IOException e) {
LOG.error("Failed to delete the temp del file", e);
}
// If the del file is empty, delete it instead of committing.
abortWriter(delFileWriter);
}
}
if (mobFileWriter != null) {
if (mobCells > 0) {
// If the mob file is not empty, commit it.
mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells);
mobFileWriter.close();
mobStore.commitFile(mobFileWriter.getPath(), path);
} else {
// If the mob file is empty, delete it instead of committing.
abortWriter(mobFileWriter);
}
}
mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);

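The write loop above follows the throttling protocol used throughout this patch: start the named operation, report each appended cell's size via control (which may block to enforce the configured rate), and finish in a finally block, translating InterruptedException into InterruptedIOException. A minimal sketch of that lifecycle; the operation name and byte counts are illustrative only, and NoLimitThroughputController is assumed to be available in the throttle package.

import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;

public class ThrottleLifecycleSketch {
  // Same start/control/finish shape as performCompaction above.
  static void writeThrottled(ThroughputController tc, String name) throws InterruptedIOException {
    tc.start(name);
    try {
      for (int i = 0; i < 10; i++) {
        long len = 1024; // pretend a 1 KB cell was just appended
        tc.control(name, len); // may sleep to enforce the configured rate
      }
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while controlling throughput of " + name);
    } finally {
      tc.finish(name);
    }
  }

  public static void main(String[] args) throws Exception {
    writeThrottled(new NoLimitThroughputController(), "example-compaction");
  }
}
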
DefaultMobStoreFlusher.java

@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
@@ -112,14 +115,23 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
synchronized (flushLock) {
status.setStatus("Flushing " + store + ": creating writer");
// Write the map out to the disk
writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
false, true, true, false/*default for dropbehind*/, snapshot.getTimeRangeTracker());
writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompressionType(),
false, true, true, false, snapshot.getTimeRangeTracker());
IOException e = null;
try {
// It's a mob store, so flush the cells in the mob way. This is where flushing
// a mob store differs from flushing a normal one.
performMobFlush(snapshot, cacheFlushId, scanner, writer, status);
performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
} catch (IOException ioe) {
e = ioe;
// throw the exception out
throw ioe;
} finally {
finalizeWriter(writer, cacheFlushId, status);
if (e != null) {
writer.close();
} else {
finalizeWriter(writer, cacheFlushId, status);
}
}
}
} finally {
@@ -148,10 +160,12 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
* @param scanner The scanner of the memstore snapshot.
* @param writer The store file writer.
* @param status Task that represents the flush operation and may be updated with status.
* @param throughputController A controller to avoid flushing too fast.
* @throws IOException
*/
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status) throws IOException {
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
ThroughputController throughputController) throws IOException {
StoreFileWriter mobFileWriter = null;
int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX,
HConstants.COMPACTION_KV_MAX_DEFAULT);
@@ -159,16 +173,21 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
long mobSize = 0;
long time = snapshot.getTimeRangeTracker().getMax();
mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
store.getFamily().getCompression(), store.getRegionInfo().getStartKey());
store.getFamily().getCompressionType(), store.getRegionInfo().getStartKey(), false);
// the target path is {tableName}/.mob/{cfName}/mobFiles
// the relative path is mobFiles
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
List<Cell> cells = new ArrayList<Cell>();
boolean hasMore;
String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
boolean control = throughputController != null && !store.getRegionInfo().isSystemTable();
if (control) {
throughputController.start(flushName);
}
IOException ioe = null;
try {
List<Cell> cells = new ArrayList<Cell>();
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
do {
hasMore = scanner.next(cells, scannerContext);
if (!cells.isEmpty()) {
@@ -191,15 +210,28 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
this.mobStore.getRefCellTags());
writer.append(reference);
}
int len = KeyValueUtil.length(c);
if (control) {
throughputController.control(flushName, len);
}
}
cells.clear();
}
} while (hasMore);
} catch (InterruptedException e) {
ioe = new InterruptedIOException(
"Interrupted while control throughput of flushing " + flushName);
throw ioe;
} catch (IOException e) {
ioe = e;
throw e;
} finally {
status.setStatus("Flushing mob file " + store + ": appending metadata");
mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
status.setStatus("Flushing mob file " + store + ": closing flushed file");
mobFileWriter.close();
if (control) {
throughputController.finish(flushName);
}
if (ioe != null) {
mobFileWriter.close();
}
}
if (mobCount > 0) {
@@ -207,12 +239,18 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
// If the mob file is committed successfully but the store file is not,
// the committed mob file will be handled by the sweep tool as an unused
// file.
status.setStatus("Flushing mob file " + store + ": appending metadata");
mobFileWriter.appendMetadata(cacheFlushId, false, mobCount);
status.setStatus("Flushing mob file " + store + ": closing flushed file");
mobFileWriter.close();
mobStore.commitFile(mobFileWriter.getPath(), targetPath);
mobStore.updateMobFlushCount();
mobStore.updateMobFlushedCellsCount(mobCount);
mobStore.updateMobFlushedCellsSize(mobSize);
} else {
try {
status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file");
mobFileWriter.close();
// If the mob file is empty, delete it instead of committing.
store.getFileSystem().delete(mobFileWriter.getPath(), true);
} catch (IOException e) {

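The flush path above uses a close-or-finalize error-handling shape: remember the IOException, rethrow it, and in the finally block either just close the writer (failure) or finalize it (success). A generic sketch of the pattern with stand-in types, not HBase classes:

import java.io.IOException;

public class CloseOrFinalizeSketch {
  interface Writer { void close() throws IOException; }
  interface Body { void run() throws IOException; }

  static void flush(Writer writer, Body body) throws IOException {
    IOException failure = null;
    try {
      body.run();
    } catch (IOException ioe) {
      failure = ioe;
      throw ioe; // propagate to the caller
    } finally {
      if (failure != null) {
        writer.close(); // failure: just close, nothing to finalize
      } else {
        finalizeWriter(writer); // success: append metadata, then close
      }
    }
  }

  static void finalizeWriter(Writer writer) throws IOException {
    // stand-in for DefaultStoreFlusher.finalizeWriter
    writer.close();
  }
}
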
MobUtils.java

@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.locking.LockManager;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -489,8 +491,6 @@ public final class MobUtils {
// find the original mob files by this table name. For details please see cloning
// snapshot for mob files.
tags.add(tableNameTag);
// Add the existing tags.
TagUtil.carryForwardTags(tags, cell);
return createMobRefCell(cell, fileName, TagUtil.fromList(tags));
}
@@ -511,18 +511,19 @@
* @param startKey The hex string of the start key.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
Encryption.Context cryptoContext, boolean isCompaction)
throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date,
UUID.randomUUID().toString().replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext);
cacheConfig, cryptoContext, isCompaction);
}
/**
@@ -534,25 +535,19 @@
* @param maxKeyCount The key count.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
Encryption.Context cryptoContext, boolean isCompaction)
throws IOException {
HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true)
.withIncludesTags(true).withCompression(family.getCompactionCompression())
.withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(cryptoContext).withCreateTime(EnvironmentEdgeManager.currentTime())
.build();
Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", ""));
StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConfig, fs).withFilePath(tempPath)
.withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType())
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
return createWriter(conf, fs, family,
new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount,
family.getCompactionCompressionType(), cacheConfig, cryptoContext,
HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(),
family.getBloomFilterType(), isCompaction);
}
/**
@@ -567,18 +562,19 @@
* @param startKey The start key.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
Encryption.Context cryptoContext, boolean isCompaction)
throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date,
UUID.randomUUID().toString().replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext);
cacheConfig, cryptoContext, isCompaction);
}
/**
@@ -605,7 +601,7 @@
.randomUUID().toString().replaceAll("-", "") + "_del";
MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext);
cacheConfig, cryptoContext, true);
}
/**
@@ -619,26 +615,69 @@
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
private static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext)
throws IOException {
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
.withIncludesMvcc(true).withIncludesTags(true)
.withCompressTags(family.isCompressTags())
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize())
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(cryptoContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
boolean isCompaction)
throws IOException {
return createWriter(conf, fs, family,
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig,
cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf),
family.getBlocksize(), BloomType.NONE, isCompaction);
}
StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConfig, fs)
.withFilePath(new Path(basePath, mobFileName.getFileName()))
.withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
/**
* Creates a writer for the mob file in a temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param path The path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param checksumType The checksum type.
* @param bytesPerChecksum The bytes per checksum.
* @param blocksize The HFile block size.
* @param bloomType The bloom filter type.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
HColumnDescriptor family, Path path, long maxKeyCount,
Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType,
boolean isCompaction)
throws IOException {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
final CacheConfig writerCacheConf;
if (isCompaction) {
writerCacheConf = new CacheConfig(cacheConfig);
writerCacheConf.setCacheDataOnWrite(false);
} else {
writerCacheConf = cacheConfig;
}
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
.withIncludesMvcc(true).withIncludesTags(true)
.withCompressTags(family.isCompressTags())
.withChecksumType(checksumType)
.withBytesPerCheckSum(bytesPerChecksum)
.withBlockSize(blocksize)
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(cryptoContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs)
.withFilePath(path)
.withComparator(CellComparator.COMPARATOR).withBloomType(bloomType)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
}

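The createWriter overloads above now all funnel into the final, fully parameterized variant, which also swaps in a no-cache-on-write CacheConfig when isCompaction is true. A hypothetical call site; only the signature comes from this patch, every value below is a placeholder.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;

public class MobWriterSketch {
  static StoreFileWriter newCompactionWriter(Configuration conf, FileSystem fs,
      HColumnDescriptor family, Path tmpPath, CacheConfig cacheConfig,
      Encryption.Context cryptoContext) throws IOException {
    return MobUtils.createWriter(conf, fs, family, tmpPath, 1000L,
        Compression.Algorithm.NONE, cacheConfig, cryptoContext,
        HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf),
        family.getBlocksize(), BloomType.NONE,
        true); // isCompaction=true disables cache-on-write for the output file
  }
}
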

@@ -26,7 +26,6 @@ import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

PartitionedMobCompactor.java

@@ -629,13 +629,14 @@ public class PartitionedMobCompactor extends MobCompactor {
writer = MobUtils
.createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath,
Long.MAX_VALUE, column.getCompactionCompressionType(),
partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext);
partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext,
true);
cleanupTmpMobFile = true;
filePath = writer.getPath();
byte[] fileName = Bytes.toBytes(filePath.getName());
// create a temp file and open a writer for it in the bulkloadPath
refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath,
fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext);
fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true);
cleanupBulkloadDirOfPartition = true;
List<Cell> cells = new ArrayList<>();
boolean hasMore;

HMobStore.java

@@ -48,10 +48,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobCacheConfig;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFile;
@@ -59,7 +56,6 @@ import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobStoreEngine;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
@@ -190,16 +186,19 @@ public class HMobStore extends HStore {
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFileWriter createWriterInTmp(Date date, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
Compression.Algorithm compression, byte[] startKey,
boolean isCompaction) throws IOException {
if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
}
Path path = getTempDir();
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey);
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey,
isCompaction);
}
/**
@@ -222,7 +221,7 @@
String suffix = UUID
.randomUUID().toString().replaceAll("-", "") + "_del";
MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix);
return createWriterInTmp(mobFileName, path, maxKeyCount, compression);
return createWriterInTmp(mobFileName, path, maxKeyCount, compression, true);
}
/**
@@ -232,14 +231,16 @@
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey) throws IOException {
Compression.Algorithm compression, byte[] startKey,
boolean isCompaction) throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID()
.toString().replaceAll("-", ""));
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression);
return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction);
}
/**
@@ -248,27 +249,16 @@
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
* @throws IOException
*/
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath,
long maxKeyCount, Compression.Algorithm compression) throws IOException {
final CacheConfig writerCacheConf = mobCacheConfig;
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
.withIncludesMvcc(true).withIncludesTags(true)
.withCompressTags(family.isCompressTags())
.withChecksumType(checksumType)
.withBytesPerCheckSum(bytesPerChecksum)
.withBlockSize(blocksize)
.withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding())
.withEncryptionContext(cryptoContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, region.getFilesystem())
.withFilePath(new Path(basePath, mobFileName.getFileName()))
.withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
long maxKeyCount, Compression.Algorithm compression,
boolean isCompaction) throws IOException {
return MobUtils.createWriter(conf, region.getFilesystem(), family,
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, mobCacheConfig,
cryptoContext, checksumType, bytesPerChecksum, blocksize, BloomType.NONE, isCompaction);
}
/**
@@ -412,7 +402,7 @@
throwable = e;
if ((e instanceof FileNotFoundException) ||
(e.getCause() instanceof FileNotFoundException)) {
LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e);
LOG.debug("Fail to read the cell, the mob file " + path + " doesn't exist", e);
} else if (e instanceof CorruptHFileException) {
LOG.error("The mob file " + path + " is corrupt", e);
break;
@@ -421,11 +411,11 @@
}
} catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt()
mobCacheConfig.getMobFileCache().evictFile(fileName);
LOG.warn("Fail to read the cell", e);
LOG.debug("Fail to read the cell", e);
throwable = e;
} catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt()
mobCacheConfig.getMobFileCache().evictFile(fileName);
LOG.warn("Fail to read the cell", e);
LOG.debug("Fail to read the cell", e);
throwable = e;
} finally {
if (file != null) {

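The Date-based createWriterInTmp is the simplest entry point into the chain above, mirroring the updated call sites in the tests further down. A hedged sketch; the store handle, start key, and cell count are placeholders.

import java.io.IOException;
import java.util.Date;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;

public class MobTmpWriterSketch {
  static StoreFileWriter newFlushWriter(HMobStore mobStore, byte[] startKey, int cellCount)
      throws IOException {
    // isCompaction=false: flush output keeps the store's normal cache config.
    return mobStore.createWriterInTmp(new Date(), cellCount,
        mobStore.getFamily().getCompressionType(), startKey, false);
  }
}
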
HStore.java

@@ -173,7 +173,7 @@ public class HStore implements Store {
protected int bytesPerChecksum;
// Comparing KeyValues
private final CellComparator comparator;
protected final CellComparator comparator;
final StoreEngine<?, ?, ?, ?> storeEngine;

Compactor.java

@@ -68,7 +68,7 @@ import com.google.common.io.Closeables;
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
private static final Log LOG = LogFactory.getLog(Compactor.class);
private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected volatile CompactionProgress progress;
protected final Configuration conf;
protected final Store store;

HBaseTestingUtility.java

@@ -2286,7 +2286,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
}
public int countRows(final InternalScanner scanner) throws IOException {
// Do not retrieve the mob data when scanning
int scannedCount = 0;
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = true;

TestMobFileCache.java

@@ -134,7 +134,7 @@ public class TestMobFileCache extends TestCase {
int maxKeyCount = keys.length;
HRegionInfo regionInfo = new HRegionInfo(tn);
StoreFileWriter mobWriter = mobStore.createWriterInTmp(currentDate,
maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey());
maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey(), false);
Path mobFilePath = mobWriter.getPath();
String fileName = mobFilePath.getName();
mobWriter.append(key1);

TestMobCompactor.java

@@ -330,38 +330,6 @@ public class TestMobCompactor {
countFiles(tableName, false, family2));
}
private void waitUntilFilesShowup(final TableName table, final String famStr, final int num)
throws InterruptedException, IOException {
HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0);
// Make sure that it is flushed.
FileSystem fs = r.getRegionFileSystem().getFileSystem();
Path path = r.getRegionFileSystem().getStoreDir(famStr);
FileStatus[] fileList = fs.listStatus(path);
while (fileList.length != num) {
Thread.sleep(50);
fileList = fs.listStatus(path);
}
}
private int numberOfMobFiles(final TableName table, final String famStr)
throws IOException {
HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0);
// Make sure that it is flushed.
FileSystem fs = r.getRegionFileSystem().getFileSystem();
Path path = r.getRegionFileSystem().getStoreDir(famStr);
FileStatus[] fileList = fs.listStatus(path);
return fileList.length;
}
@Test
public void testMinorCompactionWithWeeklyPolicy() throws Exception {
resetConf();
@@ -766,20 +734,6 @@ public class TestMobCompactor {
}
}
private void waitUntilCompactionFinished(TableName tableName) throws IOException,
InterruptedException {
long finished = EnvironmentEdgeManager.currentTime() + 60000;
CompactionState state = admin.getCompactionState(tableName);
while (EnvironmentEdgeManager.currentTime() < finished) {
if (state == CompactionState.NONE) {
break;
}
state = admin.getCompactionState(tableName);
Thread.sleep(10);
}
assertEquals(CompactionState.NONE, state);
}
private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
InterruptedException {
long finished = EnvironmentEdgeManager.currentTime() + 60000;

TestHMobStore.java

@@ -180,7 +180,7 @@ public class TestHMobStore {
KeyValue[] keys = new KeyValue[] { key1, key2, key3 };
int maxKeyCount = keys.length;
StoreFileWriter mobWriter = store.createWriterInTmp(currentDate, maxKeyCount,
hcd.getCompactionCompressionType(), region.getRegionInfo().getStartKey());
hcd.getCompactionCompressionType(), region.getRegionInfo().getStartKey(), false);
mobFilePath = mobWriter.getPath();
mobWriter.append(key1);

TestMobStoreScanner.java

@@ -167,7 +167,7 @@ public class TestMobStoreScanner {
table.put(put);
Get g = new Get(row1);
Result r = table.get(g);
table.get(g);
// should not have blown up.
}