HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (#1446)
* Reorganize MOB compaction tests for more reuse.
* Add tests for mob compaction after snapshot clone operations
* note the original table used to write a given mob hfile and use that to find it later.

Signed-off-by: Esteban Gutierrez <esteban@apache.org>
parent 2d78a286b6
commit eb7df0498c
@@ -915,7 +915,7 @@ public final class PrivateCellUtil {
    * Retrieve Cell's first tag, matching the passed in type
    * @param cell The Cell
    * @param type Type of the Tag to retrieve
-   * @return null if there is no tag of the passed in tag type
+   * @return Optional, empty if there is no tag of the passed in tag type
    */
   public static Optional<Tag> getTag(Cell cell, byte type) {
     boolean bufferBacked = cell instanceof ByteBufferExtendedCell;
@@ -24,9 +24,12 @@ import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
 /**
  * Immutable POJO class for representing a table name.
  * Which is of the form:
@@ -146,9 +149,7 @@ public final class TableName implements Comparable<TableName> {
       throw new IllegalArgumentException("Name is null or empty");
     }
 
-    int namespaceDelimIndex =
-      org.apache.hbase.thirdparty.com.google.common.primitives.Bytes.lastIndexOf(tableName,
-        (byte) NAMESPACE_DELIM);
+    int namespaceDelimIndex = ArrayUtils.lastIndexOf(tableName, (byte) NAMESPACE_DELIM);
     if (namespaceDelimIndex < 0){
       isLegalTableQualifierName(tableName);
     } else {
@@ -433,33 +434,73 @@ public final class TableName implements Comparable<TableName> {
 
 
   /**
    * @param fullName will use the entire byte array
    * @throws IllegalArgumentException if fullName equals old root or old meta. Some code
    *  depends on this. The test is buried in the table creation to save on array comparison
    *  when we're creating a standard table object that will be in the cache.
    */
   public static TableName valueOf(byte[] fullName) throws IllegalArgumentException{
+    return valueOf(fullName, 0, fullName.length);
+  }
+
+  /**
+   * @param fullName byte array to look into
+   * @param offset within said array
+   * @param length within said array
+   * @throws IllegalArgumentException if fullName equals old root or old meta.
+   */
+  public static TableName valueOf(byte[] fullName, int offset, int length)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(offset >= 0, "offset must be non-negative but was %s", offset);
+    Preconditions.checkArgument(offset < fullName.length, "offset (%s) must be < array length (%s)",
+      offset, fullName.length);
+    Preconditions.checkArgument(length <= fullName.length,
+      "length (%s) must be <= array length (%s)", length, fullName.length);
     for (TableName tn : tableCache) {
-      if (Arrays.equals(tn.getName(), fullName)) {
+      final byte[] tnName = tn.getName();
+      if (Bytes.equals(tnName, 0, tnName.length, fullName, offset, length)) {
         return tn;
       }
     }
 
-    int namespaceDelimIndex =
-      org.apache.hbase.thirdparty.com.google.common.primitives.Bytes.lastIndexOf(fullName,
-        (byte) NAMESPACE_DELIM);
+    int namespaceDelimIndex = ArrayUtils.lastIndexOf(fullName, (byte) NAMESPACE_DELIM,
+      offset + length - 1);
 
-    if (namespaceDelimIndex < 0) {
+    if (namespaceDelimIndex < offset) {
       return createTableNameIfNecessary(
           ByteBuffer.wrap(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME),
-          ByteBuffer.wrap(fullName));
+          ByteBuffer.wrap(fullName, offset, length));
     } else {
       return createTableNameIfNecessary(
-          ByteBuffer.wrap(fullName, 0, namespaceDelimIndex),
-          ByteBuffer.wrap(fullName, namespaceDelimIndex + 1,
-            fullName.length - (namespaceDelimIndex + 1)));
+          ByteBuffer.wrap(fullName, offset, namespaceDelimIndex),
+          ByteBuffer.wrap(fullName, namespaceDelimIndex + 1, length - (namespaceDelimIndex + 1)));
     }
   }
 
+  /**
+   * @param fullname of a table, possibly with a leading namespace and ':' as delimiter.
+   * @throws IllegalArgumentException if fullName equals old root or old meta.
+   */
+  public static TableName valueOf(ByteBuffer fullname) {
+    fullname = fullname.duplicate();
+    fullname.mark();
+    boolean miss = true;
+    while (fullname.hasRemaining() && miss) {
+      miss = ((byte) NAMESPACE_DELIM) != fullname.get();
+    }
+    if (miss) {
+      fullname.reset();
+      return valueOf(null, fullname);
+    } else {
+      ByteBuffer qualifier = fullname.slice();
+      int delimiterIndex = fullname.position() - 1;
+      fullname.reset();
+      // changing variable name for clarity
+      ByteBuffer namespace = fullname.duplicate();
+      namespace.limit(delimiterIndex);
+      return valueOf(namespace, qualifier);
+    }
+  }
+
   /**
    * @throws IllegalArgumentException if fullName equals old root or old meta. Some code
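
For illustration, a minimal sketch of the new offset/length overload: it lets a caller parse a table name that sits inside a larger buffer without copying it out first. The padded buffer below is invented for the example and assumes only hbase-common on the classpath:

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TableNameSliceSketch {
      public static void main(String[] args) {
        // "mytable" sits between two bytes of padding on each side
        byte[] buf = Bytes.toBytes("##mytable##");
        // read 7 bytes starting at offset 2; a name with no ':' delimiter
        // resolves into the default namespace
        TableName tn = TableName.valueOf(buf, 2, 7);
        System.out.println(tn.getNameAsString()); // prints "mytable"
      }
    }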
@@ -43,6 +43,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TimeZone;
@@ -437,17 +438,16 @@ public class HFilePrettyPrinter extends Configured implements Tool {
         }
         // check if mob files are missing.
         if (checkMobIntegrity && MobUtils.isMobReferenceCell(cell)) {
-          Tag tnTag = MobUtils.getTableNameTag(cell);
-          if (tnTag == null) {
+          Optional<TableName> tn = MobUtils.getTableName(cell);
+          if (! tn.isPresent()) {
             System.err.println("ERROR, wrong tag format in mob reference cell "
                 + CellUtil.getCellKeyAsString(cell));
           } else if (!MobUtils.hasValidMobRefCellValue(cell)) {
             System.err.println("ERROR, wrong value format in mob reference cell "
                 + CellUtil.getCellKeyAsString(cell));
           } else {
-            TableName tn = TableName.valueOf(Tag.cloneValue(tnTag));
             String mobFileName = MobUtils.getMobFileName(cell);
-            boolean exist = mobFileExists(fs, tn, mobFileName,
+            boolean exist = mobFileExists(fs, tn.get(), mobFileName,
                 Bytes.toString(CellUtil.cloneFamily(cell)), foundMobFiles, missingMobFiles);
             if (!exist) {
               // report error
@@ -26,9 +26,9 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.Map.Entry;
+import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -37,9 +37,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
@@ -60,7 +62,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 /**
  * Compact passed set of files in the mob-enabled column family.
@@ -79,13 +84,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
  * content of it is written into meta section of a newly created store file at the final step of
  * compaction process.
  */
-
-  static ThreadLocal<Set<String>> mobRefSet = new ThreadLocal<Set<String>>() {
-    @Override
-    protected Set<String> initialValue() {
-      return new HashSet<String>();
-    }
-  };
+  static ThreadLocal<SetMultimap<TableName,String>> mobRefSet =
+      ThreadLocal.withInitial(HashMultimap::create);
 
  /*
   * Is it user or system-originated request.
@@ -190,34 +190,71 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     // Check if I/O optimized MOB compaction
     if (ioOptimizedMode) {
       if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) {
-        Path mobDir =
-            MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
-        List<Path> mobFiles = MobUtils.getReferencedMobFiles(request.getFiles(), mobDir);
-        //reset disableIO
-        disableIO.set(Boolean.FALSE);
-        if (mobFiles.size() > 0) {
-          calculateMobLengthMap(mobFiles);
-        }
-        LOG.info("Table={} cf={} region={}. I/O optimized MOB compaction. "+
-            "Total referenced MOB files: {}", tableName, familyName, regionName, mobFiles.size());
+        try {
+          final SetMultimap<TableName, String> mobRefs = request.getFiles().stream()
+              .map(file -> {
+                byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+                ImmutableSetMultimap.Builder<TableName, String> builder;
+                if (value == null) {
+                  builder = ImmutableSetMultimap.builder();
+                } else {
+                  try {
+                    builder = MobUtils.deserializeMobFileRefs(value);
+                  } catch (RuntimeException exception) {
+                    throw new RuntimeException("failure getting mob references for hfile " + file,
+                        exception);
+                  }
+                }
+                return builder;
+              }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder)
+              .build();
+          //reset disableIO
+          disableIO.set(Boolean.FALSE);
+          if (!mobRefs.isEmpty()) {
+            calculateMobLengthMap(mobRefs);
+          }
+          LOG.info("Table={} cf={} region={}. I/O optimized MOB compaction. "+
+              "Total referenced MOB files: {}", tableName, familyName, regionName, mobRefs.size());
+        } catch (RuntimeException exception) {
+          throw new IOException("Failed to get list of referenced hfiles for request " + request,
+              exception);
+        }
       }
     }
 
     return compact(request, scannerFactory, writerFactory, throughputController, user);
   }
 
-  private void calculateMobLengthMap(List<Path> mobFiles) throws IOException {
+  /**
+   * @param mobRefs multimap of original table name -> mob hfile
+   */
+  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
     FileSystem fs = store.getFileSystem();
     HashMap<String, Long> map = mobLengthMap.get();
     map.clear();
-    for (Path p : mobFiles) {
-      if (MobFileName.isOldMobFileName(p.getName())) {
+    for (Entry<TableName, String> reference : mobRefs.entries()) {
+      final TableName table = reference.getKey();
+      final String mobfile = reference.getValue();
+      if (MobFileName.isOldMobFileName(mobfile)) {
         disableIO.set(Boolean.TRUE);
       }
-      FileStatus st = fs.getFileStatus(p);
-      long size = st.getLen();
-      LOG.debug("Referenced MOB file={} size={}", p, size);
-      map.put(p.getName(), fs.getFileStatus(p).getLen());
+      List<Path> locations = mobStore.getLocations(table);
+      for (Path p : locations) {
+        try {
+          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
+          long size = st.getLen();
+          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
+          map.put(mobfile, size);
+          break;
+        } catch (FileNotFoundException exception) {
+          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
+              p);
+        }
+      }
+      if (!map.containsKey(mobfile)) {
+        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of " +
+            "expected locations: " + locations);
+      }
    }
  }
 
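
The stream above folds each store file's MOB_FILE_REFS metadata into one combined multimap by reducing over ImmutableSetMultimap builders. A toy sketch of that fold, using String keys in place of TableName so it runs without any cluster state:

    import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;

    public class BuilderFoldSketch {
      public static void main(String[] args) {
        ImmutableSetMultimap.Builder<String, String> a = ImmutableSetMultimap.builder();
        a.putAll("table1", "mobfile1", "mobfile2");
        ImmutableSetMultimap.Builder<String, String> b = ImmutableSetMultimap.builder();
        b.putAll("table2", "mobfile3");
        // same shape as reduce((a, b) -> a.putAll(b.build())) in the compactor
        ImmutableSetMultimap<String, String> merged = a.putAll(b.build()).build();
        System.out.println(merged); // {table1=[mobfile1, mobfile2], table2=[mobfile3]}
      }
    }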
@@ -391,8 +428,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
               // We leave large MOB file as is (is not compacted),
               // then we update set of MOB file references
               // and append mob cell directly to the store's writer
-              mobRefSet.get().add(fName);
-              writer.append(mobCell);
+              Optional<TableName> refTable = MobUtils.getTableName(c);
+              if (refTable.isPresent()) {
+                mobRefSet.get().put(refTable.get(), fName);
+                writer.append(c);
+              } else {
+                throw new IOException(String.format("MOB cell did not contain a tablename " +
+                    "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                    "store={} cell={}", getStoreInfo(), c));
+              }
             }
           }
         } else {
@@ -440,9 +484,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
             if (MobUtils.hasValidMobRefCellValue(c)) {
               // We do not check mobSizeThreshold during normal compaction,
               // leaving it to a MOB compaction run
-              writer.append(c);
-              // Add MOB reference to a MOB reference set
-              mobRefSet.get().add(MobUtils.getMobFileName(c));
+              Optional<TableName> refTable = MobUtils.getTableName(c);
+              if (refTable.isPresent()) {
+                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                writer.append(c);
+              } else {
+                throw new IOException(String.format("MOB cell did not contain a tablename " +
+                    "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                    "store={} cell={}", getStoreInfo(), c));
+              }
             } else {
               String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
               throw new IOException(errMsg);
@@ -529,7 +579,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       throughputController.finish(compactionName);
       if (!finished && mobFileWriter != null) {
         // Remove all MOB references because compaction failed
-        mobRefSet.get().clear();
+        clearThreadLocals();
         // Abort writer
         LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
           mobFileWriter.getPath(), getStoreInfo());
@@ -547,16 +597,13 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     return true;
   }
 
-  private String getStoreInfo() {
+  protected String getStoreInfo() {
     return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
       store.getColumnFamilyName(), store.getRegionInfo().getEncodedName()) ;
   }
 
   private void clearThreadLocals() {
-    Set<String> set = mobRefSet.get();
-    if (set != null) {
-      set.clear();
-    }
+    mobRefSet.get().clear();
     HashMap<String, Long> map = mobLengthMap.get();
     if (map != null) {
       map.clear();
@@ -571,7 +618,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
         getStoreInfo());
       // Add reference we get for compact MOB
-      mobRefSet.get().add(mobFileWriter.getPath().getName());
+      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
       return mobFileWriter;
     } catch (IOException e) {
       // Bailing out
@@ -604,7 +651,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
         LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
           mobFileWriter.getPath(), getStoreInfo());
         // Remove MOB file from reference set
-        mobRefSet.get().remove(mobFileWriter.getPath().getName());
+        mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
         abortWriter(mobFileWriter);
       }
     } else {
@@ -619,9 +666,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
-    // Append MOB references
-    Set<String> refSet = mobRefSet.get();
-    writer.appendMobMetadata(refSet);
+    writer.appendMobMetadata(mobRefSet.get());
     writer.close();
     clearThreadLocals();
     return newFiles;
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
@@ -48,6 +49,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+
 /**
  * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher.
  * If the store is not a mob store, the flusher flushes the MemStore the same with
|
||||
// The hfile is current up to and including cacheFlushSeqNum.
|
||||
status.setStatus("Flushing " + store + ": appending metadata");
|
||||
writer.appendMetadata(cacheFlushSeqNum, false);
|
||||
writer.appendMobMetadata(mobRefSet.get());
|
||||
writer.appendMobMetadata(ImmutableSetMultimap.<TableName, String>builder()
|
||||
.putAll(store.getTableName(), mobRefSet.get()).build());
|
||||
status.setStatus("Flushing " + store + ": closing flushed file");
|
||||
writer.close();
|
||||
}
|
||||
|
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mob;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -54,6 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 /**
  * The class MobFileCleanerChore for running cleaner regularly to remove the expired
@@ -212,28 +212,28 @@ public class MobFileCleanerChore extends ScheduledChore {
           byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
           // close store file to avoid memory leaks
           sf.closeStoreFile(true);
-          if (mobRefData == null && bulkloadMarkerData == null) {
-            LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
-                + "can not proceed until all old files will be MOB-compacted.",
-                pp);
-            return;
-          } else if (mobRefData == null && bulkloadMarkerData != null) {
-            LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
-            continue;
-          }
-          // mobRefData will never be null here, but to make FindBugs happy
-          if (mobRefData != null && mobRefData.length > 1) {
-            // if length = 1 means NULL, that there are no MOB references
-            // in this store file, but the file was created by new MOB code
-            String[] mobs = new String(mobRefData).split(",");
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Found: {} mob references: {}", mobs.length, Arrays.toString(mobs));
-            } else {
-              LOG.debug("Found: {} mob references", mobs.length);
-            }
-            regionMobs.addAll(Arrays.asList(mobs));
-          } else {
-            LOG.debug("File {} does not have mob references", currentPath);
-          }
+          if (mobRefData == null) {
+            if (bulkloadMarkerData == null) {
+              LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
+                  + "can not proceed until all old files will be MOB-compacted.",
+                  pp);
+              return;
+            } else {
+              LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
+              continue;
+            }
+          }
+          // file may or may not have MOB references, but was created by the distributed
+          // mob compaction code.
+          try {
+            SetMultimap<TableName, String> mobs = MobUtils.deserializeMobFileRefs(mobRefData)
+                .build();
+            LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
+            LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
+            regionMobs.addAll(mobs.values());
+          } catch (RuntimeException exception) {
+            throw new IOException("failure getting mob references for hfile " + sf,
+                exception);
+          }
         }
       } catch (FileNotFoundException e) {
@@ -20,17 +20,16 @@ package org.apache.hadoop.hbase.mob;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
@@ -68,6 +67,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 
 /**
  * The mob utilities
@@ -130,14 +131,51 @@ public final class MobUtils {
    * @param cell The current cell.
    * @return The table name tag.
    */
-  public static Tag getTableNameTag(Cell cell) {
+  private static Optional<Tag> getTableNameTag(Cell cell) {
+    Optional<Tag> tag = Optional.empty();
     if (cell.getTagsLength() > 0) {
-      Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
-      if (tag.isPresent()) {
-        return tag.get();
-      }
+      tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
     }
-    return null;
+    return tag;
+  }
+
+  /**
+   * Gets the table name from when this cell was written into a mob hfile as a string.
+   * @param cell to extract tag from
+   * @return table name as a string. empty if the tag is not found.
+   */
+  public static Optional<String> getTableNameString(Cell cell) {
+    Optional<Tag> tag = getTableNameTag(cell);
+    Optional<String> name = Optional.empty();
+    if (tag.isPresent()) {
+      name = Optional.of(Tag.getValueAsString(tag.get()));
+    }
+    return name;
+  }
+
+  /**
+   * Get the table name from when this cell was written into a mob hfile as a TableName.
+   * @param cell to extract tag from
+   * @return name of table as a TableName. empty if the tag is not found.
+   */
+  public static Optional<TableName> getTableName(Cell cell) {
+    Optional<Tag> maybe = getTableNameTag(cell);
+    Optional<TableName> name = Optional.empty();
+    if (maybe.isPresent()) {
+      final Tag tag = maybe.get();
+      if (tag.hasArray()) {
+        name = Optional.of(TableName.valueOf(tag.getValueArray(), tag.getValueOffset(),
+            tag.getValueLength()));
+      } else {
+        // TODO ByteBuffer handling in tags looks busted. revisit.
+        ByteBuffer buffer = tag.getValueByteBuffer().duplicate();
+        buffer.mark();
+        buffer.position(tag.getValueOffset());
+        buffer.limit(tag.getValueOffset() + tag.getValueLength());
+        name = Optional.of(TableName.valueOf(buffer));
+      }
+    }
+    return name;
   }
 
   /**
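
A hedged round-trip sketch of the tag handling above: HMobStore writes the originating table into a MOB_TABLE_NAME_TAG_TYPE tag, and getTableName() reads it back. The row, family, qualifier and table name below are invented for the example:

    import java.util.Arrays;
    import java.util.Optional;
    import org.apache.hadoop.hbase.ArrayBackedTag;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.PrivateCellUtil;
    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.TagType;
    import org.apache.hadoop.hbase.mob.MobUtils;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TableNameTagSketch {
      public static void main(String[] args) {
        TableName original = TableName.valueOf("cloneSourceTable");
        // same tag type HMobStore attaches to mob reference cells
        Tag tnTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, original.getName());
        Cell base = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("value"));
        Cell tagged = PrivateCellUtil.createCell(base, Arrays.asList(tnTag));
        Optional<TableName> recovered = MobUtils.getTableName(tagged);
        System.out.println(recovered.get()); // cloneSourceTable
      }
    }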
@@ -383,7 +421,7 @@ public final class MobUtils {
 
   /**
    * Gets the RegionInfo of the mob files. This is a dummy region. The mob files are not saved in a
-   * region in HBase. This is only used in mob snapshot. It's internally used only.
+   * region in HBase. It's internally used only.
    * @param tableName
    * @return A dummy mob region info.
    */
@@ -665,27 +703,78 @@ public final class MobUtils {
   }
 
   /**
-   * Get list of referenced MOB files from a given collection of store files
-   * @param storeFiles store files
-   * @param mobDir MOB file directory
-   * @return list of MOB file paths
+   * Serialize a set of referenced mob hfiles
+   * @param mobRefSet to serialize, may be null
+   * @return byte array to i.e. put into store file metadata. will not be null
    */
-  public static List<Path> getReferencedMobFiles(Collection<HStoreFile> storeFiles, Path mobDir) {
-
-    Set<String> mobSet = new HashSet<String>();
-    for (HStoreFile sf : storeFiles) {
-      byte[] value = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-      if (value != null && value.length > 1) {
-        String s = Bytes.toString(value);
-        String[] all = s.split(",");
-        Collections.addAll(mobSet, all);
-      }
-    }
-    List<Path> retList = new ArrayList<Path>();
-    for (String name : mobSet) {
-      retList.add(new Path(mobDir, name));
-    }
-    return retList;
+  public static byte[] serializeMobFileRefs(SetMultimap<TableName, String> mobRefSet) {
+    if (mobRefSet != null && mobRefSet.size() > 0) {
+      // Here we rely on the fact that '/' and ',' are not allowed in either table names nor hfile
+      // names for serialization.
+      //
+      // exampleTable/filename1,filename2//example:table/filename5//otherTable/filename3,filename4
+      //
+      // to approximate the needed capacity we use the fact that there will usually be 1 table name
+      // and each mob filename is around 105 bytes. we pick an arbitrary number to cover "most"
+      // single table name lengths
+      StringBuilder sb = new StringBuilder(100 + mobRefSet.size() * 105);
+      boolean doubleSlash = false;
+      for (TableName tableName : mobRefSet.keySet()) {
+        if (doubleSlash) {
+          sb.append("//");
+        } else {
+          doubleSlash = true;
+        }
+        sb.append(tableName).append("/");
+        boolean comma = false;
+        for (String refs : mobRefSet.get(tableName)) {
+          if (comma) {
+            sb.append(",");
+          } else {
+            comma = true;
+          }
+          sb.append(refs);
+        }
+      }
+      return Bytes.toBytes(sb.toString());
+    } else {
+      return HStoreFile.NULL_VALUE;
+    }
+  }
+
+  /**
+   * Deserialize the set of referenced mob hfiles from store file metadata.
+   * @param bytes compatibly serialized data. can not be null
+   * @return a setmultimap of original table to list of hfile names. will be empty if no values.
+   * @throws IllegalStateException if there are values but no table name
+   */
+  public static ImmutableSetMultimap.Builder<TableName, String> deserializeMobFileRefs(byte[] bytes)
+      throws IllegalStateException {
+    ImmutableSetMultimap.Builder<TableName, String> map = ImmutableSetMultimap.builder();
+    if (bytes.length > 1) {
+      // TODO avoid turning the tablename pieces in to strings.
+      String s = Bytes.toString(bytes);
+      String[] tables = s.split("//");
+      for (String tableEnc : tables) {
+        final int delim = tableEnc.indexOf('/');
+        if (delim <= 0) {
+          throw new IllegalStateException("MOB reference data does not match expected encoding: " +
+              "no table name included before list of mob refs.");
+        }
+        TableName table = TableName.valueOf(tableEnc.substring(0, delim));
+        String[] refs = tableEnc.substring(delim + 1).split(",");
+        map.putAll(table, refs);
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        // array length 1 should be the NULL_VALUE.
+        if (! Arrays.equals(HStoreFile.NULL_VALUE, bytes)) {
+          LOG.debug("Serialized MOB file refs array was treated as the placeholder 'no entries' but"
+              + " didn't have the expected placeholder byte. expected={} and actual={}",
+              Arrays.toString(HStoreFile.NULL_VALUE), Arrays.toString(bytes));
+        }
+      }
+    }
+    return map;
   }
 }
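
A minimal round-trip sketch of the encoding documented above (file names invented for brevity; per the comment, real mob file names are generated strings of roughly 105 bytes):

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.mob.MobUtils;
    import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
    import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

    public class MobRefsRoundTripSketch {
      public static void main(String[] args) {
        SetMultimap<TableName, String> refs = HashMultimap.create();
        refs.put(TableName.valueOf("exampleTable"), "filename1");
        refs.put(TableName.valueOf("exampleTable"), "filename2");
        refs.put(TableName.valueOf("otherTable"), "filename3");
        // yields e.g. "exampleTable/filename1,filename2//otherTable/filename3"
        byte[] serialized = MobUtils.serializeMobFileRefs(refs);
        SetMultimap<TableName, String> roundTripped =
            MobUtils.deserializeMobFileRefs(serialized).build();
        System.out.println(roundTripped.equals(refs)); // true
      }
    }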
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -55,7 +56,6 @@ import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -94,7 +94,7 @@ public class HMobStore extends HStore {
   private AtomicLong mobScanCellsCount = new AtomicLong();
   private AtomicLong mobScanCellsSize = new AtomicLong();
   private ColumnFamilyDescriptor family;
-  private Map<String, List<Path>> map = new ConcurrentHashMap<>();
+  private Map<TableName, List<Path>> map = new ConcurrentHashMap<>();
   private final IdLock keyLock = new IdLock();
   // When we add a MOB reference cell to the HFile, we will add 2 tags along with it
   // 1. A ref tag with type TagType.MOB_REFERENCE_TAG_TYPE. This just denote this this cell is not
@@ -117,7 +117,7 @@ public class HMobStore extends HStore {
     TableName tn = region.getTableDescriptor().getTableName();
     locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn, MobUtils.getMobRegionInfo(tn)
         .getEncodedName(), family.getNameAsString()));
-    map.put(Bytes.toString(tn.getName()), locations);
+    map.put(tn, locations);
     List<Tag> tags = new ArrayList<>(2);
     tags.add(MobConstants.MOB_REF_TAG);
     Tag tableNameTag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE,
@@ -315,26 +315,9 @@ public class HMobStore extends HStore {
     MobCell mobCell = null;
     if (MobUtils.hasValidMobRefCellValue(reference)) {
       String fileName = MobUtils.getMobFileName(reference);
-      Tag tableNameTag = MobUtils.getTableNameTag(reference);
-      if (tableNameTag != null) {
-        String tableNameString = Tag.getValueAsString(tableNameTag);
-        List<Path> locations = map.get(tableNameString);
-        if (locations == null) {
-          IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
-          try {
-            locations = map.get(tableNameString);
-            if (locations == null) {
-              locations = new ArrayList<>(2);
-              TableName tn = TableName.valueOf(tableNameString);
-              locations.add(MobUtils.getMobFamilyPath(conf, tn, family.getNameAsString()));
-              locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
-                MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
-              map.put(tableNameString, locations);
-            }
-          } finally {
-            keyLock.releaseLockEntry(lockEntry);
-          }
-        }
+      Optional<TableName> tableName = MobUtils.getTableName(reference);
+      if (tableName.isPresent()) {
+        List<Path> locations = getLocations(tableName.get());
         mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
           readEmptyValueOnMobCellMiss);
       }
@@ -357,6 +340,30 @@ public class HMobStore extends HStore {
     return mobCell;
   }
 
+  /**
+   * @param tableName to look up locations for, can not be null
+   * @return a list of location in order of working dir, archive dir. will not be null.
+   */
+  public List<Path> getLocations(TableName tableName) throws IOException {
+    List<Path> locations = map.get(tableName);
+    if (locations == null) {
+      IdLock.Entry lockEntry = keyLock.getLockEntry(tableName.hashCode());
+      try {
+        locations = map.get(tableName);
+        if (locations == null) {
+          locations = new ArrayList<>(2);
+          locations.add(MobUtils.getMobFamilyPath(conf, tableName, family.getNameAsString()));
+          locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tableName,
+            MobUtils.getMobRegionInfo(tableName).getEncodedName(), family.getNameAsString()));
+          map.put(tableName, locations);
+        }
+      } finally {
+        keyLock.releaseLockEntry(lockEntry);
+      }
+    }
+    return locations;
+  }
+
   /**
    * Reads the cell from a mob file.
    * The mob file might be located in different directories.
@@ -297,8 +297,7 @@ public class HStoreFile implements StoreFile {
   }
 
   /**
-   * Only used by the Striped Compaction Policy
-   * @param key
+   * @param key to look up
    * @return value associated with the metadata key
    */
   public byte[] getMetadataValue(byte[] key) {
@@ -26,7 +26,6 @@ import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
-import static org.apache.hadoop.hbase.regionserver.HStoreFile.NULL_VALUE;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -39,7 +38,6 @@ import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -47,11 +45,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.BloomContext;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterUtil;
@@ -65,6 +65,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 
 /**
@@ -248,17 +249,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
 
   /**
    * Appends MOB - specific metadata (even if it is empty)
-   * @param mobRefSet - set of MOB file names
+   * @param mobRefSet - original table -> set of MOB file names
    * @throws IOException problem writing to FS
    */
-  public void appendMobMetadata(Set<String> mobRefSet) throws IOException {
-    if (mobRefSet != null && mobRefSet.size() > 0) {
-      String sb = StringUtils.join(mobRefSet, ",");
-      byte[] bytes = Bytes.toBytes(sb.toString());
-      writer.appendFileInfo(MOB_FILE_REFS, bytes);
-    } else {
-      writer.appendFileInfo(MOB_FILE_REFS, NULL_VALUE);
-    }
+  public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
+    writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
   }
 
   /**
@@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -164,7 +166,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
       // Add the only reference we get for compact MOB case
       // because new store file will have only one MOB reference
       // in this case - of newly compacted MOB file
-      mobRefSet.get().add(mobFileWriter.getPath().getName());
+      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
     }
     do {
       hasMore = scanner.next(cells, scannerContext);
@@ -237,9 +239,15 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
             if (size > mobSizeThreshold) {
               // If the value size is larger than the threshold, it's regarded as a mob. Since
               // its value is already in the mob file, directly write this cell to the store file
-              writer.append(c);
-              // Add MOB reference to a set
-              mobRefSet.get().add(MobUtils.getMobFileName(c));
+              Optional<TableName> refTable = MobUtils.getTableName(c);
+              if (refTable.isPresent()) {
+                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                writer.append(c);
+              } else {
+                throw new IOException(String.format("MOB cell did not contain a tablename " +
+                    "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                    "store={} cell={}", getStoreInfo(), c));
+              }
             } else {
               // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
               // the mob cell from the mob file, and write it back to the store file.
@@ -255,9 +263,15 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
                 // directly write the cell to the store file, and leave it to be handled by the
                 // next compaction.
                 LOG.error("Empty value for: " + c);
-                writer.append(c);
-                // Add MOB reference to a set
-                mobRefSet.get().add(MobUtils.getMobFileName(c));
+                Optional<TableName> refTable = MobUtils.getTableName(c);
+                if (refTable.isPresent()) {
+                  mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                  writer.append(c);
+                } else {
+                  throw new IOException(String.format("MOB cell did not contain a tablename " +
+                      "tag. should not be possible. see ref guide on mob troubleshooting. " +
+                      "store={} cell={}", getStoreInfo(), c));
+                }
               }
             }
           } else {
@@ -280,7 +294,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
             cellsCountCompactedToMob++;
             cellsSizeCompactedToMob += c.getValueLength();
             // Add ref we get for compact MOB case
-            mobRefSet.get().add(mobFileWriter.getPath().getName());
+            mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
           }
 
           int len = c.getSerializedSize();
@@ -1,232 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Random;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.CompactionState;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RegionSplitter;
-import org.junit.After;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Mob file compaction base test.
- * 1. Enables batch mode for regular MOB compaction,
- *    Sets batch size to 7 regions. (Optional)
- * 2. Disables periodic MOB compactions, sets minimum age to archive to 10 sec
- * 3. Creates MOB table with 20 regions
- * 4. Loads MOB data (randomized keys, 1000 rows), flushes data.
- * 5. Repeats 4. two more times
- * 6. Verifies that we have 20 *3 = 60 mob files (equals to number of regions x 3)
- * 7. Runs major MOB compaction.
- * 8. Verifies that number of MOB files in a mob directory is 20 x4 = 80
- * 9. Waits for a period of time larger than minimum age to archive
- * 10. Runs Mob cleaner chore
- * 11 Verifies that number of MOB files in a mob directory is 20.
- * 12 Runs scanner and checks all 3 * 1000 rows.
- */
-@SuppressWarnings("deprecation")
-public abstract class TestMobCompactionBase {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestMobCompactionBase.class);
-
-  protected HBaseTestingUtility HTU;
-
-  protected final static String famStr = "f1";
-  protected final static byte[] fam = Bytes.toBytes(famStr);
-  protected final static byte[] qualifier = Bytes.toBytes("q1");
-  protected final static long mobLen = 10;
-  protected final static byte[] mobVal = Bytes
-      .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");
-
-  protected Configuration conf;
-  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
-  private ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
-  protected Admin admin;
-  protected Table table = null;
-  protected long minAgeToArchive = 10000;
-  protected int numRegions = 20;
-  protected int rows = 1000;
-
-  protected MobFileCleanerChore cleanerChore;
-
-  public TestMobCompactionBase() {
-  }
-
-
-  @Before
-  public void setUp() throws Exception {
-    HTU = new HBaseTestingUtility();
-    tableDescriptor = HTU.createModifyableTableDescriptor(getClass().getName());
-    conf = HTU.getConfiguration();
-
-    initConf();
-
-    HTU.startMiniCluster();
-    admin = HTU.getAdmin();
-    cleanerChore = new MobFileCleanerChore();
-    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
-    familyDescriptor.setMobEnabled(true);
-    familyDescriptor.setMobThreshold(mobLen);
-    familyDescriptor.setMaxVersions(1);
-    tableDescriptor.setColumnFamily(familyDescriptor);
-    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
-    byte[][] splitKeys = splitAlgo.split(numRegions);
-    table = HTU.createTable(tableDescriptor, splitKeys);
-
-  }
-
-  protected void initConf() {
-
-    conf.setInt("hfile.format.version", 3);
-    // Disable automatic MOB compaction
-    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
-    // Disable automatic MOB file cleaner chore
-    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
-    // Set minimum age to archive to 10 sec
-    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
-    // Set compacted file discharger interval to a half minAgeToArchive
-    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive/2);
-  }
-
-  private void loadData(int num) {
-
-    Random r = new Random();
-    try {
-      LOG.info("Started loading {} rows", num);
-      for (int i = 0; i < num; i++) {
-        byte[] key = new byte[32];
-        r.nextBytes(key);
-        Put p = new Put(key);
-        p.addColumn(fam, qualifier, mobVal);
-        table.put(p);
-      }
-      admin.flush(table.getName());
-      LOG.info("Finished loading {} rows", num);
-    } catch (Exception e) {
-      LOG.error("MOB file compaction chore test FAILED", e);
-      fail("MOB file compaction chore test FAILED");
-    }
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    admin.disableTable(tableDescriptor.getTableName());
-    admin.deleteTable(tableDescriptor.getTableName());
-    HTU.shutdownMiniCluster();
-  }
-
-
-  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
-
-    // Load and flush data 3 times
-    loadData(rows);
-    loadData(rows);
-    loadData(rows);
-    long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions * 3, num);
-    // Major MOB compact
-    mobCompact(admin, tableDescriptor, familyDescriptor);
-    // wait until compaction is complete
-    while (admin.getCompactionState(tableDescriptor.getTableName()) != CompactionState.NONE) {
-      Thread.sleep(100);
-    }
-
-    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions * 4, num);
-    // We have guarantee, that compacted file discharger will run during this pause
-    // because it has interval less than this wait time
-    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
-
-    Thread.sleep(minAgeToArchive + 1000);
-    LOG.info("Cleaning up MOB files");
-    // Cleanup again
-    cleanerChore.cleanupObsoleteMobFiles(conf, table.getName());
-
-    num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
-    assertEquals(numRegions, num);
-
-    long scanned = scanTable();
-    assertEquals(3 * rows, scanned);
-
-  }
-
-  protected abstract void mobCompact(Admin admin2, TableDescriptor tableDescriptor,
-      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException;
-
-
-  protected long getNumberOfMobFiles(Configuration conf, TableName tableName, String family)
-      throws IOException {
-    FileSystem fs = FileSystem.get(conf);
-    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
-    FileStatus[] stat = fs.listStatus(dir);
-    for (FileStatus st : stat) {
-      LOG.debug("MOB Directory content: {}", st.getPath());
-    }
-    LOG.debug("MOB Directory content total files: {}", stat.length);
-
-    return stat.length;
-  }
-
-
-  protected long scanTable() {
-    try {
-
-      Result result;
-      ResultScanner scanner = table.getScanner(fam);
-      long counter = 0;
-      while ((result = scanner.next()) != null) {
-        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
-        counter++;
-      }
-      return counter;
-    } catch (Exception e) {
-      LOG.error("MOB file compaction test FAILED", e);
-      if (HTU != null) {
-        fail(e.getMessage());
-      } else {
-        System.exit(-1);
-      }
-    }
-    return 0;
-  }
-}
@@ -20,12 +20,9 @@ package org.apache.hadoop.hbase.mob;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,37 +44,25 @@
  */
 @SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptMode extends TestMobCompactionBase{
+public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestMobCompactionOptMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
 
-  public TestMobCompactionOptMode() {
-  }
-
-  @Override
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompaction() throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY,
       MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-  }
 
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction generational (non-batch) mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction generational (non-batch) mode finished OK");
+    HTU.startMiniHBaseCluster();
   }
 
   @Override
-  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
-      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
-    // Major compact MOB table
-    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
+  protected String description() {
+    return "generational (non-batch) mode";
   }
 
 }
@@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.mob;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,46 +48,43 @@
  */
 @SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionBase{
+public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestMobCompactionOptRegionBatchMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestMobCompactionOptRegionBatchMode.class);
 
-  private int batchSize = 7;
+  private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;
 
-  public TestMobCompactionOptRegionBatchMode() {
-  }
-
   @Before
   public void setUp() throws Exception {
-    super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }
 
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompactionAndBatches()
+      throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY,
       MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
+    HTU.startMiniHBaseCluster();
   }
 
   @Override
-  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
       ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
     // Major compact with batch mode enabled
+    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
     compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, familyDescriptor);
   }
 
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction chore generational batch mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction chore generational batch mode finished OK");
+  @Override
+  protected String description() {
+    return "generational batch mode";
   }
 
 }
@@ -1,75 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mob;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Mob file compaction chore in a regular non-batch mode test.
- * 1. Uses default (non-batch) mode for regular MOB compaction,
- * 2. Disables periodic MOB compactions, sets minimum age to archive to 10 sec
- * 3. Creates MOB table with 20 regions
- * 4. Loads MOB data (randomized keys, 1000 rows), flushes data.
- * 5. Repeats 4. two more times
- * 6. Verifies that we have 20 *3 = 60 mob files (equals to number of regions x 3)
- * 7. Runs major MOB compaction.
- * 8. Verifies that number of MOB files in a mob directory is 20 x4 = 80
- * 9. Waits for a period of time larger than minimum age to archive
- * 10. Runs Mob cleaner chore
- * 11 Verifies that number of MOB files in a mob directory is 20.
- * 12 Runs scanner and checks all 3 * 1000 rows.
- */
-@SuppressWarnings("deprecation")
-@Category(LargeTests.class)
-public class TestMobCompactionRegularMode extends TestMobCompactionBase{
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestMobCompactionRegularMode.class);
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMobCompactionRegularMode.class);
-
-  public TestMobCompactionRegularMode() {
-  }
-
-  @Override
-  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
-      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
-    // Major compact MOB table
-    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction regular mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction regular mode finished OK");
-
-  }
-
-}
@ -20,13 +20,12 @@ package org.apache.hadoop.hbase.mob;
import java.io.IOException;

import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,43 +48,39 @@ import org.slf4j.LoggerFactory;
 */
@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionBase{
public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestMobCompactionRegularRegionBatchMode.class);
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMobCompactionRegularRegionBatchMode.class);

  private int batchSize = 7;
  private static final int batchSize = 7;
  private MobFileCompactionChore compactionChore;

  public TestMobCompactionRegularRegionBatchMode() {
  }

  @Before
  public void setUp() throws Exception {
    super.setUp();
    compactionChore = new MobFileCompactionChore(conf, batchSize);
  }

  protected void initConf() {
    super.initConf();
  @BeforeClass
  public static void configureCompactionBatches() throws InterruptedException, IOException {
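    // Same pattern as the optimized-mode test: bounce the mini cluster so the
    // batch-size setting is picked up at region server startup.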
    HTU.shutdownMiniHBaseCluster();
    conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
    HTU.startMiniHBaseCluster();
  }

  @Override
  protected void mobCompact(Admin admin, TableDescriptor tableDescriptor,
  protected void mobCompactImpl(TableDescriptor tableDescriptor,
      ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
    // Major compact with batch mode enabled
    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
    compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, familyDescriptor);
  }

  @Test
  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
    LOG.info("MOB compaction chore regular batch mode started");
    baseTestMobFileCompaction();
    LOG.info("MOB compaction chore regular batch mode finished OK");

  @Override
  protected String description() {
    return "regular batch mode";
  }

}
@ -0,0 +1,335 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mob;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mob file compaction base test.
 * 1. Enables batch mode for regular MOB compaction,
 *    Sets batch size to 7 regions. (Optional)
 * 2. Disables periodic MOB compactions, sets minimum age to archive to 10 sec
 * 3. Creates MOB table with 20 regions
 * 4. Loads MOB data (randomized keys, 1000 rows), flushes data.
 * 5. Repeats 4. two more times
 * 6. Verifies that we have 20 * 3 = 60 mob files (equal to number of regions x 3)
 * 7. Runs major MOB compaction.
 * 8. Verifies that number of MOB files in a mob directory is 20 x 4 = 80
 * 9. Waits for a period of time larger than minimum age to archive
 * 10. Runs Mob cleaner chore
 * 11. Verifies that number of MOB files in a mob directory is 20.
 * 12. Runs scanner and checks all 3 * 1000 rows.
 */
@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestMobCompactionWithDefaults {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);

  protected static HBaseTestingUtility HTU;
  protected static Configuration conf;
  protected static long minAgeToArchive = 10000;

  protected final static String famStr = "f1";
  protected final static byte[] fam = Bytes.toBytes(famStr);
  protected final static byte[] qualifier = Bytes.toBytes("q1");
  protected final static long mobLen = 10;
  protected final static byte[] mobVal = Bytes
      .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  @Rule
  public TestName test = new TestName();
  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
  private ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
  protected Admin admin;
  protected TableName table = null;
  protected int numRegions = 20;
  protected int rows = 1000;

  protected MobFileCleanerChore cleanerChore;

  @BeforeClass
  public static void htuStart() throws Exception {
    HTU = new HBaseTestingUtility();
    conf = HTU.getConfiguration();
    conf.setInt("hfile.format.version", 3);
    // Disable automatic MOB compaction
    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
    // Disable automatic MOB file cleaner chore
    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
    // Set minimum age to archive to 10 sec
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
    // Set compacted file discharger interval to a half minAgeToArchive
    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive/2);
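    // Keep regionserver compactions switched off by default; mobCompact() below enables
    // them only for the duration of an explicit MOB compaction and disables them again.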
    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
    HTU.startMiniCluster();
  }

  @AfterClass
  public static void htuStop() throws Exception {
    HTU.shutdownMiniCluster();
  }

  @Before
  public void setUp() throws Exception {
    tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName());
    admin = HTU.getAdmin();
    cleanerChore = new MobFileCleanerChore();
    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
    familyDescriptor.setMobEnabled(true);
    familyDescriptor.setMobThreshold(mobLen);
    familyDescriptor.setMaxVersions(1);
    tableDescriptor.setColumnFamily(familyDescriptor);
    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
    byte[][] splitKeys = splitAlgo.split(numRegions);
    table = HTU.createTable(tableDescriptor, splitKeys).getName();
  }

  private void loadData(TableName tableName, int num) {

    Random r = new Random();
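    // Random 32-byte keys scatter the rows across all UniformSplit regions, so each
    // flush is expected to leave one new MOB file per region (asserted by
    // loadAndFlushThreeTimes below).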
LOG.info("Started loading {} rows into {}", num, tableName);
|
||||
try (final Table table = HTU.getConnection().getTable(tableName)) {
|
||||
for (int i = 0; i < num; i++) {
|
||||
byte[] key = new byte[32];
|
||||
r.nextBytes(key);
|
||||
Put p = new Put(key);
|
||||
p.addColumn(fam, qualifier, mobVal);
|
||||
table.put(p);
|
||||
}
|
||||
admin.flush(tableName);
|
||||
LOG.info("Finished loading {} rows into {}", num, tableName);
|
||||
} catch (Exception e) {
|
||||
LOG.error("MOB file compaction chore test FAILED", e);
|
||||
fail("MOB file compaction chore test FAILED");
|
||||
}
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
admin.disableTable(tableDescriptor.getTableName());
|
||||
admin.deleteTable(tableDescriptor.getTableName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void baseTestMobFileCompaction() throws InterruptedException, IOException {
|
||||
LOG.info("MOB compaction " + description() + " started");
|
||||
loadAndFlushThreeTimes(rows, table, famStr);
|
||||
mobCompact(tableDescriptor, familyDescriptor);
|
||||
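    // Compaction writes one new MOB file per region, but the three flush files are not
    // archived until the cleaner chore runs, hence 3 + 1 = 4 files per region here.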
assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
|
||||
getNumberOfMobFiles(table, famStr));
|
||||
cleanupAndVerifyCounts(table, famStr, 3*rows);
|
||||
LOG.info("MOB compaction " + description() + " finished OK");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
|
||||
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
|
||||
LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
|
||||
loadAndFlushThreeTimes(rows, table, famStr);
|
||||
LOG.debug("Taking snapshot and cloning table {}", table);
|
||||
admin.snapshot(test.getMethodName(), table);
|
||||
admin.cloneSnapshot(test.getMethodName(), clone);
|
||||
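    // Cloning a snapshot materializes HFileLinks in the clone's MOB area rather than
    // copying the three flushed MOB files per region.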
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
|
||||
getNumberOfMobFiles(clone, famStr));
|
||||
mobCompact(admin.getDescriptor(clone), familyDescriptor);
|
||||
assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
|
||||
4 * numRegions, getNumberOfMobFiles(clone, famStr));
|
||||
cleanupAndVerifyCounts(clone, famStr, 3*rows);
|
||||
LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMobFileCompactionAfterSnapshotCloneAndFlush() throws InterruptedException,
|
||||
IOException {
|
||||
final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
|
||||
LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
|
||||
loadAndFlushThreeTimes(rows, table, famStr);
|
||||
LOG.debug("Taking snapshot and cloning table {}", table);
|
||||
admin.snapshot(test.getMethodName(), table);
|
||||
admin.cloneSnapshot(test.getMethodName(), clone);
|
||||
assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
|
||||
getNumberOfMobFiles(clone, famStr));
|
||||
loadAndFlushThreeTimes(rows, clone, famStr);
|
||||
mobCompact(admin.getDescriptor(clone), familyDescriptor);
|
||||
assertEquals("Should have 7 MOB file per region due to clone + 3xflush + compact",
|
||||
7 * numRegions, getNumberOfMobFiles(clone, famStr));
|
||||
cleanupAndVerifyCounts(clone, famStr, 6*rows);
|
||||
LOG.info("MOB compaction of cloned snapshot w flush, " + description() + " finished OK");
|
||||
}
|
||||
|
||||
protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
|
||||
throws IOException {
|
||||
final long start = getNumberOfMobFiles(table, family);
|
||||
// Load and flush data 3 times
|
||||
loadData(table, rows);
|
||||
loadData(table, rows);
|
||||
loadData(table, rows);
|
||||
assertEquals("Should have 3 more mob files per region from flushing.", start + numRegions * 3,
|
||||
getNumberOfMobFiles(table, family));
|
||||
}
|
||||
|
||||
protected String description() {
|
||||
return "regular mode";
|
||||
}
|
||||
|
||||
protected void enableCompactions() throws IOException {
|
||||
final List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
|
||||
.collect(Collectors.toList());
|
||||
admin.compactionSwitch(true, serverList);
|
||||
}
|
||||
|
||||
protected void disableCompactions() throws IOException {
|
||||
final List<String> serverList = admin.getRegionServers().stream().map(sn -> sn.getServerName())
|
||||
.collect(Collectors.toList());
|
||||
admin.compactionSwitch(false, serverList);
|
||||
}
|
||||
|
||||
/**
|
||||
* compact the given table and return once it is done.
|
||||
* should presume compactions are disabled when called.
|
||||
* should ensure compactions are disabled before returning.
|
||||
*/
|
||||
protected void mobCompact(TableDescriptor tableDescriptor,
|
||||
ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
|
||||
LOG.debug("Major compact MOB table " + tableDescriptor.getTableName());
|
||||
enableCompactions();
|
||||
mobCompactImpl(tableDescriptor, familyDescriptor);
|
||||
waitUntilCompactionIsComplete(tableDescriptor.getTableName());
|
||||
disableCompactions();
|
||||
}
|
||||
|
||||
/**
|
||||
* Call the API for compaction specific to the test set.
|
||||
* should not wait for compactions to finish.
|
||||
* may assume compactions are enabled when called.
|
||||
*/
|
||||
protected void mobCompactImpl(TableDescriptor tableDescriptor,
|
||||
ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
|
||||
admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
|
||||
}
|
||||
|
||||
protected void waitUntilCompactionIsComplete(TableName table)
|
||||
throws IOException, InterruptedException {
|
||||
CompactionState state = admin.getCompactionState(table);
|
||||
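    // Poll the admin API until no region of the table reports an in-progress compaction.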
    while (state != CompactionState.NONE) {
      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
      Thread.sleep(100);
      state = admin.getCompactionState(table);
    }
    LOG.debug("done waiting for compaction on {}", table);
  }

  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
      throws InterruptedException, IOException {
    // We have a guarantee that the compacted file discharger will run during this pause
    // because its interval is less than this wait time
    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);

    Thread.sleep(minAgeToArchive + 1000);
    LOG.info("Cleaning up MOB files");
    // Cleanup again
    cleanerChore.cleanupObsoleteMobFiles(conf, table);

    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
        getNumberOfMobFiles(table, family));

    LOG.debug("checking count of rows");
    long scanned = scanTable(table);
    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);

  }

  protected long getNumberOfMobFiles(TableName tableName, String family)
      throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
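    // listStatus counts both regular MOB hfiles and the HFileLink entries left by a
    // snapshot clone, which is why the clone tests count links and compaction output together.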
    FileStatus[] stat = fs.listStatus(dir);
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {}", st.getPath());
    }
    LOG.debug("MOB Directory content total files: {}", stat.length);

    return stat.length;
  }

  protected long scanTable(TableName tableName) {
    try (final Table table = HTU.getConnection().getTable(tableName);
        final ResultScanner scanner = table.getScanner(fam)) {
      Result result;
      long counter = 0;
      while ((result = scanner.next()) != null) {
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
        counter++;
      }
      return counter;
    } catch (Exception e) {
      LOG.error("MOB file compaction test FAILED", e);
      if (HTU != null) {
        fail(e.getMessage());
      } else {
        System.exit(-1);
      }
    }
    return 0;
  }
}