HBASE-10560 Per cell TTLs
This commit is contained in:
parent
f83e25e180
commit
09cd3d7bfb
|
@ -171,4 +171,9 @@ public class Append extends Mutation {
|
||||||
public Append setACL(Map<String, Permission> perms) {
|
public Append setACL(Map<String, Permission> perms) {
|
||||||
return (Append) super.setACL(perms);
|
return (Append) super.setACL(perms);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Append setTTL(long ttl) {
|
||||||
|
return (Append) super.setTTL(ttl);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -461,4 +461,9 @@ public class Delete extends Mutation implements Comparable<Row> {
|
||||||
public Delete setACL(Map<String, Permission> perms) {
|
public Delete setACL(Map<String, Permission> perms) {
|
||||||
return (Delete) super.setACL(perms);
|
return (Delete) super.setACL(perms);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Delete setTTL(long ttl) {
|
||||||
|
throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -317,4 +317,9 @@ public class Increment extends Mutation implements Comparable<Row> {
|
||||||
public Increment setACL(Map<String, Permission> perms) {
|
public Increment setACL(Map<String, Permission> perms) {
|
||||||
return (Increment) super.setACL(perms);
|
return (Increment) super.setACL(perms);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Increment setTTL(long ttl) {
|
||||||
|
return (Increment) super.setTTL(ttl);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
|
||||||
*/
|
*/
|
||||||
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
|
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The attribute for storing TTL for the result of the mutation.
|
||||||
|
*/
|
||||||
|
private static final String OP_ATTRIBUTE_TTL = "_ttl";
|
||||||
|
|
||||||
protected byte [] row = null;
|
protected byte [] row = null;
|
||||||
protected long ts = HConstants.LATEST_TIMESTAMP;
|
protected long ts = HConstants.LATEST_TIMESTAMP;
|
||||||
protected Durability durability = Durability.USE_DEFAULT;
|
protected Durability durability = Durability.USE_DEFAULT;
|
||||||
|
@ -199,6 +204,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
|
||||||
if (getId() != null) {
|
if (getId() != null) {
|
||||||
map.put("id", getId());
|
map.put("id", getId());
|
||||||
}
|
}
|
||||||
|
// Add the TTL if set
|
||||||
|
// Long.MAX_VALUE is the default, and is interpreted to mean this attribute
|
||||||
|
// has not been set.
|
||||||
|
if (getTTL() != Long.MAX_VALUE) {
|
||||||
|
map.put("ttl", getTTL());
|
||||||
|
}
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,6 +427,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the TTL requested for the result of the mutation, in milliseconds.
|
||||||
|
* @return the TTL requested for the result of the mutation, in milliseconds,
|
||||||
|
* or Long.MAX_VALUE if unset
|
||||||
|
*/
|
||||||
|
public long getTTL() {
|
||||||
|
byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
|
||||||
|
if (ttlBytes != null) {
|
||||||
|
return Bytes.toLong(ttlBytes);
|
||||||
|
}
|
||||||
|
return Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the TTL desired for the result of the mutation, in milliseconds.
|
||||||
|
* @param ttl the TTL desired for the result of the mutation, in milliseconds
|
||||||
|
* @return this
|
||||||
|
*/
|
||||||
|
public Mutation setTTL(long ttl) {
|
||||||
|
setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Subclasses should override this method to add the heap size of their own fields.
|
* Subclasses should override this method to add the heap size of their own fields.
|
||||||
* @return the heap size to add (will be aligned).
|
* @return the heap size to add (will be aligned).
|
||||||
|
|
|
@ -460,4 +460,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
|
||||||
public Put setACL(Map<String, Permission> perms) {
|
public Put setACL(Map<String, Permission> perms) {
|
||||||
return (Put) super.setACL(perms);
|
return (Put) super.setACL(perms);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Put setTTL(long ttl) {
|
||||||
|
return (Put) super.setTTL(ttl);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,4 +30,5 @@ public final class TagType {
|
||||||
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
|
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
|
||||||
// String based tag type used in replication
|
// String based tag type used in replication
|
||||||
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
|
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
|
||||||
|
public static final byte TTL_TAG_TYPE = (byte)8;
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class CellCreator {
|
||||||
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
|
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
|
||||||
int vlength) throws IOException {
|
int vlength) throws IOException {
|
||||||
return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
|
return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
|
||||||
timestamp, value, voffset, vlength, null);
|
timestamp, value, voffset, vlength, (List<Tag>)null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -90,6 +90,7 @@ public class CellCreator {
|
||||||
* @return created Cell
|
* @return created Cell
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
|
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
|
||||||
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
|
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
|
||||||
int vlength, String visExpression) throws IOException {
|
int vlength, String visExpression) throws IOException {
|
||||||
|
@ -100,4 +101,36 @@ public class CellCreator {
|
||||||
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
|
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
|
||||||
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
|
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param row row key
|
||||||
|
* @param roffset row offset
|
||||||
|
* @param rlength row length
|
||||||
|
* @param family family name
|
||||||
|
* @param foffset family offset
|
||||||
|
* @param flength family length
|
||||||
|
* @param qualifier column qualifier
|
||||||
|
* @param qoffset qualifier offset
|
||||||
|
* @param qlength qualifier length
|
||||||
|
* @param timestamp version timestamp
|
||||||
|
* @param value column value
|
||||||
|
* @param voffset value offset
|
||||||
|
* @param vlength value length
|
||||||
|
* @param tags
|
||||||
|
* @return created Cell
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
|
||||||
|
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
|
||||||
|
int vlength, List<Tag> tags) throws IOException {
|
||||||
|
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
|
||||||
|
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Visibility expression resolver
|
||||||
|
*/
|
||||||
|
public VisibilityExpressionResolver getVisibilityExpressionResolver() {
|
||||||
|
return this.visExpResolver;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,13 +125,20 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
|
|
||||||
public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
|
public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
|
||||||
|
|
||||||
|
public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
|
||||||
|
|
||||||
private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
|
private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
|
||||||
|
|
||||||
public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
|
public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
|
||||||
|
|
||||||
public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
|
public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
|
||||||
|
|
||||||
|
public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
|
||||||
|
|
||||||
private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
|
private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
|
||||||
|
|
||||||
|
private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param columnsSpecification the list of columns to parser out, comma separated.
|
* @param columnsSpecification the list of columns to parser out, comma separated.
|
||||||
* The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
|
* The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
|
||||||
|
@ -162,14 +169,18 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
timestampKeyColumnIndex = i;
|
timestampKeyColumnIndex = i;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
|
if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
|
||||||
attrKeyColumnIndex = i;
|
attrKeyColumnIndex = i;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
|
if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
|
||||||
cellVisibilityColumnIndex = i;
|
cellVisibilityColumnIndex = i;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if (CELL_TTL_COLUMN_SPEC.equals(str)) {
|
||||||
|
cellTTLColumnIndex = i;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
String[] parts = str.split(":", 2);
|
String[] parts = str.split(":", 2);
|
||||||
if (parts.length == 1) {
|
if (parts.length == 1) {
|
||||||
families[i] = str.getBytes();
|
families[i] = str.getBytes();
|
||||||
|
@ -197,6 +208,10 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
|
return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean hasCellTTL() {
|
||||||
|
return cellTTLColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
|
||||||
|
}
|
||||||
|
|
||||||
public int getAttributesKeyColumnIndex() {
|
public int getAttributesKeyColumnIndex() {
|
||||||
return attrKeyColumnIndex;
|
return attrKeyColumnIndex;
|
||||||
}
|
}
|
||||||
|
@ -204,9 +219,15 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
public int getCellVisibilityColumnIndex() {
|
public int getCellVisibilityColumnIndex() {
|
||||||
return cellVisibilityColumnIndex;
|
return cellVisibilityColumnIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getCellTTLColumnIndex() {
|
||||||
|
return cellTTLColumnIndex;
|
||||||
|
}
|
||||||
|
|
||||||
public int getRowKeyColumnIndex() {
|
public int getRowKeyColumnIndex() {
|
||||||
return rowKeyColumnIndex;
|
return rowKeyColumnIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
public byte[] getFamily(int idx) {
|
public byte[] getFamily(int idx) {
|
||||||
return families[idx];
|
return families[idx];
|
||||||
}
|
}
|
||||||
|
@ -238,8 +259,10 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
throw new BadTsvLineException("No timestamp");
|
throw new BadTsvLineException("No timestamp");
|
||||||
} else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
|
} else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
|
||||||
throw new BadTsvLineException("No attributes specified");
|
throw new BadTsvLineException("No attributes specified");
|
||||||
} else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
|
} else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
|
||||||
throw new BadTsvLineException("No cell visibility specified");
|
throw new BadTsvLineException("No cell visibility specified");
|
||||||
|
} else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
|
||||||
|
throw new BadTsvLineException("No cell TTL specified");
|
||||||
}
|
}
|
||||||
return new ParsedLine(tabOffsets, lineBytes);
|
return new ParsedLine(tabOffsets, lineBytes);
|
||||||
}
|
}
|
||||||
|
@ -336,6 +359,31 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getCellTTLColumnOffset() {
|
||||||
|
if (hasCellTTL()) {
|
||||||
|
return getColumnOffset(cellTTLColumnIndex);
|
||||||
|
} else {
|
||||||
|
return DEFAULT_CELL_TTL_COLUMN_INDEX;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCellTTLColumnLength() {
|
||||||
|
if (hasCellTTL()) {
|
||||||
|
return getColumnLength(cellTTLColumnIndex);
|
||||||
|
} else {
|
||||||
|
return DEFAULT_CELL_TTL_COLUMN_INDEX;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCellTTL() {
|
||||||
|
if (!hasCellTTL()) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
|
||||||
|
getColumnLength(cellTTLColumnIndex));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public int getColumnOffset(int idx) {
|
public int getColumnOffset(int idx) {
|
||||||
if (idx > 0)
|
if (idx > 0)
|
||||||
return tabOffsets.get(idx - 1) + 1;
|
return tabOffsets.get(idx - 1) + 1;
|
||||||
|
@ -535,6 +583,7 @@ public class ImportTsv extends Configured implements Tool {
|
||||||
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|
||||||
|| TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
|
|| TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
|
||||||
|| TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
|
|| TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
|
||||||
|
|| TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
|
||||||
|| TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
|
|| TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
|
||||||
continue;
|
continue;
|
||||||
// we are only concerned with the first one (in case this is a cf:cq)
|
// we are only concerned with the first one (in case this is a cf:cq)
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
|
import org.apache.hadoop.hbase.TagType;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.util.Base64;
|
import org.apache.hadoop.hbase.util.Base64;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
import org.apache.hadoop.mapreduce.Reducer;
|
import org.apache.hadoop.mapreduce.Reducer;
|
||||||
|
@ -62,6 +67,9 @@ public class TextSortReducer extends
|
||||||
/** Cell visibility expr **/
|
/** Cell visibility expr **/
|
||||||
private String cellVisibilityExpr;
|
private String cellVisibilityExpr;
|
||||||
|
|
||||||
|
/** Cell TTL */
|
||||||
|
private long ttl;
|
||||||
|
|
||||||
private CellCreator kvCreator;
|
private CellCreator kvCreator;
|
||||||
|
|
||||||
public long getTs() {
|
public long getTs() {
|
||||||
|
@ -148,18 +156,30 @@ public class TextSortReducer extends
|
||||||
// Retrieve timestamp if exists
|
// Retrieve timestamp if exists
|
||||||
ts = parsed.getTimestamp(ts);
|
ts = parsed.getTimestamp(ts);
|
||||||
cellVisibilityExpr = parsed.getCellVisibility();
|
cellVisibilityExpr = parsed.getCellVisibility();
|
||||||
|
ttl = parsed.getCellTTL();
|
||||||
|
|
||||||
for (int i = 0; i < parsed.getColumnCount(); i++) {
|
for (int i = 0; i < parsed.getColumnCount(); i++) {
|
||||||
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|
||||||
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
|
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
|
||||||
|
|| i == parser.getCellTTLColumnIndex()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// Creating the KV which needs to be directly written to HFiles. Using the Facade
|
// Creating the KV which needs to be directly written to HFiles. Using the Facade
|
||||||
// KVCreator for creation of kvs.
|
// KVCreator for creation of kvs.
|
||||||
|
List<Tag> tags = new ArrayList<Tag>();
|
||||||
|
if (cellVisibilityExpr != null) {
|
||||||
|
tags.addAll(kvCreator.getVisibilityExpressionResolver()
|
||||||
|
.createVisibilityExpTags(cellVisibilityExpr));
|
||||||
|
}
|
||||||
|
// Add TTL directly to the KV so we can vary them when packing more than one KV
|
||||||
|
// into puts
|
||||||
|
if (ttl > 0) {
|
||||||
|
tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
|
||||||
|
}
|
||||||
Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
|
Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
|
||||||
parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
|
parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
|
||||||
parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
|
parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
|
||||||
parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
|
parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
|
||||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||||
kvs.add(kv);
|
kvs.add(kv);
|
||||||
curSize += kv.heapSize();
|
curSize += kv.heapSize();
|
||||||
|
|
|
@ -18,17 +18,22 @@
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
|
import org.apache.hadoop.hbase.TagType;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
||||||
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
|
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
|
||||||
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
|
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
|
||||||
import org.apache.hadoop.hbase.util.Base64;
|
import org.apache.hadoop.hbase.util.Base64;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Counter;
|
import org.apache.hadoop.mapreduce.Counter;
|
||||||
|
@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
|
||||||
|
|
||||||
protected String cellVisibilityExpr;
|
protected String cellVisibilityExpr;
|
||||||
|
|
||||||
|
protected long ttl;
|
||||||
|
|
||||||
protected CellCreator kvCreator;
|
protected CellCreator kvCreator;
|
||||||
|
|
||||||
private String hfileOutPath;
|
private String hfileOutPath;
|
||||||
|
@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
|
||||||
// Retrieve timestamp if exists
|
// Retrieve timestamp if exists
|
||||||
ts = parsed.getTimestamp(ts);
|
ts = parsed.getTimestamp(ts);
|
||||||
cellVisibilityExpr = parsed.getCellVisibility();
|
cellVisibilityExpr = parsed.getCellVisibility();
|
||||||
|
ttl = parsed.getCellTTL();
|
||||||
|
|
||||||
Put put = new Put(rowKey.copyBytes());
|
Put put = new Put(rowKey.copyBytes());
|
||||||
for (int i = 0; i < parsed.getColumnCount(); i++) {
|
for (int i = 0; i < parsed.getColumnCount(); i++) {
|
||||||
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|
||||||
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
|
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
|
||||||
|
|| i == parser.getCellTTLColumnIndex()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
populatePut(lineBytes, parsed, put, i);
|
populatePut(lineBytes, parsed, put, i);
|
||||||
|
@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
|
||||||
// the validation
|
// the validation
|
||||||
put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
|
put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
|
||||||
}
|
}
|
||||||
|
if (ttl > 0) {
|
||||||
|
put.setTTL(ttl);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Creating the KV which needs to be directly written to HFiles. Using the Facade
|
// Creating the KV which needs to be directly written to HFiles. Using the Facade
|
||||||
// KVCreator for creation of kvs.
|
// KVCreator for creation of kvs.
|
||||||
|
List<Tag> tags = new ArrayList<Tag>();
|
||||||
|
if (cellVisibilityExpr != null) {
|
||||||
|
tags.addAll(kvCreator.getVisibilityExpressionResolver()
|
||||||
|
.createVisibilityExpTags(cellVisibilityExpr));
|
||||||
|
}
|
||||||
|
// Add TTL directly to the KV so we can vary them when packing more than one KV
|
||||||
|
// into puts
|
||||||
|
if (ttl > 0) {
|
||||||
|
tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
|
||||||
|
}
|
||||||
cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
|
cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
|
||||||
parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
|
parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
|
||||||
parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
|
parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
|
||||||
parsed.getColumnLength(i), cellVisibilityExpr);
|
parsed.getColumnLength(i), tags);
|
||||||
}
|
}
|
||||||
put.add(cell);
|
put.add(cell);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
|
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
|
||||||
* Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
|
* Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
|
||||||
* implement the {@link DeleteTracker} interface since state spans rows (There
|
* implement the {@link DeleteTracker} interface since state spans rows (There
|
||||||
* is no update nor reset method).
|
* is no update nor reset method).
|
||||||
*/
|
*/
|
||||||
|
@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
class GetClosestRowBeforeTracker {
|
class GetClosestRowBeforeTracker {
|
||||||
private final KeyValue targetkey;
|
private final KeyValue targetkey;
|
||||||
// Any cell w/ a ts older than this is expired.
|
// Any cell w/ a ts older than this is expired.
|
||||||
private final long oldestts;
|
private final long now;
|
||||||
|
private final long oldestUnexpiredTs;
|
||||||
private Cell candidate = null;
|
private Cell candidate = null;
|
||||||
private final KVComparator kvcomparator;
|
private final KVComparator kvcomparator;
|
||||||
// Flag for whether we're doing getclosest on a metaregion.
|
// Flag for whether we're doing getclosest on a metaregion.
|
||||||
|
@ -75,19 +76,12 @@ class GetClosestRowBeforeTracker {
|
||||||
HConstants.DELIMITER) - this.rowoffset;
|
HConstants.DELIMITER) - this.rowoffset;
|
||||||
}
|
}
|
||||||
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
|
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
|
||||||
this.oldestts = System.currentTimeMillis() - ttl;
|
this.now = System.currentTimeMillis();
|
||||||
|
this.oldestUnexpiredTs = now - ttl;
|
||||||
this.kvcomparator = c;
|
this.kvcomparator = c;
|
||||||
this.deletes = new TreeMap<Cell, NavigableSet<Cell>>(new CellComparator.RowComparator());
|
this.deletes = new TreeMap<Cell, NavigableSet<Cell>>(new CellComparator.RowComparator());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param kv
|
|
||||||
* @return True if this <code>kv</code> is expired.
|
|
||||||
*/
|
|
||||||
boolean isExpired(final Cell kv) {
|
|
||||||
return HStore.isExpired(kv, this.oldestts);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Add the specified KeyValue to the list of deletes.
|
* Add the specified KeyValue to the list of deletes.
|
||||||
* @param kv
|
* @param kv
|
||||||
|
@ -172,6 +166,15 @@ class GetClosestRowBeforeTracker {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param cell
|
||||||
|
* @return true if the cell is expired
|
||||||
|
*/
|
||||||
|
public boolean isExpired(final Cell cell) {
|
||||||
|
return cell.getTimestamp() < this.oldestUnexpiredTs ||
|
||||||
|
HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Handle keys whose values hold deletes.
|
* Handle keys whose values hold deletes.
|
||||||
* Add to the set of deletes and then if the candidate keys contain any that
|
* Add to the set of deletes and then if the candidate keys contain any that
|
||||||
|
|
|
@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
|
import org.apache.hadoop.hbase.TagType;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
import org.apache.hadoop.hbase.backup.HFileArchiver;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
|
@ -2642,6 +2644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
|
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
|
||||||
noOfDeletes++;
|
noOfDeletes++;
|
||||||
}
|
}
|
||||||
|
rewriteCellTags(familyMaps[i], mutation);
|
||||||
}
|
}
|
||||||
|
|
||||||
lock(this.updatesLock.readLock(), numReadyToWrite);
|
lock(this.updatesLock.readLock(), numReadyToWrite);
|
||||||
|
@ -3116,6 +3119,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Possibly rewrite incoming cell tags.
|
||||||
|
*/
|
||||||
|
void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
|
||||||
|
// Check if we have any work to do and early out otherwise
|
||||||
|
// Update these checks as more logic is added here
|
||||||
|
|
||||||
|
if (m.getTTL() == Long.MAX_VALUE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// From this point we know we have some work to do
|
||||||
|
|
||||||
|
for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
|
||||||
|
List<Cell> cells = e.getValue();
|
||||||
|
assert cells instanceof RandomAccess;
|
||||||
|
int listSize = cells.size();
|
||||||
|
for (int i = 0; i < listSize; i++) {
|
||||||
|
Cell cell = cells.get(i);
|
||||||
|
List<Tag> newTags = new ArrayList<Tag>();
|
||||||
|
Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
|
||||||
|
cell.getTagsOffset(), cell.getTagsLength());
|
||||||
|
|
||||||
|
// Carry forward existing tags
|
||||||
|
|
||||||
|
while (tagIterator.hasNext()) {
|
||||||
|
|
||||||
|
// Add any filters or tag specific rewrites here
|
||||||
|
|
||||||
|
newTags.add(tagIterator.next());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cell TTL handling
|
||||||
|
|
||||||
|
// Check again if we need to add a cell TTL because early out logic
|
||||||
|
// above may change when there are more tag based features in core.
|
||||||
|
if (m.getTTL() != Long.MAX_VALUE) {
|
||||||
|
// Add a cell TTL tag
|
||||||
|
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rewrite the cell with the updated set of tags
|
||||||
|
|
||||||
|
cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
|
||||||
|
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
|
||||||
|
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
|
||||||
|
cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
|
||||||
|
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
|
||||||
|
newTags));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check if resources to support an update.
|
* Check if resources to support an update.
|
||||||
*
|
*
|
||||||
|
@ -5213,6 +5269,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
processor.preBatchMutate(this, walEdit);
|
processor.preBatchMutate(this, walEdit);
|
||||||
// 7. Apply to memstore
|
// 7. Apply to memstore
|
||||||
for (Mutation m : mutations) {
|
for (Mutation m : mutations) {
|
||||||
|
// Handle any tag based cell features
|
||||||
|
rewriteCellTags(m.getFamilyCellMap(), m);
|
||||||
|
|
||||||
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
|
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
|
||||||
Cell cell = cellScanner.current();
|
Cell cell = cellScanner.current();
|
||||||
CellUtil.setSequenceId(cell, mvccNum);
|
CellUtil.setSequenceId(cell, mvccNum);
|
||||||
|
@ -5351,8 +5410,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: There's a lot of boiler plate code identical
|
// TODO: There's a lot of boiler plate code identical to increment.
|
||||||
// to increment... See how to better unify that.
|
// We should refactor append and increment as local get-mutate-put
|
||||||
|
// transactions, so all stores only go through one code path for puts.
|
||||||
/**
|
/**
|
||||||
* Perform one or more append operations on a row.
|
* Perform one or more append operations on a row.
|
||||||
*
|
*
|
||||||
|
@ -5422,8 +5482,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
// Iterate the input columns and update existing values if they were
|
// Iterate the input columns and update existing values if they were
|
||||||
// found, otherwise add new column initialized to the append value
|
// found, otherwise add new column initialized to the append value
|
||||||
|
|
||||||
// Avoid as much copying as possible. Every byte is copied at most
|
// Avoid as much copying as possible. We may need to rewrite and
|
||||||
// once.
|
// consolidate tags. Bytes are only copied once.
|
||||||
// Would be nice if KeyValue had scatter/gather logic
|
// Would be nice if KeyValue had scatter/gather logic
|
||||||
int idx = 0;
|
int idx = 0;
|
||||||
for (Cell cell : family.getValue()) {
|
for (Cell cell : family.getValue()) {
|
||||||
|
@ -5433,40 +5493,87 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
&& CellUtil.matchingQualifier(results.get(idx), cell)) {
|
&& CellUtil.matchingQualifier(results.get(idx), cell)) {
|
||||||
oldCell = results.get(idx);
|
oldCell = results.get(idx);
|
||||||
long ts = Math.max(now, oldCell.getTimestamp());
|
long ts = Math.max(now, oldCell.getTimestamp());
|
||||||
// allocate an empty kv once
|
|
||||||
|
// Process cell tags
|
||||||
|
List<Tag> newTags = new ArrayList<Tag>();
|
||||||
|
|
||||||
|
// Make a union of the set of tags in the old and new KVs
|
||||||
|
|
||||||
|
if (oldCell.getTagsLength() > 0) {
|
||||||
|
Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
|
||||||
|
oldCell.getTagsOffset(), oldCell.getTagsLength());
|
||||||
|
while (i.hasNext()) {
|
||||||
|
newTags.add(i.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (cell.getTagsLength() > 0) {
|
||||||
|
Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
|
||||||
|
cell.getTagsLength());
|
||||||
|
while (i.hasNext()) {
|
||||||
|
newTags.add(i.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cell TTL handling
|
||||||
|
|
||||||
|
if (append.getTTL() != Long.MAX_VALUE) {
|
||||||
|
// Add the new TTL tag
|
||||||
|
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rebuild tags
|
||||||
|
byte[] tagBytes = Tag.fromList(newTags);
|
||||||
|
|
||||||
|
// allocate an empty cell once
|
||||||
newCell = new KeyValue(row.length, cell.getFamilyLength(),
|
newCell = new KeyValue(row.length, cell.getFamilyLength(),
|
||||||
cell.getQualifierLength(), ts, KeyValue.Type.Put,
|
cell.getQualifierLength(), ts, KeyValue.Type.Put,
|
||||||
oldCell.getValueLength() + cell.getValueLength(),
|
oldCell.getValueLength() + cell.getValueLength(),
|
||||||
oldCell.getTagsLength() + cell.getTagsLength());
|
tagBytes.length);
|
||||||
// copy in the value
|
|
||||||
System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
|
|
||||||
newCell.getValueArray(), newCell.getValueOffset(),
|
|
||||||
oldCell.getValueLength());
|
|
||||||
System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
|
|
||||||
newCell.getValueArray(),
|
|
||||||
newCell.getValueOffset() + oldCell.getValueLength(),
|
|
||||||
cell.getValueLength());
|
|
||||||
// copy in the tags
|
|
||||||
System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
|
|
||||||
newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
|
|
||||||
System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
|
|
||||||
newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
|
|
||||||
// copy in row, family, and qualifier
|
// copy in row, family, and qualifier
|
||||||
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
|
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
|
||||||
newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
|
newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
|
||||||
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
|
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
|
||||||
newCell.getFamilyArray(), newCell.getFamilyOffset(),
|
newCell.getFamilyArray(), newCell.getFamilyOffset(),
|
||||||
cell.getFamilyLength());
|
cell.getFamilyLength());
|
||||||
System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
|
System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
|
||||||
newCell.getQualifierArray(), newCell.getQualifierOffset(),
|
newCell.getQualifierArray(), newCell.getQualifierOffset(),
|
||||||
cell.getQualifierLength());
|
cell.getQualifierLength());
|
||||||
|
// copy in the value
|
||||||
|
System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
|
||||||
|
newCell.getValueArray(), newCell.getValueOffset(),
|
||||||
|
oldCell.getValueLength());
|
||||||
|
System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
|
||||||
|
newCell.getValueArray(),
|
||||||
|
newCell.getValueOffset() + oldCell.getValueLength(),
|
||||||
|
cell.getValueLength());
|
||||||
|
// Copy in tag data
|
||||||
|
System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
|
||||||
|
tagBytes.length);
|
||||||
idx++;
|
idx++;
|
||||||
} else {
|
} else {
|
||||||
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
|
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
|
||||||
// so only need to update the timestamp to 'now'
|
|
||||||
CellUtil.updateLatestStamp(cell, now);
|
CellUtil.updateLatestStamp(cell, now);
|
||||||
newCell = cell;
|
|
||||||
}
|
// Cell TTL handling
|
||||||
|
|
||||||
|
if (append.getTTL() != Long.MAX_VALUE) {
|
||||||
|
List<Tag> newTags = new ArrayList<Tag>(1);
|
||||||
|
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
|
||||||
|
// Add the new TTL tag
|
||||||
|
newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
|
||||||
|
cell.getRowLength(),
|
||||||
|
cell.getFamilyArray(), cell.getFamilyOffset(),
|
||||||
|
cell.getFamilyLength(),
|
||||||
|
cell.getQualifierArray(), cell.getQualifierOffset(),
|
||||||
|
cell.getQualifierLength(),
|
||||||
|
cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
|
||||||
|
cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
|
||||||
|
newTags);
|
||||||
|
} else {
|
||||||
|
newCell = cell;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
CellUtil.setSequenceId(newCell, mvccNum);
|
CellUtil.setSequenceId(newCell, mvccNum);
|
||||||
// Give coprocessors a chance to update the new cell
|
// Give coprocessors a chance to update the new cell
|
||||||
if (coprocessorHost != null) {
|
if (coprocessorHost != null) {
|
||||||
|
@ -5570,6 +5677,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: There's a lot of boiler plate code identical to append.
|
||||||
|
// We should refactor append and increment as local get-mutate-put
|
||||||
|
// transactions, so all stores only go through one code path for puts.
|
||||||
/**
|
/**
|
||||||
* Perform one or more increment operations on a row.
|
* Perform one or more increment operations on a row.
|
||||||
* @return new keyvalues after increment
|
* @return new keyvalues after increment
|
||||||
|
@ -5642,13 +5752,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
// Iterate the input columns and update existing values if they were
|
// Iterate the input columns and update existing values if they were
|
||||||
// found, otherwise add new column initialized to the increment amount
|
// found, otherwise add new column initialized to the increment amount
|
||||||
int idx = 0;
|
int idx = 0;
|
||||||
for (Cell kv: family.getValue()) {
|
for (Cell cell: family.getValue()) {
|
||||||
long amount = Bytes.toLong(CellUtil.cloneValue(kv));
|
long amount = Bytes.toLong(CellUtil.cloneValue(cell));
|
||||||
boolean noWriteBack = (amount == 0);
|
boolean noWriteBack = (amount == 0);
|
||||||
|
List<Tag> newTags = new ArrayList<Tag>();
|
||||||
|
|
||||||
|
// Carry forward any tags that might have been added by a coprocessor
|
||||||
|
if (cell.getTagsLength() > 0) {
|
||||||
|
Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
|
||||||
|
cell.getTagsOffset(), cell.getTagsLength());
|
||||||
|
while (i.hasNext()) {
|
||||||
|
newTags.add(i.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Cell c = null;
|
Cell c = null;
|
||||||
long ts = now;
|
long ts = now;
|
||||||
if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
|
if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
|
||||||
c = results.get(idx);
|
c = results.get(idx);
|
||||||
ts = Math.max(now, c.getTimestamp());
|
ts = Math.max(now, c.getTimestamp());
|
||||||
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
|
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
|
||||||
|
@ -5658,32 +5778,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
|
||||||
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
|
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
|
||||||
"Attempted to increment field that isn't 64 bits wide");
|
"Attempted to increment field that isn't 64 bits wide");
|
||||||
}
|
}
|
||||||
|
// Carry tags forward from previous version
|
||||||
|
if (c.getTagsLength() > 0) {
|
||||||
|
Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
|
||||||
|
c.getTagsOffset(), c.getTagsLength());
|
||||||
|
while (i.hasNext()) {
|
||||||
|
newTags.add(i.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
idx++;
|
idx++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append new incremented KeyValue to list
|
// Append new incremented KeyValue to list
|
||||||
byte[] q = CellUtil.cloneQualifier(kv);
|
byte[] q = CellUtil.cloneQualifier(cell);
|
||||||
byte[] val = Bytes.toBytes(amount);
|
byte[] val = Bytes.toBytes(amount);
|
||||||
int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
|
|
||||||
int incCellTagsLen = kv.getTagsLength();
|
// Add the TTL tag if the mutation carried one
|
||||||
Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
|
if (increment.getTTL() != Long.MAX_VALUE) {
|
||||||
KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
|
newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
|
||||||
System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
|
|
||||||
System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
|
|
||||||
family.getKey().length);
|
|
||||||
System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
|
|
||||||
// copy in the value
|
|
||||||
System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
|
|
||||||
// copy tags
|
|
||||||
if (oldCellTagsLen > 0) {
|
|
||||||
System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
|
|
||||||
newKV.getTagsOffset(), oldCellTagsLen);
|
|
||||||
}
|
|
||||||
if (incCellTagsLen > 0) {
|
|
||||||
System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
|
|
||||||
newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Cell newKV = new KeyValue(row, 0, row.length,
|
||||||
|
family.getKey(), 0, family.getKey().length,
|
||||||
|
q, 0, q.length,
|
||||||
|
ts,
|
||||||
|
KeyValue.Type.Put,
|
||||||
|
val, 0, val.length,
|
||||||
|
newTags);
|
||||||
|
|
||||||
CellUtil.setSequenceId(newKV, mvccNum);
|
CellUtil.setSequenceId(newKV, mvccNum);
|
||||||
|
|
||||||
// Give coprocessors a chance to update the new cell
|
// Give coprocessors a chance to update the new cell
|
||||||
if (coprocessorHost != null) {
|
if (coprocessorHost != null) {
|
||||||
newKV = coprocessorHost.postMutationBeforeWAL(
|
newKV = coprocessorHost.postMutationBeforeWAL(
|
||||||
|
|
|
@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
|
import org.apache.hadoop.hbase.TagType;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
import org.apache.hadoop.hbase.conf.ConfigurationManager;
|
||||||
|
@ -1674,8 +1676,38 @@ public class HStore implements Store {
|
||||||
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
|
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isExpired(final Cell key, final long oldestTimestamp) {
|
/**
|
||||||
return key.getTimestamp() < oldestTimestamp;
|
* @param cell
|
||||||
|
* @param oldestTimestamp
|
||||||
|
* @return true if the cell is expired
|
||||||
|
*/
|
||||||
|
static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
|
||||||
|
// Do not create an Iterator or Tag objects unless the cell actually has
|
||||||
|
// tags
|
||||||
|
if (cell.getTagsLength() > 0) {
|
||||||
|
// Look for a TTL tag first. Use it instead of the family setting if
|
||||||
|
// found. If a cell has multiple TTLs, resolve the conflict by using the
|
||||||
|
// first tag encountered.
|
||||||
|
Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
|
||||||
|
cell.getTagsLength());
|
||||||
|
while (i.hasNext()) {
|
||||||
|
Tag t = i.next();
|
||||||
|
if (TagType.TTL_TAG_TYPE == t.getType()) {
|
||||||
|
// Unlike in schema cell TTLs are stored in milliseconds, no need
|
||||||
|
// to convert
|
||||||
|
long ts = cell.getTimestamp();
|
||||||
|
assert t.getTagLength() == Bytes.SIZEOF_LONG;
|
||||||
|
long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
|
||||||
|
if (ts + ttl < now) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
// Per cell TTLs cannot extend lifetime beyond family settings, so
|
||||||
|
// fall through to check that
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -102,6 +102,10 @@ public class ScanQueryMatcher {
|
||||||
private final long earliestPutTs;
|
private final long earliestPutTs;
|
||||||
private final long ttl;
|
private final long ttl;
|
||||||
|
|
||||||
|
/** The oldest timestamp we are interested in, based on TTL */
|
||||||
|
private final long oldestUnexpiredTS;
|
||||||
|
private final long now;
|
||||||
|
|
||||||
/** readPoint over which the KVs are unconditionally included */
|
/** readPoint over which the KVs are unconditionally included */
|
||||||
protected long maxReadPointToTrackVersions;
|
protected long maxReadPointToTrackVersions;
|
||||||
|
|
||||||
|
@ -154,7 +158,7 @@ public class ScanQueryMatcher {
|
||||||
*/
|
*/
|
||||||
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
|
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
|
||||||
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
|
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
|
||||||
RegionCoprocessorHost regionCoprocessorHost) throws IOException {
|
long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
|
||||||
this.tr = scan.getTimeRange();
|
this.tr = scan.getTimeRange();
|
||||||
this.rowComparator = scanInfo.getComparator();
|
this.rowComparator = scanInfo.getComparator();
|
||||||
this.regionCoprocessorHost = regionCoprocessorHost;
|
this.regionCoprocessorHost = regionCoprocessorHost;
|
||||||
|
@ -164,6 +168,9 @@ public class ScanQueryMatcher {
|
||||||
scanInfo.getFamily());
|
scanInfo.getFamily());
|
||||||
this.filter = scan.getFilter();
|
this.filter = scan.getFilter();
|
||||||
this.earliestPutTs = earliestPutTs;
|
this.earliestPutTs = earliestPutTs;
|
||||||
|
this.oldestUnexpiredTS = oldestUnexpiredTS;
|
||||||
|
this.now = now;
|
||||||
|
|
||||||
this.maxReadPointToTrackVersions = readPointToUse;
|
this.maxReadPointToTrackVersions = readPointToUse;
|
||||||
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
|
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
|
||||||
this.ttl = oldestUnexpiredTS;
|
this.ttl = oldestUnexpiredTS;
|
||||||
|
@ -218,18 +225,18 @@ public class ScanQueryMatcher {
|
||||||
* @param scanInfo The store's immutable scan info
|
* @param scanInfo The store's immutable scan info
|
||||||
* @param columns
|
* @param columns
|
||||||
* @param earliestPutTs Earliest put seen in any of the store files.
|
* @param earliestPutTs Earliest put seen in any of the store files.
|
||||||
* @param oldestUnexpiredTS the oldest timestamp we are interested in,
|
* @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
|
||||||
* based on TTL
|
* @param now the current server time
|
||||||
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
|
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
|
||||||
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
|
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
|
||||||
* @param regionCoprocessorHost
|
* @param regionCoprocessorHost
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
|
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
|
||||||
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
|
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
|
||||||
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
|
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
|
||||||
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
|
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
|
||||||
oldestUnexpiredTS, regionCoprocessorHost);
|
oldestUnexpiredTS, now, regionCoprocessorHost);
|
||||||
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
|
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
|
||||||
this.dropDeletesFromRow = dropDeletesFromRow;
|
this.dropDeletesFromRow = dropDeletesFromRow;
|
||||||
this.dropDeletesToRow = dropDeletesToRow;
|
this.dropDeletesToRow = dropDeletesToRow;
|
||||||
|
@ -239,10 +246,10 @@ public class ScanQueryMatcher {
|
||||||
* Constructor for tests
|
* Constructor for tests
|
||||||
*/
|
*/
|
||||||
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
|
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
|
||||||
NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
|
NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
|
||||||
this(scan, scanInfo, columns, ScanType.USER_SCAN,
|
this(scan, scanInfo, columns, ScanType.USER_SCAN,
|
||||||
Long.MAX_VALUE, /* max Readpoint to track versions */
|
Long.MAX_VALUE, /* max Readpoint to track versions */
|
||||||
HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
|
HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -300,12 +307,17 @@ public class ScanQueryMatcher {
|
||||||
|
|
||||||
int qualifierOffset = cell.getQualifierOffset();
|
int qualifierOffset = cell.getQualifierOffset();
|
||||||
int qualifierLength = cell.getQualifierLength();
|
int qualifierLength = cell.getQualifierLength();
|
||||||
|
|
||||||
long timestamp = cell.getTimestamp();
|
long timestamp = cell.getTimestamp();
|
||||||
// check for early out based on timestamp alone
|
// check for early out based on timestamp alone
|
||||||
if (columns.isDone(timestamp)) {
|
if (columns.isDone(timestamp)) {
|
||||||
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
|
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
|
||||||
qualifierLength);
|
qualifierLength);
|
||||||
}
|
}
|
||||||
|
// check if the cell is expired by cell TTL
|
||||||
|
if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
|
||||||
|
return MatchCode.SKIP;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The delete logic is pretty complicated now.
|
* The delete logic is pretty complicated now.
|
||||||
|
|
|
@ -77,6 +77,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
protected final Scan scan;
|
protected final Scan scan;
|
||||||
protected final NavigableSet<byte[]> columns;
|
protected final NavigableSet<byte[]> columns;
|
||||||
protected final long oldestUnexpiredTS;
|
protected final long oldestUnexpiredTS;
|
||||||
|
protected final long now;
|
||||||
protected final int minVersions;
|
protected final int minVersions;
|
||||||
protected final long maxRowSize;
|
protected final long maxRowSize;
|
||||||
|
|
||||||
|
@ -123,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
explicitColumnQuery = numCol > 0;
|
explicitColumnQuery = numCol > 0;
|
||||||
this.scan = scan;
|
this.scan = scan;
|
||||||
this.columns = columns;
|
this.columns = columns;
|
||||||
oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
|
this.now = EnvironmentEdgeManager.currentTime();
|
||||||
|
this.oldestUnexpiredTS = now - ttl;
|
||||||
this.minVersions = minVersions;
|
this.minVersions = minVersions;
|
||||||
|
|
||||||
if (store != null && ((HStore)store).getHRegion() != null
|
if (store != null && ((HStore)store).getHRegion() != null
|
||||||
|
@ -176,7 +178,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
}
|
}
|
||||||
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
|
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
|
||||||
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
|
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
|
||||||
oldestUnexpiredTS, store.getCoprocessorHost());
|
oldestUnexpiredTS, now, store.getCoprocessorHost());
|
||||||
|
|
||||||
this.store.addChangedReaderObserver(this);
|
this.store.addChangedReaderObserver(this);
|
||||||
|
|
||||||
|
@ -241,10 +243,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
|
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
|
||||||
if (dropDeletesFromRow == null) {
|
if (dropDeletesFromRow == null) {
|
||||||
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
|
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
|
||||||
earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
|
earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
|
||||||
} else {
|
} else {
|
||||||
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
|
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
|
||||||
oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
|
oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
|
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
|
||||||
|
@ -284,7 +286,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
|
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
|
||||||
scanInfo.getMinVersions(), readPt);
|
scanInfo.getMinVersions(), readPt);
|
||||||
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
|
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
|
||||||
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
|
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
|
||||||
|
|
||||||
// In unit tests, the store could be null
|
// In unit tests, the store could be null
|
||||||
if (this.store != null) {
|
if (this.store != null) {
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
|
||||||
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.util.Tool;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({MapReduceTests.class, LargeTests.class})
|
||||||
|
public class TestImportTSVWithTTLs implements Configurable {
|
||||||
|
|
||||||
|
protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
|
||||||
|
protected static final String NAME = TestImportTsv.class.getSimpleName();
|
||||||
|
protected static HBaseTestingUtility util = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete the tmp directory after running doMROnTableTest. Boolean. Default is
|
||||||
|
* false.
|
||||||
|
*/
|
||||||
|
protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Force use of combiner in doMROnTableTest. Boolean. Default is true.
|
||||||
|
*/
|
||||||
|
protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
|
||||||
|
|
||||||
|
private final String FAMILY = "FAM";
|
||||||
|
private static Configuration conf;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Configuration getConf() {
|
||||||
|
return util.getConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setConf(Configuration conf) {
|
||||||
|
throw new IllegalArgumentException("setConf not supported");
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void provisionCluster() throws Exception {
|
||||||
|
conf = util.getConfiguration();
|
||||||
|
// We don't check persistence in HFiles in this test, but if we ever do we will
|
||||||
|
// need this where the default hfile version is not 3 (i.e. 0.98)
|
||||||
|
conf.setInt("hfile.format.version", 3);
|
||||||
|
conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
|
||||||
|
util.startMiniCluster();
|
||||||
|
util.startMiniMapReduceCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void releaseCluster() throws Exception {
|
||||||
|
util.shutdownMiniMapReduceCluster();
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMROnTable() throws Exception {
|
||||||
|
String tableName = "test-" + UUID.randomUUID();
|
||||||
|
|
||||||
|
// Prepare the arguments required for the test.
|
||||||
|
String[] args = new String[] {
|
||||||
|
"-D" + ImportTsv.MAPPER_CONF_KEY
|
||||||
|
+ "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
|
||||||
|
"-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
|
||||||
|
"-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
|
||||||
|
String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
|
||||||
|
util.createTable(TableName.valueOf(tableName), FAMILY);
|
||||||
|
doMROnTableTest(util, FAMILY, data, args, 1);
|
||||||
|
util.deleteTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
|
||||||
|
String[] args, int valueMultiplier) throws Exception {
|
||||||
|
TableName table = TableName.valueOf(args[args.length - 1]);
|
||||||
|
Configuration conf = new Configuration(util.getConfiguration());
|
||||||
|
|
||||||
|
// populate input file
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
Path inputPath = fs.makeQualified(new Path(util
|
||||||
|
.getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
|
||||||
|
FSDataOutputStream op = fs.create(inputPath, true);
|
||||||
|
op.write(Bytes.toBytes(data));
|
||||||
|
op.close();
|
||||||
|
LOG.debug(String.format("Wrote test data to file: %s", inputPath));
|
||||||
|
|
||||||
|
if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
|
||||||
|
LOG.debug("Forcing combiner.");
|
||||||
|
conf.setInt("mapreduce.map.combine.minspills", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the import
|
||||||
|
List<String> argv = new ArrayList<String>(Arrays.asList(args));
|
||||||
|
argv.add(inputPath.toString());
|
||||||
|
Tool tool = new ImportTsv();
|
||||||
|
LOG.debug("Running ImportTsv with arguments: " + argv);
|
||||||
|
try {
|
||||||
|
// Job will fail if observer rejects entries without TTL
|
||||||
|
assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
|
||||||
|
} finally {
|
||||||
|
// Clean up
|
||||||
|
if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
|
||||||
|
LOG.debug("Deleting test subdirectory");
|
||||||
|
util.cleanupDataTestDirOnTestFS(table.getNameAsString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tool;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TTLCheckingObserver extends BaseRegionObserver {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
|
||||||
|
Durability durability) throws IOException {
|
||||||
|
HRegion region = e.getEnvironment().getRegion();
|
||||||
|
if (!region.getRegionInfo().isMetaTable()
|
||||||
|
&& !region.getRegionInfo().getTable().isSystemTable()) {
|
||||||
|
// The put carries the TTL attribute
|
||||||
|
if (put.getTTL() != Long.MAX_VALUE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
throw new IOException("Operation does not have TTL set");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||||
|
import org.apache.hadoop.hbase.Tag;
|
||||||
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
|
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
|
||||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TagType;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
@ -111,6 +113,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
|
||||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||||
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
|
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
|
||||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||||
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
|
@ -140,6 +143,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||||
|
@ -5767,6 +5771,133 @@ public class TestHRegion {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCellTTLs() throws IOException {
|
||||||
|
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
|
||||||
|
EnvironmentEdgeManager.injectEdge(edge);
|
||||||
|
|
||||||
|
final byte[] row = Bytes.toBytes("testRow");
|
||||||
|
final byte[] q1 = Bytes.toBytes("q1");
|
||||||
|
final byte[] q2 = Bytes.toBytes("q2");
|
||||||
|
final byte[] q3 = Bytes.toBytes("q3");
|
||||||
|
final byte[] q4 = Bytes.toBytes("q4");
|
||||||
|
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
|
||||||
|
HColumnDescriptor hcd = new HColumnDescriptor(fam1);
|
||||||
|
hcd.setTimeToLive(10); // 10 seconds
|
||||||
|
htd.addFamily(hcd);
|
||||||
|
|
||||||
|
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
|
||||||
|
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
|
||||||
|
|
||||||
|
HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
|
||||||
|
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
|
||||||
|
TEST_UTIL.getDataTestDir(), conf, htd);
|
||||||
|
assertNotNull(region);
|
||||||
|
try {
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
// Add a cell that will expire in 5 seconds via cell TTL
|
||||||
|
region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
|
||||||
|
HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
|
||||||
|
// TTL tags specify ts in milliseconds
|
||||||
|
new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
|
||||||
|
// Add a cell that will expire after 10 seconds via family setting
|
||||||
|
region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
// Add a cell that will expire in 15 seconds via cell TTL
|
||||||
|
region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
|
||||||
|
HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
|
||||||
|
// TTL tags specify ts in milliseconds
|
||||||
|
new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
|
||||||
|
// Add a cell that will expire in 20 seconds via family setting
|
||||||
|
region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
|
||||||
|
// Flush so we are sure store scanning gets this right
|
||||||
|
region.flushcache();
|
||||||
|
|
||||||
|
// A query at time T+0 should return all cells
|
||||||
|
Result r = region.get(new Get(row));
|
||||||
|
assertNotNull(r.getValue(fam1, q1));
|
||||||
|
assertNotNull(r.getValue(fam1, q2));
|
||||||
|
assertNotNull(r.getValue(fam1, q3));
|
||||||
|
assertNotNull(r.getValue(fam1, q4));
|
||||||
|
|
||||||
|
// Increment time to T+5 seconds
|
||||||
|
edge.incrementTime(5000);
|
||||||
|
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
assertNull(r.getValue(fam1, q1));
|
||||||
|
assertNotNull(r.getValue(fam1, q2));
|
||||||
|
assertNotNull(r.getValue(fam1, q3));
|
||||||
|
assertNotNull(r.getValue(fam1, q4));
|
||||||
|
|
||||||
|
// Increment time to T+10 seconds
|
||||||
|
edge.incrementTime(5000);
|
||||||
|
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
assertNull(r.getValue(fam1, q1));
|
||||||
|
assertNull(r.getValue(fam1, q2));
|
||||||
|
assertNotNull(r.getValue(fam1, q3));
|
||||||
|
assertNotNull(r.getValue(fam1, q4));
|
||||||
|
|
||||||
|
// Increment time to T+15 seconds
|
||||||
|
edge.incrementTime(5000);
|
||||||
|
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
assertNull(r.getValue(fam1, q1));
|
||||||
|
assertNull(r.getValue(fam1, q2));
|
||||||
|
assertNull(r.getValue(fam1, q3));
|
||||||
|
assertNotNull(r.getValue(fam1, q4));
|
||||||
|
|
||||||
|
// Increment time to T+20 seconds
|
||||||
|
edge.incrementTime(10000);
|
||||||
|
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
assertNull(r.getValue(fam1, q1));
|
||||||
|
assertNull(r.getValue(fam1, q2));
|
||||||
|
assertNull(r.getValue(fam1, q3));
|
||||||
|
assertNull(r.getValue(fam1, q4));
|
||||||
|
|
||||||
|
// Fun with disappearing increments
|
||||||
|
|
||||||
|
// Start at 1
|
||||||
|
region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
byte[] val = r.getValue(fam1, q1);
|
||||||
|
assertNotNull(val);
|
||||||
|
assertEquals(Bytes.toLong(val), 1L);
|
||||||
|
|
||||||
|
// Increment with a TTL of 5 seconds
|
||||||
|
Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
|
||||||
|
incr.setTTL(5000);
|
||||||
|
region.increment(incr); // 2
|
||||||
|
|
||||||
|
// New value should be 2
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
val = r.getValue(fam1, q1);
|
||||||
|
assertNotNull(val);
|
||||||
|
assertEquals(Bytes.toLong(val), 2L);
|
||||||
|
|
||||||
|
// Increment time to T+25 seconds
|
||||||
|
edge.incrementTime(5000);
|
||||||
|
|
||||||
|
// Value should be back to 1
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
val = r.getValue(fam1, q1);
|
||||||
|
assertNotNull(val);
|
||||||
|
assertEquals(Bytes.toLong(val), 1L);
|
||||||
|
|
||||||
|
// Increment time to T+30 seconds
|
||||||
|
edge.incrementTime(5000);
|
||||||
|
|
||||||
|
// Original value written at T+20 should be gone now via family TTL
|
||||||
|
r = region.get(new Get(row));
|
||||||
|
assertNull(r.getValue(fam1, q1));
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
HRegion.closeHRegion(region);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static HRegion initHRegion(byte[] tableName, String callingMethod,
|
private static HRegion initHRegion(byte[] tableName, String callingMethod,
|
||||||
byte[]... families) throws IOException {
|
byte[]... families) throws IOException {
|
||||||
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
|
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
|
||||||
|
|
|
@ -95,11 +95,12 @@ public class TestQueryMatcher extends HBaseTestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
|
private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
|
||||||
// 2,4,5
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
// 2,4,5
|
||||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
||||||
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
|
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
|
||||||
EnvironmentEdgeManager.currentTime() - ttl);
|
now - ttl, now);
|
||||||
|
|
||||||
List<KeyValue> memstore = new ArrayList<KeyValue>();
|
List<KeyValue> memstore = new ArrayList<KeyValue>();
|
||||||
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
|
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
|
||||||
|
@ -182,9 +183,10 @@ public class TestQueryMatcher extends HBaseTestCase {
|
||||||
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
|
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
|
||||||
expected.add(ScanQueryMatcher.MatchCode.DONE);
|
expected.add(ScanQueryMatcher.MatchCode.DONE);
|
||||||
|
|
||||||
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
||||||
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
|
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
|
||||||
EnvironmentEdgeManager.currentTime() - ttl);
|
now - ttl, now);
|
||||||
|
|
||||||
List<KeyValue> memstore = new ArrayList<KeyValue>();
|
List<KeyValue> memstore = new ArrayList<KeyValue>();
|
||||||
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
|
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
|
||||||
|
@ -238,8 +240,8 @@ public class TestQueryMatcher extends HBaseTestCase {
|
||||||
|
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
ScanQueryMatcher qm =
|
ScanQueryMatcher qm =
|
||||||
new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
|
new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
|
||||||
rowComparator), get.getFamilyMap().get(fam2), now - testTTL);
|
rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
|
||||||
|
|
||||||
KeyValue [] kvs = new KeyValue[] {
|
KeyValue [] kvs = new KeyValue[] {
|
||||||
new KeyValue(row1, fam2, col1, now-100, data),
|
new KeyValue(row1, fam2, col1, now-100, data),
|
||||||
|
@ -294,7 +296,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
|
||||||
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
|
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
|
||||||
now - testTTL);
|
now - testTTL, now);
|
||||||
|
|
||||||
KeyValue [] kvs = new KeyValue[] {
|
KeyValue [] kvs = new KeyValue[] {
|
||||||
new KeyValue(row1, fam2, col1, now-100, data),
|
new KeyValue(row1, fam2, col1, now-100, data),
|
||||||
|
@ -353,7 +355,7 @@ public class TestQueryMatcher extends HBaseTestCase {
|
||||||
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
|
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
|
||||||
|
|
||||||
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
|
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
|
||||||
HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
|
HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
|
||||||
List<ScanQueryMatcher.MatchCode> actual =
|
List<ScanQueryMatcher.MatchCode> actual =
|
||||||
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
|
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
|
||||||
byte[] prevRow = null;
|
byte[] prevRow = null;
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({RegionServerTests.class, SmallTests.class})
|
@Category({RegionServerTests.class, SmallTests.class})
|
||||||
|
@ -36,7 +35,6 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
|
||||||
|
|
||||||
final static int VERSIONS = 2;
|
final static int VERSIONS = 2;
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCheckColumn_Ok() throws IOException {
|
public void testCheckColumn_Ok() throws IOException {
|
||||||
ScanWildcardColumnTracker tracker =
|
ScanWildcardColumnTracker tracker =
|
||||||
new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
|
new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
|
||||||
|
@ -70,7 +68,6 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testCheckColumn_EnforceVersions() throws IOException {
|
public void testCheckColumn_EnforceVersions() throws IOException {
|
||||||
ScanWildcardColumnTracker tracker =
|
ScanWildcardColumnTracker tracker =
|
||||||
new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
|
new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
|
||||||
|
|
|
@ -419,8 +419,13 @@ public class TestTags {
|
||||||
tags = TestCoprocessorForTags.tags;
|
tags = TestCoprocessorForTags.tags;
|
||||||
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
|
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
|
||||||
assertEquals(2, tags.size());
|
assertEquals(2, tags.size());
|
||||||
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
|
// We cannot assume the ordering of tags
|
||||||
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
|
List<String> tagValues = new ArrayList<String>();
|
||||||
|
for (Tag tag: tags) {
|
||||||
|
tagValues.add(Bytes.toString(tag.getValue()));
|
||||||
|
}
|
||||||
|
assertTrue(tagValues.contains("tag1"));
|
||||||
|
assertTrue(tagValues.contains("tag2"));
|
||||||
TestCoprocessorForTags.checkTagPresence = false;
|
TestCoprocessorForTags.checkTagPresence = false;
|
||||||
TestCoprocessorForTags.tags = null;
|
TestCoprocessorForTags.tags = null;
|
||||||
|
|
||||||
|
@ -476,8 +481,13 @@ public class TestTags {
|
||||||
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
|
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
|
||||||
tags = TestCoprocessorForTags.tags;
|
tags = TestCoprocessorForTags.tags;
|
||||||
assertEquals(2, tags.size());
|
assertEquals(2, tags.size());
|
||||||
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
|
// We cannot assume the ordering of tags
|
||||||
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
|
tagValues.clear();
|
||||||
|
for (Tag tag: tags) {
|
||||||
|
tagValues.add(Bytes.toString(tag.getValue()));
|
||||||
|
}
|
||||||
|
assertTrue(tagValues.contains("tag1"));
|
||||||
|
assertTrue(tagValues.contains("tag2"));
|
||||||
TestCoprocessorForTags.checkTagPresence = false;
|
TestCoprocessorForTags.checkTagPresence = false;
|
||||||
TestCoprocessorForTags.tags = null;
|
TestCoprocessorForTags.tags = null;
|
||||||
|
|
||||||
|
|
|
@ -141,17 +141,19 @@ EOF
|
||||||
set_attributes(p, attributes) if attributes
|
set_attributes(p, attributes) if attributes
|
||||||
visibility = args[VISIBILITY]
|
visibility = args[VISIBILITY]
|
||||||
set_cell_visibility(p, visibility) if visibility
|
set_cell_visibility(p, visibility) if visibility
|
||||||
|
ttl = args[TTL]
|
||||||
|
set_op_ttl(p, ttl) if ttl
|
||||||
end
|
end
|
||||||
#Case where attributes are specified without timestamp
|
#Case where attributes are specified without timestamp
|
||||||
if timestamp.kind_of?(Hash)
|
if timestamp.kind_of?(Hash)
|
||||||
timestamp.each do |k, v|
|
timestamp.each do |k, v|
|
||||||
if v.kind_of?(Hash)
|
if k == 'ATTRIBUTES'
|
||||||
set_attributes(p, v) if v
|
set_attributes(p, v)
|
||||||
end
|
elsif k == 'VISIBILITY'
|
||||||
if v.kind_of?(String)
|
set_cell_visibility(p, v)
|
||||||
set_cell_visibility(p, v) if v
|
elsif k == "TTL"
|
||||||
end
|
set_op_ttl(p, v)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
timestamp = nil
|
timestamp = nil
|
||||||
end
|
end
|
||||||
|
@ -219,6 +221,8 @@ EOF
|
||||||
visibility = args[VISIBILITY]
|
visibility = args[VISIBILITY]
|
||||||
set_attributes(incr, attributes) if attributes
|
set_attributes(incr, attributes) if attributes
|
||||||
set_cell_visibility(incr, visibility) if visibility
|
set_cell_visibility(incr, visibility) if visibility
|
||||||
|
ttl = args[TTL]
|
||||||
|
set_op_ttl(incr, ttl) if ttl
|
||||||
end
|
end
|
||||||
incr.addColumn(family, qualifier, value)
|
incr.addColumn(family, qualifier, value)
|
||||||
@table.increment(incr)
|
@table.increment(incr)
|
||||||
|
@ -237,6 +241,8 @@ EOF
|
||||||
visibility = args[VISIBILITY]
|
visibility = args[VISIBILITY]
|
||||||
set_attributes(append, attributes) if attributes
|
set_attributes(append, attributes) if attributes
|
||||||
set_cell_visibility(append, visibility) if visibility
|
set_cell_visibility(append, visibility) if visibility
|
||||||
|
ttl = args[TTL]
|
||||||
|
set_op_ttl(append, ttl) if ttl
|
||||||
end
|
end
|
||||||
append.add(family, qualifier, value.to_s.to_java_bytes)
|
append.add(family, qualifier, value.to_s.to_java_bytes)
|
||||||
@table.append(append)
|
@table.append(append)
|
||||||
|
@ -545,6 +551,10 @@ EOF
|
||||||
auths.to_java(:string)))
|
auths.to_java(:string)))
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def set_op_ttl(op, ttl)
|
||||||
|
op.setTTL(ttl.to_java(:long))
|
||||||
|
end
|
||||||
|
|
||||||
#----------------------------
|
#----------------------------
|
||||||
# Add general administration utilities to the shell
|
# Add general administration utilities to the shell
|
||||||
# each of the names below adds this method name to the table
|
# each of the names below adds this method name to the table
|
||||||
|
|
|
@ -530,5 +530,18 @@ module Hbase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
define_test "mutation with TTL should expire" do
|
||||||
|
@test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } )
|
||||||
|
begin
|
||||||
|
res = @test_table._get_internal('ttlTest', 'x:a')
|
||||||
|
assert_not_nil(res)
|
||||||
|
sleep 2
|
||||||
|
res = @test_table._get_internal('ttlTest', 'x:a')
|
||||||
|
assert_nil(res)
|
||||||
|
ensure
|
||||||
|
@test_table.delete('ttlTest', 'x:a')
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue