HBASE-13393 - Optimize memstore flushing to avoid writing tag information
to hfiles when no tags are present. (Ram)
This commit is contained in:
parent
ef18d75d00
commit
eeb11b5327
|
@ -570,11 +570,12 @@ public class KeyValueUtil {
|
|||
* <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
|
||||
*
|
||||
* @param in
|
||||
* @param withTags whether the keyvalue should include tags are not
|
||||
* @return Created KeyValue OR if we find a length of zero, we will return
|
||||
* null which can be useful marking a stream as done.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static KeyValue iscreate(final InputStream in) throws IOException {
|
||||
public static KeyValue iscreate(final InputStream in, boolean withTags) throws IOException {
|
||||
byte[] intBytes = new byte[Bytes.SIZEOF_INT];
|
||||
int bytesRead = 0;
|
||||
while (bytesRead < intBytes.length) {
|
||||
|
@ -589,7 +590,11 @@ public class KeyValueUtil {
|
|||
// TODO: perhaps some sanity check is needed here.
|
||||
byte[] bytes = new byte[Bytes.toInt(intBytes)];
|
||||
IOUtils.readFully(in, bytes, 0, bytes.length);
|
||||
return new KeyValue(bytes, 0, bytes.length);
|
||||
if (withTags) {
|
||||
return new KeyValue(bytes, 0, bytes.length);
|
||||
} else {
|
||||
return new NoTagsKeyValue(bytes, 0, bytes.length);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -64,7 +64,8 @@ public class KeyValueCodec implements Codec {
|
|||
}
|
||||
|
||||
protected Cell parseCell() throws IOException {
|
||||
return KeyValueUtil.iscreate(in);
|
||||
// No tags here
|
||||
return KeyValueUtil.iscreate(in, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -70,7 +70,8 @@ public class KeyValueCodecWithTags implements Codec {
|
|||
}
|
||||
|
||||
protected Cell parseCell() throws IOException {
|
||||
return KeyValueUtil.iscreate(in);
|
||||
// create KeyValue with tags
|
||||
return KeyValueUtil.iscreate(in, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -569,9 +569,9 @@ public class TestKeyValue extends TestCase {
|
|||
KeyValueUtil.oswrite(mkvA2, os, true);
|
||||
DataInputStream is = new DataInputStream(new ByteArrayInputStream(
|
||||
byteArrayOutputStream.toByteArray()));
|
||||
KeyValue deSerKV1 = KeyValueUtil.iscreate(is);
|
||||
KeyValue deSerKV1 = KeyValueUtil.iscreate(is, true);
|
||||
assertTrue(kvA1.equals(deSerKV1));
|
||||
KeyValue deSerKV2 = KeyValueUtil.iscreate(is);
|
||||
KeyValue deSerKV2 = KeyValueUtil.iscreate(is, true);
|
||||
assertTrue(kvA2.equals(deSerKV2));
|
||||
}
|
||||
|
||||
|
|
|
@ -100,6 +100,7 @@ public class DefaultMemStore implements MemStore {
|
|||
volatile MemStoreLAB allocator;
|
||||
volatile MemStoreLAB snapshotAllocator;
|
||||
volatile long snapshotId;
|
||||
volatile boolean tagsPresent;
|
||||
|
||||
/**
|
||||
* Default constructor. Used for tests.
|
||||
|
@ -171,8 +172,11 @@ public class DefaultMemStore implements MemStore {
|
|||
timeOfOldestEdit = Long.MAX_VALUE;
|
||||
}
|
||||
}
|
||||
return new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
|
||||
this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator));
|
||||
MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize,
|
||||
this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator),
|
||||
this.tagsPresent);
|
||||
this.tagsPresent = false;
|
||||
return memStoreSnapshot;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -234,6 +238,13 @@ public class DefaultMemStore implements MemStore {
|
|||
|
||||
private boolean addToCellSet(Cell e) {
|
||||
boolean b = this.cellSet.add(e);
|
||||
// In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
|
||||
// When we use ACL CP or Visibility CP which deals with Tags during
|
||||
// mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
|
||||
// parse the byte[] to identify the tags length.
|
||||
if(e.getTagsLength() > 0) {
|
||||
tagsPresent = true;
|
||||
}
|
||||
setOldestEditTimeToNow();
|
||||
return b;
|
||||
}
|
||||
|
@ -1006,8 +1017,8 @@ public class DefaultMemStore implements MemStore {
|
|||
}
|
||||
}
|
||||
|
||||
public final static long FIXED_OVERHEAD = ClassSize.align(
|
||||
ClassSize.OBJECT + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));
|
||||
public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
|
||||
+ (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN);
|
||||
|
||||
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
|
||||
ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
|
||||
|
|
|
@ -63,8 +63,8 @@ public class DefaultStoreFlusher extends StoreFlusher {
|
|||
synchronized (flushLock) {
|
||||
status.setStatus("Flushing " + store + ": creating writer");
|
||||
// Write the map out to the disk
|
||||
writer = store.createWriterInTmp(
|
||||
cellsCount, store.getFamily().getCompressionType(), false, true, true);
|
||||
writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompressionType(), false,
|
||||
true, snapshot.isTagsPresent());
|
||||
writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
|
||||
IOException e = null;
|
||||
try {
|
||||
|
|
|
@ -32,14 +32,16 @@ public class MemStoreSnapshot {
|
|||
private final long size;
|
||||
private final TimeRangeTracker timeRangeTracker;
|
||||
private final KeyValueScanner scanner;
|
||||
private final boolean tagsPresent;
|
||||
|
||||
public MemStoreSnapshot(long id, int cellsCount, long size, TimeRangeTracker timeRangeTracker,
|
||||
KeyValueScanner scanner) {
|
||||
KeyValueScanner scanner, boolean tagsPresent) {
|
||||
this.id = id;
|
||||
this.cellsCount = cellsCount;
|
||||
this.size = size;
|
||||
this.timeRangeTracker = timeRangeTracker;
|
||||
this.scanner = scanner;
|
||||
this.tagsPresent = tagsPresent;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -76,4 +78,11 @@ public class MemStoreSnapshot {
|
|||
public KeyValueScanner getScanner() {
|
||||
return this.scanner;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if tags are present in this snapshot
|
||||
*/
|
||||
public boolean isTagsPresent() {
|
||||
return this.tagsPresent;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,6 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
|
||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
|
@ -5821,6 +5820,25 @@ public class TestHRegion {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushedFileWithNoTags() throws Exception {
|
||||
TableName tableName = TableName.valueOf(getClass().getSimpleName());
|
||||
HTableDescriptor htd = new HTableDescriptor(tableName);
|
||||
htd.addFamily(new HColumnDescriptor(fam1));
|
||||
HRegionInfo info = new HRegionInfo(tableName, null, null, false);
|
||||
Path path = TEST_UTIL.getDataTestDir(getClass().getSimpleName());
|
||||
region = HBaseTestingUtility.createRegionAndWAL(info, path, TEST_UTIL.getConfiguration(), htd);
|
||||
Put put = new Put(Bytes.toBytes("a-b-0-0"));
|
||||
put.addColumn(fam1, qual1, Bytes.toBytes("c1-value"));
|
||||
region.put(put);
|
||||
region.flush(true);
|
||||
Store store = region.getStore(fam1);
|
||||
Collection<StoreFile> storefiles = store.getStorefiles();
|
||||
for (StoreFile sf : storefiles) {
|
||||
assertFalse("Tags should not be present "
|
||||
,sf.getReader().getHFileReader().getFileContext().isIncludesTags());
|
||||
}
|
||||
}
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testOpenRegionWrittenToWALForLogReplay() throws Exception {
|
||||
|
|
|
@ -30,6 +30,7 @@ import static org.junit.Assert.fail;
|
|||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.CellScanner;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
|
@ -57,8 +59,11 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResul
|
|||
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
|
@ -806,6 +811,39 @@ public abstract class TestVisibilityLabels {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushedFileWithVisibilityTags() throws Exception {
|
||||
final byte[] qual2 = Bytes.toBytes("qual2");
|
||||
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
|
||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||
HColumnDescriptor col = new HColumnDescriptor(fam);
|
||||
desc.addFamily(col);
|
||||
TEST_UTIL.getHBaseAdmin().createTable(desc);
|
||||
try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
|
||||
Put p1 = new Put(row1);
|
||||
p1.add(fam, qual, value);
|
||||
p1.setCellVisibility(new CellVisibility(CONFIDENTIAL));
|
||||
|
||||
Put p2 = new Put(row1);
|
||||
p2.add(fam, qual2, value);
|
||||
p2.setCellVisibility(new CellVisibility(SECRET));
|
||||
|
||||
RowMutations rm = new RowMutations(row1);
|
||||
rm.add(p1);
|
||||
rm.add(p2);
|
||||
|
||||
table.mutateRow(rm);
|
||||
}
|
||||
TEST_UTIL.getHBaseAdmin().flush(tableName);
|
||||
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
|
||||
Store store = regions.get(0).getStore(fam);
|
||||
Collection<StoreFile> storefiles = store.getStorefiles();
|
||||
assertTrue(storefiles.size() > 0);
|
||||
for (StoreFile storeFile : storefiles) {
|
||||
assertTrue(storeFile.getReader().getHFileReader().getFileContext().isIncludesTags());
|
||||
}
|
||||
}
|
||||
|
||||
static Table createTableAndWriteDataWithLabels(TableName tableName, String... labelExps)
|
||||
throws Exception {
|
||||
List<Put> puts = new ArrayList<Put>();
|
||||
|
|
Loading…
Reference in New Issue