HBASE-23723 Ensure MOB compaction works in optimized mode after snapshot clone (#1446)

* Reorganize MOB compaction tests for more reuse.
* Add tests for mob compaction after snapshot clone operations
* Note the original table used to write a given mob hfile and use that to find it later.
* spotless:apply to fix HBaseTestingUtility
* Fix error-prone errors

Signed-off-by: Esteban Gutierrez <esteban@apache.org>
Co-authored-by: Sean Busbey <busbey@apache.org>
Co-authored-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent 3980e6d087
commit e595f154d1
@@ -916,7 +916,7 @@ public final class PrivateCellUtil {
    * Retrieve Cell's first tag, matching the passed in type
    * @param cell The Cell
    * @param type Type of the Tag to retrieve
-   * @return null if there is no tag of the passed in tag type
+   * @return Optional, empty if there is no tag of the passed in tag type
    */
   public static Optional<Tag> getTag(Cell cell, byte type) {
     boolean bufferBacked = cell instanceof ByteBufferExtendedCell;
@@ -22,9 +22,12 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;

+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

 /**
  * Immutable POJO class for representing a table name. Which is of the form: <table
  * namespace>:<table qualifier> Two special namespaces: 1. hbase - system namespace, used
@@ -386,28 +389,69 @@ public final class TableName implements Comparable<TableName> {
   }

   /**
    * @param fullName will use the entire byte array
    * @throws IllegalArgumentException if fullName equals old root or old meta. Some code depends on
    *                                  this. The test is buried in the table creation to save on
    *                                  array comparison when we're creating a standard table object
    *                                  that will be in the cache.
    */
   public static TableName valueOf(byte[] fullName) throws IllegalArgumentException {
+    return valueOf(fullName, 0, fullName.length);
+  }
+
+  /**
+   * @param fullName byte array to look into
+   * @param offset within said array
+   * @param length within said array
+   * @throws IllegalArgumentException if fullName equals old root or old meta.
+   */
+  public static TableName valueOf(byte[] fullName, int offset, int length)
+    throws IllegalArgumentException {
+    Preconditions.checkArgument(offset >= 0, "offset must be non-negative but was %s", offset);
+    Preconditions.checkArgument(offset < fullName.length, "offset (%s) must be < array length (%s)",
+      offset, fullName.length);
+    Preconditions.checkArgument(length <= fullName.length,
+      "length (%s) must be <= array length (%s)", length, fullName.length);
     for (TableName tn : tableCache) {
-      if (Arrays.equals(tn.getName(), fullName)) {
+      final byte[] tnName = tn.getName();
+      if (Bytes.equals(tnName, 0, tnName.length, fullName, offset, length)) {
         return tn;
       }
     }

-    int namespaceDelimIndex = org.apache.hbase.thirdparty.com.google.common.primitives.Bytes
-      .lastIndexOf(fullName, (byte) NAMESPACE_DELIM);
+    int namespaceDelimIndex = ArrayUtils.lastIndexOf(fullName, (byte) NAMESPACE_DELIM);

-    if (namespaceDelimIndex < 0) {
+    if (namespaceDelimIndex < offset) {
       return createTableNameIfNecessary(ByteBuffer.wrap(NamespaceDescriptor.DEFAULT_NAMESPACE_NAME),
-        ByteBuffer.wrap(fullName));
+        ByteBuffer.wrap(fullName, offset, length));
     } else {
-      return createTableNameIfNecessary(ByteBuffer.wrap(fullName, 0, namespaceDelimIndex),
-        ByteBuffer.wrap(fullName, namespaceDelimIndex + 1,
-          fullName.length - (namespaceDelimIndex + 1)));
+      return createTableNameIfNecessary(ByteBuffer.wrap(fullName, offset, namespaceDelimIndex),
+        ByteBuffer.wrap(fullName, namespaceDelimIndex + 1, length - (namespaceDelimIndex + 1)));
     }
   }

+  /**
+   * @param fullname of a table, possibly with a leading namespace and ':' as delimiter.
+   * @throws IllegalArgumentException if fullName equals old root or old meta.
+   */
+  public static TableName valueOf(ByteBuffer fullname) {
+    fullname = fullname.duplicate();
+    fullname.mark();
+    boolean miss = true;
+    while (fullname.hasRemaining() && miss) {
+      miss = ((byte) NAMESPACE_DELIM) != fullname.get();
+    }
+    if (miss) {
+      fullname.reset();
+      return valueOf(null, fullname);
+    } else {
+      ByteBuffer qualifier = fullname.slice();
+      int delimiterIndex = fullname.position() - 1;
+      fullname.reset();
+      // changing variable name for clarity
+      ByteBuffer namespace = fullname.duplicate();
+      namespace.limit(delimiterIndex);
+      return valueOf(namespace, qualifier);
+    }
+  }

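Reviewer note: the new offset/length overload lets a caller parse a table name straight out of a slice of a larger buffer (for example, a tag value) without copying it first. A minimal sketch of the intended usage, with a made-up buffer layout; note that the delimiter scan above runs over the whole array, so in this sketch the bytes outside the slice deliberately contain no ':'.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

public class TableNameSliceExample {
  public static void main(String[] args) {
    // A qualified name at the start of a buffer that has unrelated trailing bytes.
    byte[] name = Bytes.toBytes("ns:tbl");
    byte[] buffer = Bytes.add(name, Bytes.toBytes("XXXX"));
    // Parse only the first name.length bytes of the buffer; no defensive copy needed.
    TableName tn = TableName.valueOf(buffer, 0, name.length);
    System.out.println(tn.getNamespaceAsString() + ":" + tn.getQualifierAsString());
  }
}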
@@ -41,6 +41,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TimeZone;
@@ -422,17 +423,16 @@ public class HFilePrettyPrinter extends Configured implements Tool {
       }
       // check if mob files are missing.
       if (checkMobIntegrity && MobUtils.isMobReferenceCell(cell)) {
-        Tag tnTag = MobUtils.getTableNameTag(cell);
-        if (tnTag == null) {
+        Optional<TableName> tn = MobUtils.getTableName(cell);
+        if (!tn.isPresent()) {
           System.err.println(
             "ERROR, wrong tag format in mob reference cell " + CellUtil.getCellKeyAsString(cell));
         } else if (!MobUtils.hasValidMobRefCellValue(cell)) {
           System.err.println(
             "ERROR, wrong value format in mob reference cell " + CellUtil.getCellKeyAsString(cell));
         } else {
-          TableName tn = TableName.valueOf(Tag.cloneValue(tnTag));
           String mobFileName = MobUtils.getMobFileName(cell);
-          boolean exist = mobFileExists(fs, tn, mobFileName,
+          boolean exist = mobFileExists(fs, tn.get(), mobFileName,
             Bytes.toString(CellUtil.cloneFamily(cell)), foundMobFiles, missingMobFiles);
           if (!exist) {
             // report error
@@ -26,9 +26,9 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.Optional;
 import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -37,9 +37,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanInfo;
@@ -62,7 +64,10 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

 /**
  * Compact passed set of files in the mob-enabled column family.
@@ -82,12 +87,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
    * compaction process.
    */

-  static ThreadLocal<Set<String>> mobRefSet = new ThreadLocal<Set<String>>() {
-    @Override
-    protected Set<String> initialValue() {
-      return new HashSet<String>();
-    }
-  };
+  static ThreadLocal<SetMultimap<TableName, String>> mobRefSet =
+    ThreadLocal.withInitial(HashMultimap::create);

   /*
   * Is it user or system-originated request.
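The key data-structure change in this file: the per-thread reference set becomes a per-thread multimap keyed by the originating table. A standalone sketch of the difference, using plain Guava (HBase itself uses the shaded org.apache.hbase.thirdparty copy) and String stand-ins for TableName; the table and file names are hypothetical.

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.util.HashSet;
import java.util.Set;

public class MobRefSetSketch {
  // Old shape: a flat set of mob file names; the owning table is implicit.
  static final ThreadLocal<Set<String>> flat = ThreadLocal.withInitial(HashSet::new);

  // New shape: mob file names keyed by the table that originally wrote them,
  // so a file referenced via a snapshot clone still resolves to its source table.
  static final ThreadLocal<SetMultimap<String, String>> keyed =
    ThreadLocal.withInitial(HashMultimap::create);

  public static void main(String[] args) {
    keyed.get().put("originalTable", "mobfile-1"); // hypothetical names
    keyed.get().put("clonedTable", "mobfile-2");
    System.out.println(keyed.get()); // {originalTable=[mobfile-1], clonedTable=[mobfile-2]}
  }
}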
@@ -192,34 +193,72 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     // Check if I/O optimized MOB compaction
     if (ioOptimizedMode) {
       if (request.isMajor() && request.getPriority() == HStore.PRIORITY_USER) {
-        Path mobDir =
-          MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName());
-        List<Path> mobFiles = MobUtils.getReferencedMobFiles(request.getFiles(), mobDir);
-        // reset disableIO
-        disableIO.set(Boolean.FALSE);
-        if (mobFiles.size() > 0) {
-          calculateMobLengthMap(mobFiles);
+        try {
+          final SetMultimap<TableName, String> mobRefs = request.getFiles().stream().map(file -> {
+            byte[] value = file.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+            ImmutableSetMultimap.Builder<TableName, String> builder;
+            if (value == null) {
+              builder = ImmutableSetMultimap.builder();
+            } else {
+              try {
+                builder = MobUtils.deserializeMobFileRefs(value);
+              } catch (RuntimeException exception) {
+                throw new RuntimeException("failure getting mob references for hfile " + file,
+                  exception);
+              }
+            }
+            return builder;
+          }).reduce((a, b) -> a.putAll(b.build())).orElseGet(ImmutableSetMultimap::builder).build();
+          // reset disableIO
+          disableIO.set(Boolean.FALSE);
+          if (!mobRefs.isEmpty()) {
+            calculateMobLengthMap(mobRefs);
+          }
+          LOG.info(
+            "Table={} cf={} region={}. I/O optimized MOB compaction. "
+              + "Total referenced MOB files: {}",
+            tableName, familyName, regionName, mobRefs.size());
+        } catch (RuntimeException exception) {
+          throw new IOException("Failed to get list of referenced hfiles for request " + request,
+            exception);
         }
-        LOG.info("Table={} cf={} region={}. I/O optimized MOB compaction. "
-          + "Total referenced MOB files: {}", tableName, familyName, regionName, mobFiles.size());
       }
     }

     return compact(request, scannerFactory, writerFactory, throughputController, user);
   }

-  private void calculateMobLengthMap(List<Path> mobFiles) throws IOException {
+  /**
+   * @param mobRefs multimap of original table name -> mob hfile
+   */
+  private void calculateMobLengthMap(SetMultimap<TableName, String> mobRefs) throws IOException {
     FileSystem fs = store.getFileSystem();
     HashMap<String, Long> map = mobLengthMap.get();
     map.clear();
-    for (Path p : mobFiles) {
-      if (MobFileName.isOldMobFileName(p.getName())) {
+    for (Map.Entry<TableName, String> reference : mobRefs.entries()) {
+      final TableName table = reference.getKey();
+      final String mobfile = reference.getValue();
+      if (MobFileName.isOldMobFileName(mobfile)) {
         disableIO.set(Boolean.TRUE);
       }
-      FileStatus st = fs.getFileStatus(p);
-      long size = st.getLen();
-      LOG.debug("Referenced MOB file={} size={}", p, size);
-      map.put(p.getName(), fs.getFileStatus(p).getLen());
+      List<Path> locations = mobStore.getLocations(table);
+      for (Path p : locations) {
+        try {
+          FileStatus st = fs.getFileStatus(new Path(p, mobfile));
+          long size = st.getLen();
+          LOG.debug("Referenced MOB file={} size={}", mobfile, size);
+          map.put(mobfile, size);
+          break;
+        } catch (FileNotFoundException exception) {
+          LOG.debug("Mob file {} was not in location {}. May have other locations to try.", mobfile,
+            p);
+        }
+      }
+      if (!map.containsKey(mobfile)) {
+        throw new FileNotFoundException("Could not find mob file " + mobfile + " in the list of "
+          + "expected locations: " + locations);
+      }

     }
   }
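The rewritten calculateMobLengthMap no longer assumes a single mob directory: each referenced file is looked up under every candidate location for its originating table. A self-contained sketch of that probe-in-order pattern (class and method names here are illustrative, not the HBase API):

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class ProbeLocations {
  // Try each candidate directory in order; fall through on FileNotFoundException
  // and fail only if no location holds the file.
  static long sizeOf(FileSystem fs, List<Path> locations, String file) throws IOException {
    for (Path dir : locations) {
      try {
        FileStatus st = fs.getFileStatus(new Path(dir, file));
        return st.getLen();
      } catch (FileNotFoundException e) {
        // Not here; the file may live in the archive directory instead.
      }
    }
    throw new FileNotFoundException("Could not find " + file + " in " + locations);
  }
}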
@@ -395,8 +434,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
               // We leave large MOB file as is (is not compacted),
               // then we update set of MOB file references
               // and append mob cell directly to the store's writer
-              mobRefSet.get().add(fName);
-              writer.append(mobCell);
+              Optional<TableName> refTable = MobUtils.getTableName(c);
+              if (refTable.isPresent()) {
+                mobRefSet.get().put(refTable.get(), fName);
+                writer.append(c);
+              } else {
+                throw new IOException("MOB cell did not contain a tablename "
+                  + "tag. should not be possible. see ref guide on mob troubleshooting. "
+                  + "store=" + getStoreInfo() + " cell=" + c);
+              }
             }
           }
         } else {
@@ -444,9 +490,15 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
             if (MobUtils.hasValidMobRefCellValue(c)) {
               // We do not check mobSizeThreshold during normal compaction,
               // leaving it to a MOB compaction run
-              writer.append(c);
-              // Add MOB reference to a MOB reference set
-              mobRefSet.get().add(MobUtils.getMobFileName(c));
+              Optional<TableName> refTable = MobUtils.getTableName(c);
+              if (refTable.isPresent()) {
+                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                writer.append(c);
+              } else {
+                throw new IOException("MOB cell did not contain a tablename "
+                  + "tag. should not be possible. see ref guide on mob troubleshooting. " + "store="
+                  + getStoreInfo() + " cell=" + c);
+              }
             } else {
               String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
               throw new IOException(errMsg);
@@ -525,7 +577,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       throughputController.finish(compactionName);
       if (!finished && mobFileWriter != null) {
         // Remove all MOB references because compaction failed
-        mobRefSet.get().clear();
+        clearThreadLocals();
         // Abort writer
         LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
           mobFileWriter.getPath(), getStoreInfo());
@@ -543,16 +595,13 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     return true;
   }

-  private String getStoreInfo() {
+  protected String getStoreInfo() {
     return String.format("[table=%s family=%s region=%s]", store.getTableName().getNameAsString(),
       store.getColumnFamilyName(), store.getRegionInfo().getEncodedName());
   }

   private void clearThreadLocals() {
-    Set<String> set = mobRefSet.get();
-    if (set != null) {
-      set.clear();
-    }
+    mobRefSet.get().clear();
     HashMap<String, Long> map = mobLengthMap.get();
     if (map != null) {
       map.clear();
@@ -567,7 +616,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
       LOG.debug("New MOB writer created={} store={}", mobFileWriter.getPath().getName(),
         getStoreInfo());
       // Add reference we get for compact MOB
-      mobRefSet.get().add(mobFileWriter.getPath().getName());
+      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
       return mobFileWriter;
     } catch (IOException e) {
       // Bailing out
@@ -599,7 +648,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
         LOG.debug("Aborting writer for {} because there are no MOB cells, store={}",
           mobFileWriter.getPath(), getStoreInfo());
         // Remove MOB file from reference set
-        mobRefSet.get().remove(mobFileWriter.getPath().getName());
+        mobRefSet.get().remove(store.getTableName(), mobFileWriter.getPath().getName());
         abortWriter(mobFileWriter);
       }
     } else {
@@ -612,9 +661,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     CompactionRequestImpl request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
     writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), request.getFiles());
     // Append MOB references
-    Set<String> refSet = mobRefSet.get();
-    writer.appendMobMetadata(refSet);
+    writer.appendMobMetadata(mobRefSet.get());
     writer.close();
     clearThreadLocals();
     return newFiles;
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
@@ -47,6 +48,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+
 /**
  * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. If the store is not a
  * mob store, the flusher flushes the MemStore the same with DefaultStoreFlusher, If the store is a
@@ -277,7 +280,8 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
       // The hfile is current up to and including cacheFlushSeqNum.
       status.setStatus("Flushing " + store + ": appending metadata");
       writer.appendMetadata(cacheFlushSeqNum, false);
-      writer.appendMobMetadata(mobRefSet.get());
+      writer.appendMobMetadata(ImmutableSetMultimap.<TableName, String> builder()
+        .putAll(store.getTableName(), mobRefSet.get()).build());
       status.setStatus("Flushing " + store + ": closing flushed file");
       writer.close();
     }
@@ -21,7 +21,6 @@ import com.google.errorprone.annotations.RestrictedApi;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +52,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
 /**
  * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
  * (files which have no active references to) mob files.
@@ -209,27 +210,27 @@ public class MobFileCleanerChore extends ScheduledChore {
           byte[] bulkloadMarkerData = sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
           // close store file to avoid memory leaks
           sf.closeStoreFile(true);
-          if (mobRefData == null && bulkloadMarkerData == null) {
-            LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
-              + "can not proceed until all old files will be MOB-compacted.", pp);
-            return;
-          } else if (mobRefData == null && bulkloadMarkerData != null) {
-            LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
-            continue;
-          }
-          // mobRefData will never be null here, but to make FindBugs happy
-          if (mobRefData != null && mobRefData.length > 1) {
-            // if length = 1 means NULL, that there are no MOB references
-            // in this store file, but the file was created by new MOB code
-            String[] mobs = new String(mobRefData).split(",");
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Found: {} mob references: {}", mobs.length, Arrays.toString(mobs));
-            } else {
-              LOG.debug("Found: {} mob references", mobs.length);
-            }
-            regionMobs.addAll(Arrays.asList(mobs));
-          } else {
-            LOG.debug("File {} does not have mob references", currentPath);
-          }
+          if (mobRefData == null) {
+            if (bulkloadMarkerData == null) {
+              LOG.warn("Found old store file with no MOB_FILE_REFS: {} - "
+                + "can not proceed until all old files will be MOB-compacted.", pp);
+              return;
+            } else {
+              LOG.debug("Skipping file without MOB references (bulkloaded file):{}", pp);
+              continue;
+            }
+          }
+          // file may or may not have MOB references, but was created by the distributed
+          // mob compaction code.
+          try {
+            SetMultimap<TableName, String> mobs =
+              MobUtils.deserializeMobFileRefs(mobRefData).build();
+            LOG.debug("Found {} mob references for store={}", mobs.size(), sf);
+            LOG.trace("Specific mob references found for store={} : {}", sf, mobs);
+            regionMobs.addAll(mobs.values());
+          } catch (RuntimeException exception) {
+            throw new IOException("failure getting mob references for hfile " + sf,
+              exception);
+          }
         }
       } catch (FileNotFoundException e) {
@@ -22,17 +22,16 @@ import static org.apache.hadoop.hbase.mob.MobConstants.MOB_CLEANER_BATCH_SIZE_UP

 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -70,6 +69,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
 /**
  * The mob utilities
  */
@@ -131,14 +133,51 @@ public final class MobUtils {
    * @param cell The current cell.
    * @return The table name tag.
    */
-  public static Tag getTableNameTag(Cell cell) {
+  private static Optional<Tag> getTableNameTag(Cell cell) {
+    Optional<Tag> tag = Optional.empty();
     if (cell.getTagsLength() > 0) {
-      Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
-      if (tag.isPresent()) {
-        return tag.get();
-      }
+      tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
     }
+    return tag;
+  }
+
+  /**
+   * Gets the table name from when this cell was written into a mob hfile as a string.
+   * @param cell to extract tag from
+   * @return table name as a string. empty if the tag is not found.
+   */
+  public static Optional<String> getTableNameString(Cell cell) {
+    Optional<Tag> tag = getTableNameTag(cell);
+    Optional<String> name = Optional.empty();
+    if (tag.isPresent()) {
+      name = Optional.of(Tag.getValueAsString(tag.get()));
+    }
+    return name;
+  }
+
+  /**
+   * Get the table name from when this cell was written into a mob hfile as a TableName.
+   * @param cell to extract tag from
+   * @return name of table as a TableName. empty if the tag is not found.
+   */
+  public static Optional<TableName> getTableName(Cell cell) {
+    Optional<Tag> maybe = getTableNameTag(cell);
+    Optional<TableName> name = Optional.empty();
+    if (maybe.isPresent()) {
+      final Tag tag = maybe.get();
+      if (tag.hasArray()) {
+        name = Optional
+          .of(TableName.valueOf(tag.getValueArray(), tag.getValueOffset(), tag.getValueLength()));
+      } else {
+        // TODO ByteBuffer handling in tags looks busted. revisit.
+        ByteBuffer buffer = tag.getValueByteBuffer().duplicate();
+        buffer.mark();
+        buffer.position(tag.getValueOffset());
+        buffer.limit(tag.getValueOffset() + tag.getValueLength());
+        name = Optional.of(TableName.valueOf(buffer));
+      }
+    }
-    return null;
+    return name;
   }

   /**
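The array-backed branch of getTableName can be exercised without a full cell, since a table-name tag already carries everything TableName.valueOf needs. A small sketch (the tag value "ns:tbl" is a made-up example):

import java.util.Optional;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.util.Bytes;

public class TagToTableName {
  public static void main(String[] args) {
    Tag tag = new ArrayBackedTag(TagType.MOB_TABLE_NAME_TAG_TYPE, Bytes.toBytes("ns:tbl"));
    // Same extraction as the hasArray() branch above: no copy of the tag value needed.
    Optional<TableName> name = tag.hasArray()
      ? Optional.of(
        TableName.valueOf(tag.getValueArray(), tag.getValueOffset(), tag.getValueLength()))
      : Optional.empty();
    System.out.println(name.map(TableName::getNameAsString).orElse("<no tag>"));
  }
}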
@@ -395,8 +434,7 @@ public final class MobUtils {

   /**
    * Gets the RegionInfo of the mob files. This is a dummy region. The mob files are not saved in a
-   * region in HBase. This is only used in mob snapshot. It's internally used only. n * @return A
-   * dummy mob region info.
+   * region in HBase. It's internally used only.
    */
   public static RegionInfo getMobRegionInfo(TableName tableName) {
     return RegionInfoBuilder.newBuilder(tableName).setStartKey(MobConstants.MOB_REGION_NAME_BYTES)
@@ -682,27 +720,81 @@ public final class MobUtils {
   }

   /**
-   * Get list of referenced MOB files from a given collection of store files
-   * @param storeFiles store files
-   * @param mobDir     MOB file directory
-   * @return list of MOB file paths
+   * Serialize a set of referenced mob hfiles
+   * @param mobRefSet to serialize, may be null
+   * @return byte array to i.e. put into store file metadata. will not be null
    */
-
-  public static List<Path> getReferencedMobFiles(Collection<HStoreFile> storeFiles, Path mobDir) {
-
-    Set<String> mobSet = new HashSet<String>();
-    for (HStoreFile sf : storeFiles) {
-      byte[] value = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
-      if (value != null && value.length > 1) {
-        String s = Bytes.toString(value);
-        String[] all = s.split(",");
-        Collections.addAll(mobSet, all);
-      }
-    }
-    List<Path> retList = new ArrayList<Path>();
-    for (String name : mobSet) {
-      retList.add(new Path(mobDir, name));
-    }
-    return retList;
-  }
+  public static byte[] serializeMobFileRefs(SetMultimap<TableName, String> mobRefSet) {
+    if (mobRefSet != null && mobRefSet.size() > 0) {
+      // Here we rely on the fact that '/' and ',' are not allowed in either table names nor hfile
+      // names for serialization.
+      //
+      // exampleTable/filename1,filename2//example:table/filename5//otherTable/filename3,filename4
+      //
+      // to approximate the needed capacity we use the fact that there will usually be 1 table name
+      // and each mob filename is around 105 bytes. we pick an arbitrary number to cover "most"
+      // single table name lengths
+      StringBuilder sb = new StringBuilder(100 + mobRefSet.size() * 105);
+      boolean doubleSlash = false;
+      for (TableName tableName : mobRefSet.keySet()) {
+        sb.append(tableName).append("/");
+        boolean comma = false;
+        for (String refs : mobRefSet.get(tableName)) {
+          sb.append(refs);
+          if (comma) {
+            sb.append(",");
+          } else {
+            comma = true;
+          }
+        }
+        if (doubleSlash) {
+          sb.append("//");
+        } else {
+          doubleSlash = true;
+        }
+      }
+      return Bytes.toBytes(sb.toString());
+    } else {
+      return HStoreFile.NULL_VALUE;
+    }
+  }

+  /**
+   * Deserialize the set of referenced mob hfiles from store file metadata.
+   * @param bytes compatibly serialized data. can not be null
+   * @return a setmultimap of original table to list of hfile names. will be empty if no values.
+   * @throws IllegalStateException if there are values but no table name
+   */
+  public static ImmutableSetMultimap.Builder<TableName, String> deserializeMobFileRefs(byte[] bytes)
+    throws IllegalStateException {
+    ImmutableSetMultimap.Builder<TableName, String> map = ImmutableSetMultimap.builder();
+    if (bytes.length > 1) {
+      // TODO avoid turning the tablename pieces in to strings.
+      String s = Bytes.toString(bytes);
+      String[] tables = s.split("//");
+      for (String tableEnc : tables) {
+        final int delim = tableEnc.indexOf('/');
+        if (delim <= 0) {
+          throw new IllegalStateException("MOB reference data does not match expected encoding: "
+            + "no table name included before list of mob refs.");
+        }
+        TableName table = TableName.valueOf(tableEnc.substring(0, delim));
+        String[] refs = tableEnc.substring(delim + 1).split(",");
+        map.putAll(table, refs);
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        // array length 1 should be the NULL_VALUE.
+        if (!Arrays.equals(HStoreFile.NULL_VALUE, bytes)) {
+          LOG.debug(
+            "Serialized MOB file refs array was treated as the placeholder 'no entries' but"
+              + " didn't have the expected placeholder byte. expected={} and actual={}",
+            Arrays.toString(HStoreFile.NULL_VALUE), Arrays.toString(bytes));
+        }
+      }
+
+    }
+    return map;
+  }

 }
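The two methods above are designed as a round trip through the MOB_FILE_REFS file-info key. A usage sketch against the API added in this hunk; the hfile name is a placeholder, not a real mob file name.

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

public class MobRefsRoundTrip {
  public static void main(String[] args) {
    SetMultimap<TableName, String> refs = HashMultimap.create();
    refs.put(TableName.valueOf("exampleTable"), "someMobFileName"); // placeholder hfile name
    // Serialized form: "exampleTable/someMobFileName" (tables separated by "//", files by ",")
    byte[] bytes = MobUtils.serializeMobFileRefs(refs);
    SetMultimap<TableName, String> back = MobUtils.deserializeMobFileRefs(bytes).build();
    System.out.println(back.equals(refs)); // true
  }
}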
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -54,7 +55,6 @@ import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
 import org.apache.hadoop.hbase.util.IdLock;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -90,7 +90,7 @@ public class HMobStore extends HStore {
   private AtomicLong mobFlushedCellsSize = new AtomicLong();
   private AtomicLong mobScanCellsCount = new AtomicLong();
   private AtomicLong mobScanCellsSize = new AtomicLong();
-  private Map<String, List<Path>> map = new ConcurrentHashMap<>();
+  private Map<TableName, List<Path>> map = new ConcurrentHashMap<>();
   private final IdLock keyLock = new IdLock();
   // When we add a MOB reference cell to the HFile, we will add 2 tags along with it
   // 1. A ref tag with type TagType.MOB_REFERENCE_TAG_TYPE. This just denote this this cell is not
@@ -112,7 +112,7 @@ public class HMobStore extends HStore {
     TableName tn = region.getTableDescriptor().getTableName();
     locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
       MobUtils.getMobRegionInfo(tn).getEncodedName(), family.getNameAsString()));
-    map.put(Bytes.toString(tn.getName()), locations);
+    map.put(tn, locations);
     List<Tag> tags = new ArrayList<>(2);
     tags.add(MobConstants.MOB_REF_TAG);
     Tag tableNameTag =
@@ -304,26 +304,9 @@ public class HMobStore extends HStore {
     MobCell mobCell = null;
     if (MobUtils.hasValidMobRefCellValue(reference)) {
       String fileName = MobUtils.getMobFileName(reference);
-      Tag tableNameTag = MobUtils.getTableNameTag(reference);
-      if (tableNameTag != null) {
-        String tableNameString = Tag.getValueAsString(tableNameTag);
-        List<Path> locations = map.get(tableNameString);
-        if (locations == null) {
-          IdLock.Entry lockEntry = keyLock.getLockEntry(tableNameString.hashCode());
-          try {
-            locations = map.get(tableNameString);
-            if (locations == null) {
-              locations = new ArrayList<>(2);
-              TableName tn = TableName.valueOf(tableNameString);
-              locations.add(MobUtils.getMobFamilyPath(conf, tn, getColumnFamilyName()));
-              locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tn,
-                MobUtils.getMobRegionInfo(tn).getEncodedName(), getColumnFamilyName()));
-              map.put(tableNameString, locations);
-            }
-          } finally {
-            keyLock.releaseLockEntry(lockEntry);
-          }
-        }
+      Optional<TableName> tableName = MobUtils.getTableName(reference);
+      if (tableName.isPresent()) {
+        List<Path> locations = getLocations(tableName.get());
         mobCell = readCell(locations, fileName, reference, cacheBlocks, readPt,
           readEmptyValueOnMobCellMiss);
       }
@@ -346,6 +329,32 @@ public class HMobStore extends HStore {
     return mobCell;
   }

+  /**
+   * @param tableName to look up locations for, can not be null
+   * @return a list of location in order of working dir, archive dir. will not be null.
+   */
+  public List<Path> getLocations(TableName tableName) throws IOException {
+    List<Path> locations = map.get(tableName);
+    if (locations == null) {
+      IdLock.Entry lockEntry = keyLock.getLockEntry(tableName.hashCode());
+      try {
+        locations = map.get(tableName);
+        if (locations == null) {
+          locations = new ArrayList<>(2);
+          locations.add(MobUtils.getMobFamilyPath(conf, tableName,
+            getColumnFamilyDescriptor().getNameAsString()));
+          locations.add(HFileArchiveUtil.getStoreArchivePath(conf, tableName,
+            MobUtils.getMobRegionInfo(tableName).getEncodedName(),
+            getColumnFamilyDescriptor().getNameAsString()));
+          map.put(tableName, locations);
+        }
+      } finally {
+        keyLock.releaseLockEntry(lockEntry);
+      }
+    }
+    return locations;
+  }
+
   /**
    * Reads the cell from a mob file. The mob file might be located in different directories. 1. The
    * working directory. 2. The archive directory. Reads the cell from the files located in both of
@@ -323,7 +323,8 @@ public class HStoreFile implements StoreFile {
   }

   /**
-   * Only used by the Striped Compaction Policy n * @return value associated with the metadata key
+   * @param key to look up
+   * @return value associated with the metadata key
    */
   public byte[] getMetadataValue(byte[] key) {
     return metadataMap.get(key);
@@ -26,7 +26,6 @@ import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_K
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_FILE_REFS;
-import static org.apache.hadoop.hbase.regionserver.HStoreFile.NULL_VALUE;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;

 import java.io.IOException;
@@ -40,7 +39,6 @@ import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,11 +46,13 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
+import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.util.BloomContext;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.apache.hadoop.hbase.util.BloomFilterUtil;
@@ -68,6 +68,7 @@ import org.slf4j.LoggerFactory;

 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Strings;
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;

 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
@ -243,17 +244,11 @@ public class StoreFileWriter implements CellSink, ShipperListener {
|
|||
|
||||
/**
|
||||
* Appends MOB - specific metadata (even if it is empty)
|
||||
* @param mobRefSet - set of MOB file names
|
||||
* @param mobRefSet - original table -> set of MOB file names
|
||||
* @throws IOException problem writing to FS
|
||||
*/
|
||||
public void appendMobMetadata(Set<String> mobRefSet) throws IOException {
|
||||
if (mobRefSet != null && mobRefSet.size() > 0) {
|
||||
String sb = StringUtils.join(mobRefSet, ",");
|
||||
byte[] bytes = Bytes.toBytes(sb.toString());
|
||||
writer.appendFileInfo(MOB_FILE_REFS, bytes);
|
||||
} else {
|
||||
writer.appendFileInfo(MOB_FILE_REFS, NULL_VALUE);
|
||||
}
|
||||
public void appendMobMetadata(SetMultimap<TableName, String> mobRefSet) throws IOException {
|
||||
writer.appendFileInfo(MOB_FILE_REFS, MobUtils.serializeMobFileRefs(mobRefSet));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@@ -1906,6 +1906,29 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
   public static final byte[] START_KEY_BYTES = { FIRST_CHAR, FIRST_CHAR, FIRST_CHAR };
   public static final String START_KEY = new String(START_KEY_BYTES, HConstants.UTF8_CHARSET);

+  public TableDescriptorBuilder.ModifyableTableDescriptor
+    createModifyableTableDescriptor(final String name) {
+    return createModifyableTableDescriptor(TableName.valueOf(name),
+      ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, MAXVERSIONS, HConstants.FOREVER,
+      ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
+  }
+
+  public TableDescriptorBuilder.ModifyableTableDescriptor createModifyableTableDescriptor(
+    final TableName name, final int minVersions, final int versions, final int ttl,
+    KeepDeletedCells keepDeleted) {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
+    for (byte[] cfName : new byte[][] { fam1, fam2, fam3 }) {
+      ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfName)
+        .setMinVersions(minVersions).setMaxVersions(versions).setKeepDeletedCells(keepDeleted)
+        .setBlockCacheEnabled(false).setTimeToLive(ttl);
+      if (isNewVersionBehaviorEnabled()) {
+        cfBuilder.setNewVersionBehavior(true);
+      }
+      builder.setColumnFamily(cfBuilder.build());
+    }
+    return new TableDescriptorBuilder.ModifyableTableDescriptor(name, builder.build());
+  }
+
   /**
    * @deprecated since 2.0.0 and will be removed in 3.0.0. Use
    *             {@link #createTableDescriptor(TableName, int, int, int, KeepDeletedCells)} instead.
@@ -23,14 +23,15 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.hfile.CorruptHFileException;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -116,8 +117,6 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
       }
     }

-    FileSystem fs = store.getFileSystem();
-
     // Since scanner.next() can return 'false' but still be delivering data,
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<>();
@@ -169,7 +168,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
       // Add the only reference we get for compact MOB case
       // because new store file will have only one MOB reference
       // in this case - of newly compacted MOB file
-      mobRefSet.get().add(mobFileWriter.getPath().getName());
+      mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
     }
     do {
       hasMore = scanner.next(cells, scannerContext);
@@ -242,9 +241,15 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
            if (size > mobSizeThreshold) {
              // If the value size is larger than the threshold, it's regarded as a mob. Since
              // its value is already in the mob file, directly write this cell to the store file
-             writer.append(c);
-             // Add MOB reference to a set
-             mobRefSet.get().add(MobUtils.getMobFileName(c));
+             Optional<TableName> refTable = MobUtils.getTableName(c);
+             if (refTable.isPresent()) {
+               mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+               writer.append(c);
+             } else {
+               throw new IOException("MOB cell did not contain a tablename "
+                 + "tag. should not be possible. see ref guide on mob troubleshooting. "
+                 + "store=" + getStoreInfo() + " cell=" + c);
+             }
            } else {
              // If the value is not larger than the threshold, it's not regarded a mob. Retrieve
              // the mob cell from the mob file, and write it back to the store file.
@@ -260,9 +265,15 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
                // directly write the cell to the store file, and leave it to be handled by the
                // next compaction.
                LOG.error("Empty value for: " + c);
-               writer.append(c);
-               // Add MOB reference to a set
-               mobRefSet.get().add(MobUtils.getMobFileName(c));
+               Optional<TableName> refTable = MobUtils.getTableName(c);
+               if (refTable.isPresent()) {
+                 mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
+                 writer.append(c);
+               } else {
+                 throw new IOException("MOB cell did not contain a tablename "
+                   + "tag. should not be possible. see ref guide on mob troubleshooting. "
+                   + "store=" + getStoreInfo() + " cell=" + c);
+               }
              }
            }
          } else {
@@ -285,7 +296,7 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
            // Add ref we get for compact MOB case
-           mobRefSet.get().add(mobFileWriter.getPath().getName());
+           mobRefSet.get().put(store.getTableName(), mobFileWriter.getPath().getName());
          }

          int len = c.getSerializedSize();
@@ -19,15 +19,10 @@ package org.apache.hadoop.hbase.mob;

 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 /**
  * Mob file compaction chore in a generational non-batch mode test. 1. Uses default (non-batch) mode
@@ -39,37 +34,23 @@ import org.slf4j.LoggerFactory;
 * than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a
 * mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows.
 */
-@SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptMode extends TestMobCompactionBase {
-  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionOptMode.class);
+public class TestMobCompactionOptMode extends TestMobCompactionWithDefaults {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);

-  public TestMobCompactionOptMode() {
-  }
-
-  @Override
-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompaction() throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction generational (non-batch) mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction generational (non-batch) mode finished OK");
+
+    HTU.startMiniHBaseCluster();
   }

   @Override
-  protected void mobCompact(Admin admin, HTableDescriptor hdt, HColumnDescriptor hcd)
-    throws IOException, InterruptedException {
-    // Major compact MOB table
-    admin.majorCompact(hdt.getTableName(), hcd.getName());
+  protected String description() {
+    return "generational (non-batch) mode";
   }

 }
@@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.mob;

 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,47 +39,43 @@ import org.slf4j.LoggerFactory;
 * time larger than minimum age to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB
 * files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000 rows.
 */
-@SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionBase {
+public class TestMobCompactionOptRegionBatchMode extends TestMobCompactionWithDefaults {
   private static final Logger LOG =
     LoggerFactory.getLogger(TestMobCompactionOptRegionBatchMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionOptRegionBatchMode.class);

-  private int batchSize = 7;
+  private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;

-  public TestMobCompactionOptRegionBatchMode() {
-  }
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }

-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureOptimizedCompactionAndBatches()
+    throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
     conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY, MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
     conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction chore generational batch mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction chore generational batch mode finished OK");
+
+    HTU.startMiniHBaseCluster();
   }

   @Override
-  protected void mobCompact(Admin admin, HTableDescriptor hdt, HColumnDescriptor hcd)
-    throws IOException, InterruptedException {
-    // Major compact with batch mode enabled
-    compactionChore.performMajorCompactionInBatches(admin, hdt, hcd);
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
+    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
+    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
+    compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, familyDescriptor);
   }

+  @Override
+  protected String description() {
+    return "generational batch mode";
+  }
+
 }
@@ -19,13 +19,12 @@ package org.apache.hadoop.hbase.mob;

 import java.io.IOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,45 +39,40 @@ import org.slf4j.LoggerFactory;
 * to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is
 * 20. 12 Runs scanner and checks all 3 * 1000 rows.
 */
-@SuppressWarnings("deprecation")
 @Category(LargeTests.class)
-public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionBase {
+public class TestMobCompactionRegularRegionBatchMode extends TestMobCompactionWithDefaults {
   private static final Logger LOG =
     LoggerFactory.getLogger(TestMobCompactionRegularRegionBatchMode.class);
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestMobCompactionRegularRegionBatchMode.class);

-  private int batchSize = 7;
+  private static final int batchSize = 7;
   private MobFileCompactionChore compactionChore;

-  public TestMobCompactionRegularRegionBatchMode() {
-  }
-
   @Before
   public void setUp() throws Exception {
     super.setUp();
     compactionChore = new MobFileCompactionChore(conf, batchSize);
   }

-  protected void initConf() {
-    super.initConf();
+  @BeforeClass
+  public static void configureCompactionBatches() throws InterruptedException, IOException {
+    HTU.shutdownMiniHBaseCluster();
     conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE, batchSize);
-  }
-
-  @Test
-  public void testMobFileCompactionBatchMode() throws InterruptedException, IOException {
-    LOG.info("MOB compaction chore regular batch mode started");
-    baseTestMobFileCompaction();
-    LOG.info("MOB compaction chore regular batch mode finished OK");
+
+    HTU.startMiniHBaseCluster();
   }

   @Override
-  protected void mobCompact(Admin admin, HTableDescriptor hdt, HColumnDescriptor hcd)
-    throws IOException, InterruptedException {
-    // Major compact with batch mode enabled
-    compactionChore.performMajorCompactionInBatches(admin, hdt, hcd);
+  protected void mobCompactImpl(TableDescriptor tableDescriptor,
+    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
+    LOG.debug("compacting {} in batch mode.", tableDescriptor.getTableName());
+    compactionChore.performMajorCompactionInBatches(admin, tableDescriptor, familyDescriptor);
   }

+  @Override
+  protected String description() {
+    return "regular batch mode";
+  }
+
 }
@ -0,0 +1,322 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.mob;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.CompactionState;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.RegionSplitter;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Mob file compaction base test. 1. Enables batch mode for regular MOB compaction, Sets batch size
|
||||
* to 7 regions. (Optional) 2. Disables periodic MOB compactions, sets minimum age to archive to 10
|
||||
* sec 3. Creates MOB table with 20 regions 4. Loads MOB data (randomized keys, 1000 rows), flushes
|
||||
* data. 5. Repeats 4. two more times 6. Verifies that we have 20 *3 = 60 mob files (equals to
|
||||
* number of regions x 3) 7. Runs major MOB compaction. 8. Verifies that number of MOB files in a
|
||||
* mob directory is 20 x4 = 80 9. Waits for a period of time larger than minimum age to archive 10.
|
||||
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob directory is 20. 12 Runs
|
||||
* scanner and checks all 3 * 1000 rows.
|
||||
*/
|
@Category(LargeTests.class)
public class TestMobCompactionWithDefaults {
  private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);

  protected static HBaseTestingUtility HTU;
  protected static Configuration conf;
  protected static long minAgeToArchive = 10000;

  protected final static String famStr = "f1";
  protected final static byte[] fam = Bytes.toBytes(famStr);
  protected final static byte[] qualifier = Bytes.toBytes("q1");
  protected final static long mobLen = 10;
  protected final static byte[] mobVal = Bytes
    .toBytes("01234567890123456789012345678901234567890123456789012345678901234567890123456789");

  @Rule
  public TestName test = new TestName();
  protected TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor;
  protected ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor familyDescriptor;
  protected Admin admin;
  protected TableName table = null;
  protected int numRegions = 20;
  protected int rows = 1000;

  protected MobFileCleanerChore cleanerChore;

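  // Brings up the mini cluster once for all tests. MOB chores are disabled and regionserver
  // compactions are switched off, so each test drives compaction and cleaning explicitly.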
  @BeforeClass
  public static void htuStart() throws Exception {
    HTU = new HBaseTestingUtility();
    conf = HTU.getConfiguration();
    conf.setInt("hfile.format.version", 3);
    // Disable automatic MOB compaction
    conf.setLong(MobConstants.MOB_COMPACTION_CHORE_PERIOD, 0);
    // Disable automatic MOB file cleaner chore
    conf.setLong(MobConstants.MOB_CLEANER_PERIOD, 0);
    // Set minimum age to archive to 10 sec
    conf.setLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY, minAgeToArchive);
    // Set compacted file discharger interval to half of minAgeToArchive
    conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive / 2);
    conf.setBoolean("hbase.regionserver.compaction.enabled", false);
    HTU.startMiniCluster();
  }

  @AfterClass
  public static void htuStop() throws Exception {
    HTU.shutdownMiniCluster();
  }

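  // Each test gets a fresh MOB-enabled table named after the test method, pre-split into
  // numRegions regions with uniform split keys.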
  @Before
  public void setUp() throws Exception {
    tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName());
    admin = HTU.getAdmin();
    cleanerChore = new MobFileCleanerChore();
    familyDescriptor = new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam);
    familyDescriptor.setMobEnabled(true);
    familyDescriptor.setMobThreshold(mobLen);
    familyDescriptor.setMaxVersions(1);
    tableDescriptor.setColumnFamily(familyDescriptor);
    RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
    byte[][] splitKeys = splitAlgo.split(numRegions);
    table = HTU.createTable(tableDescriptor, splitKeys).getName();
  }

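  // Writes 'num' rows with random 32-byte keys and a value above the MOB threshold, then flushes
  // so that each region produces one new mob file.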
  private void loadData(TableName tableName, int num) {
    Random r = new Random();
    LOG.info("Started loading {} rows into {}", num, tableName);
    try (final Table table = HTU.getConnection().getTable(tableName)) {
      for (int i = 0; i < num; i++) {
        byte[] key = new byte[32];
        r.nextBytes(key);
        Put p = new Put(key);
        p.addColumn(fam, qualifier, mobVal);
        table.put(p);
      }
      admin.flush(tableName);
      LOG.info("Finished loading {} rows into {}", num, tableName);
    } catch (Exception e) {
      LOG.error("MOB file compaction chore test FAILED", e);
      fail("MOB file compaction chore test FAILED");
    }
  }

  @After
  public void tearDown() throws Exception {
    admin.disableTable(tableDescriptor.getTableName());
    admin.deleteTable(tableDescriptor.getTableName());
  }

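  // Base scenario: three load-and-flush rounds leave 3 mob files per region; a major MOB
  // compaction adds a fourth (compacted) file per region, for numRegions * 4 files before cleanup.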
  @Test
  public void baseTestMobFileCompaction() throws InterruptedException, IOException {
    LOG.info("MOB compaction " + description() + " started");
    loadAndFlushThreeTimes(rows, table, famStr);
    mobCompact(tableDescriptor, familyDescriptor);
    assertEquals("Should have 4 MOB files per region due to 3xflush + compaction.", numRegions * 4,
      getNumberOfMobFiles(table, famStr));
    cleanupAndVerifyCounts(table, famStr, 3 * rows);
    LOG.info("MOB compaction " + description() + " finished OK");
  }

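  // Cloning a snapshot leaves hlinks (references, not copies) in the clone's MOB area, so
  // compacting the clone must still locate the mob files originally written by the source table.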
  @Test
  public void testMobFileCompactionAfterSnapshotClone() throws InterruptedException, IOException {
    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
    LOG.info("MOB compaction of cloned snapshot, " + description() + " started");
    loadAndFlushThreeTimes(rows, table, famStr);
    LOG.debug("Taking snapshot and cloning table {}", table);
    admin.snapshot(test.getMethodName(), table);
    admin.cloneSnapshot(test.getMethodName(), clone);
    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
      getNumberOfMobFiles(clone, famStr));
    mobCompact(admin.getDescriptor(clone), familyDescriptor);
    assertEquals("Should have 3 hlinks + 1 MOB file per region due to clone + compact",
      4 * numRegions, getNumberOfMobFiles(clone, famStr));
    cleanupAndVerifyCounts(clone, famStr, 3 * rows);
    LOG.info("MOB compaction of cloned snapshot, " + description() + " finished OK");
  }

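  // Same as above, but new data is flushed into the clone before compacting, so the clone's MOB
  // area mixes hlinks from the snapshot with mob files written directly by the clone.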
  @Test
  public void testMobFileCompactionAfterSnapshotCloneAndFlush()
    throws InterruptedException, IOException {
    final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
    LOG.info("MOB compaction of cloned snapshot after flush, " + description() + " started");
    loadAndFlushThreeTimes(rows, table, famStr);
    LOG.debug("Taking snapshot and cloning table {}", table);
    admin.snapshot(test.getMethodName(), table);
    admin.cloneSnapshot(test.getMethodName(), clone);
    assertEquals("Should have 3 hlinks per region in MOB area from snapshot clone", 3 * numRegions,
      getNumberOfMobFiles(clone, famStr));
    loadAndFlushThreeTimes(rows, clone, famStr);
    mobCompact(admin.getDescriptor(clone), familyDescriptor);
    assertEquals("Should have 7 MOB files per region due to clone + 3xflush + compact",
      7 * numRegions, getNumberOfMobFiles(clone, famStr));
    cleanupAndVerifyCounts(clone, famStr, 6 * rows);
    LOG.info("MOB compaction of cloned snapshot with flush, " + description() + " finished OK");
  }

  protected void loadAndFlushThreeTimes(int rows, TableName table, String family)
    throws IOException {
    final long start = getNumberOfMobFiles(table, family);
    // Load and flush data 3 times
    loadData(table, rows);
    loadData(table, rows);
    loadData(table, rows);
    assertEquals("Should have 3 more mob files per region from flushing.", start + numRegions * 3,
      getNumberOfMobFiles(table, family));
  }

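  // Label used in log messages; subclasses that exercise other compaction modes can override this.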
  protected String description() {
    return "regular mode";
  }

  protected void enableCompactions() throws IOException {
    final List<String> serverList =
      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
    admin.compactionSwitch(true, serverList);
  }

  protected void disableCompactions() throws IOException {
    final List<String> serverList =
      admin.getRegionServers().stream().map(sn -> sn.getServerName()).collect(Collectors.toList());
    admin.compactionSwitch(false, serverList);
  }

  /**
   * Compact the given table and return once compaction is done. Presumes compactions are disabled
   * when called and ensures compactions are disabled again before returning.
   */
  protected void mobCompact(TableDescriptor tableDescriptor,
    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
    LOG.debug("Major compact MOB table " + tableDescriptor.getTableName());
    enableCompactions();
    mobCompactImpl(tableDescriptor, familyDescriptor);
    waitUntilCompactionIsComplete(tableDescriptor.getTableName());
    disableCompactions();
  }

  /**
   * Call the compaction API specific to the test set. Should not wait for compactions to finish;
   * may assume compactions are enabled when called.
   */
  protected void mobCompactImpl(TableDescriptor tableDescriptor,
    ColumnFamilyDescriptor familyDescriptor) throws IOException, InterruptedException {
    admin.majorCompact(tableDescriptor.getTableName(), familyDescriptor.getName());
  }

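  // Polls the admin API until the table reports no compaction in progress.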
  protected void waitUntilCompactionIsComplete(TableName table)
    throws IOException, InterruptedException {
    CompactionState state = admin.getCompactionState(table);
    while (state != CompactionState.NONE) {
      LOG.debug("Waiting for compaction on {} to complete. current state {}", table, state);
      Thread.sleep(100);
      state = admin.getCompactionState(table);
    }
    LOG.debug("done waiting for compaction on {}", table);
  }

  protected void cleanupAndVerifyCounts(TableName table, String family, int rows)
    throws InterruptedException, IOException {
    // The compacted-file discharger is guaranteed to run during this pause because its interval
    // is shorter than the wait time.
    LOG.info("Waiting for {}ms", minAgeToArchive + 1000);

    Thread.sleep(minAgeToArchive + 1000);
    LOG.info("Cleaning up MOB files");
    // Run the MOB file cleaner chore
    cleanerChore.cleanupObsoleteMobFiles(conf, table);

    assertEquals("After cleaning, we should have 1 MOB file per region based on size.", numRegions,
      getNumberOfMobFiles(table, family));

    LOG.debug("checking count of rows");
    long scanned = scanTable(table);
    assertEquals("Got the wrong number of rows in table " + table + " cf " + family, rows, scanned);
  }

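  // Counts entries directly in the MOB family directory on the filesystem; both real mob files
  // and hlinks from snapshot clones show up in this count.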
  protected long getNumberOfMobFiles(TableName tableName, String family) throws IOException {
    FileSystem fs = FileSystem.get(conf);
    Path dir = MobUtils.getMobFamilyPath(conf, tableName, family);
    FileStatus[] stat = fs.listStatus(dir);
    for (FileStatus st : stat) {
      LOG.debug("MOB Directory content: {}", st.getPath());
    }
    LOG.debug("MOB Directory content total files: {}", stat.length);
    return stat.length;
  }

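  // Scans the whole table, verifying that every cell still carries the expected MOB value, and
  // returns the row count.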
  protected long scanTable(TableName tableName) {
    try (final Table table = HTU.getConnection().getTable(tableName);
      final ResultScanner scanner = table.getScanner(fam)) {
      Result result;
      long counter = 0;
      while ((result = scanner.next()) != null) {
        assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal));
        counter++;
      }
      return counter;
    } catch (Exception e) {
      LOG.error("MOB file compaction test FAILED", e);
      if (HTU != null) {
        fail(e.getMessage());
      } else {
        System.exit(-1);
      }
    }
    return 0;
  }
}