HBASE-11805 KeyValue to Cell Convert in WALEdit APIs.

This commit is contained in:
anoopsjohn 2014-09-07 15:48:21 +05:30
parent fec7771f73
commit 0a9bfcaf74
31 changed files with 327 additions and 254 deletions

View File

@ -28,6 +28,7 @@ import java.util.NavigableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.ByteRange;
import org.apache.hadoop.hbase.util.Bytes;
@ -465,8 +466,23 @@ public final class CellUtil {
// Serialization is probably preceded by a length (it is in the KeyValueCodec at least).
Bytes.SIZEOF_INT;
}
/**
* This is an estimate of the heap space occupied by a cell. When the cell is of type
* {@link HeapSize} we call {@link HeapSize#heapSize()} so cell can give a correct value. In other
* cases we just consider the bytes occupied by the cell components ie. row, CF, qualifier,
* timestamp, type, value and tags.
* @param cell
* @return estimate of the heap space
*/
public static long estimatedHeapSizeOf(final Cell cell) {
if (cell instanceof HeapSize) {
return ((HeapSize) cell).heapSize();
}
return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength()
+ cell.getValueLength() + cell.getTagsLength() + KeyValue.TIMESTAMP_TYPE_SIZE;
}
/********************* tags *************************************/
/**
* Util method to iterate through the tags
@ -530,4 +546,18 @@ public final class CellUtil {
+ SettableSequenceId.class.getName()));
}
}
/**
* Estimation of total number of bytes used by the cell to store its key, value and tags. When the
* cell is a {@link KeyValue} we include the extra infrastructure size used by it.
* @param cell
* @return estimated length
*/
public static int estimatedLengthOf(final Cell cell) {
if (cell instanceof KeyValue) {
return ((KeyValue)cell).getLength();
}
return cell.getRowLength() + cell.getFamilyLength() + cell.getQualifierLength()
+ cell.getValueLength() + cell.getTagsLength() + KeyValue.TIMESTAMP_TYPE_SIZE;
}
}

View File

@ -28,9 +28,11 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
@ -82,7 +84,8 @@ public class WALPlayer extends Configured implements Tool {
try {
// skip all other tables
if (Bytes.equals(table, key.getTablename().getName())) {
for (KeyValue kv : value.getKeyValues()) {
for (Cell cell : value.getCells()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
context.write(new ImmutableBytesWritable(kv.getRow()), kv);
}
@ -125,33 +128,33 @@ public class WALPlayer extends Configured implements Tool {
ImmutableBytesWritable tableOut = new ImmutableBytesWritable(targetTable.getName());
Put put = null;
Delete del = null;
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
Cell lastCell = null;
for (Cell cell : value.getCells()) {
// filtering HLog meta entries
if (WALEdit.isMetaEditFamily(kv.getFamily())) continue;
if (WALEdit.isMetaEditFamily(cell.getFamily())) continue;
// A WALEdit may contain multiple operations (HBASE-3584) and/or
// multiple rows (HBASE-5229).
// Aggregate as much as possible into a single Put/Delete
// operation before writing to the context.
if (lastKV == null || lastKV.getType() != kv.getType()
|| !CellUtil.matchingRow(lastKV, kv)) {
if (lastCell == null || lastCell.getTypeByte() != cell.getTypeByte()
|| !CellUtil.matchingRow(lastCell, cell)) {
// row or type changed, write out aggregate KVs.
if (put != null) context.write(tableOut, put);
if (del != null) context.write(tableOut, del);
if (kv.isDelete()) {
del = new Delete(kv.getRow());
if (CellUtil.isDelete(cell)) {
del = new Delete(cell.getRow());
} else {
put = new Put(kv.getRow());
put = new Put(cell.getRow());
}
}
if (kv.isDelete()) {
del.addDeleteMarker(kv);
if (CellUtil.isDelete(cell)) {
del.addDeleteMarker(cell);
} else {
put.add(kv);
put.add(cell);
}
lastKV = kv;
lastCell = cell;
}
// write residual KVs
if (put != null) context.write(tableOut, put);

View File

@ -28,12 +28,11 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
@ -43,6 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ServiceException;
@ -90,8 +90,8 @@ public class ReplicationProtbufUtil {
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final HLog.Entry[] entries, byte[] encodedRegionName) {
// Accumulate all the KVs seen in here.
List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length);
// Accumulate all the Cells seen in here.
List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
int size = 0;
WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
@ -135,19 +135,19 @@ public class ReplicationProtbufUtil {
keyBuilder.addScopes(scopeBuilder.build());
}
}
List<KeyValue> kvs = edit.getKeyValues();
List<Cell> cells = edit.getCells();
// Add up the size. It is used later serializing out the kvs.
for (KeyValue kv: kvs) {
size += kv.getLength();
for (Cell cell: cells) {
size += CellUtil.estimatedLengthOf(cell);
}
// Collect up the kvs
allkvs.add(kvs);
// Write out how many kvs associated with this entry.
entryBuilder.setAssociatedCellCount(kvs.size());
// Collect up the cells
allCells.add(cells);
// Write out how many cells associated with this entry.
entryBuilder.setAssociatedCellCount(cells.size());
builder.addEntry(entryBuilder.build());
}
return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
getCellScanner(allkvs, size));
getCellScanner(allCells, size));
}
/**

View File

@ -2714,8 +2714,8 @@ public class HRegion implements HeapSize { // , Writable{
// Add WAL edits by CP
WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
if (fromCP != null) {
for (KeyValue kv : fromCP.getKeyValues()) {
walEdit.add(kv);
for (Cell cell : fromCP.getCells()) {
walEdit.add(cell);
}
}
addFamilyMapToWALEdit(familyMaps[i], walEdit);
@ -3183,7 +3183,7 @@ public class HRegion implements HeapSize { // , Writable{
WALEdit walEdit) {
for (List<Cell> edits : familyMap.values()) {
for (Cell cell : edits) {
walEdit.add(KeyValueUtil.ensureKeyValue(cell));
walEdit.add(cell);
}
}
}
@ -3420,14 +3420,14 @@ public class HRegion implements HeapSize { // , Writable{
currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
key.getOrigLogSeqNum() : currentEditSeqId;
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY) ||
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {
//this is a special edit, we should handle it
CompactionDescriptor compaction = WALEdit.getCompaction(kv);
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null) {
//replay the compaction
completeCompactionMarker(compaction);
@ -3437,13 +3437,13 @@ public class HRegion implements HeapSize { // , Writable{
continue;
}
// Figure which store the edit is meant for.
if (store == null || !CellUtil.matchingFamily(kv, store.getFamily().getName())) {
store = getStore(kv);
if (store == null || !CellUtil.matchingFamily(cell, store.getFamily().getName())) {
store = getStore(cell);
}
if (store == null) {
// This should never happen. Perhaps schema was changed between
// crash and redeploy?
LOG.warn("No family for " + kv);
LOG.warn("No family for " + cell);
skippedEdits++;
continue;
}
@ -3453,11 +3453,11 @@ public class HRegion implements HeapSize { // , Writable{
skippedEdits++;
continue;
}
kv.setSequenceId(currentReplaySeqId);
CellUtil.setSequenceId(cell, currentReplaySeqId);
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
flush = restoreEdit(store, kv);
flush = restoreEdit(store, cell);
editsCount++;
}
if (flush) internalFlushcache(null, currentEditSeqId, status);
@ -3525,11 +3525,11 @@ public class HRegion implements HeapSize { // , Writable{
/**
* Used by tests
* @param s Store to add edit too.
* @param kv KeyValue to add.
* @param cell Cell to add.
* @return True if we should flush.
*/
protected boolean restoreEdit(final Store s, final KeyValue kv) {
long kvSize = s.add(kv).getFirst();
protected boolean restoreEdit(final Store s, final Cell cell) {
long kvSize = s.add(cell).getFirst();
if (this.rsAccounting != null) {
rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
}

View File

@ -96,8 +96,7 @@ MultiRowMutationProcessorResponse> {
for (List<Cell> cells : m.getFamilyCellMap().values()) {
boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
for (Cell cell : cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (writeToWAL) walEdit.add(kv);
if (writeToWAL) walEdit.add(cell);
}
}
}
@ -146,8 +145,8 @@ MultiRowMutationProcessorResponse> {
// itself. No need to apply again to region
if (walEditsFromCP[i] != null) {
// Add the WALEdit created by CP hook
for (KeyValue walKv : walEditsFromCP[i].getKeyValues()) {
walEdit.add(walKv);
for (Cell walCell : walEditsFromCP[i].getCells()) {
walEdit.add(walCell);
}
}
}

View File

@ -57,11 +57,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -1463,7 +1465,7 @@ class FSHLog implements HLog, Syncable {
public long postAppend(final Entry e, final long elapsedTime) {
long len = 0;
if (this.metrics == null) return len;
for (KeyValue kv : e.getEdit().getKeyValues()) len += kv.getLength();
for (Cell cell : e.getEdit().getCells()) len += CellUtil.estimatedLengthOf(cell);
metrics.finishAppend(elapsedTime, len);
return len;
}

View File

@ -23,6 +23,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -37,8 +38,10 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@ -256,12 +259,10 @@ public class HLogPrettyPrinter {
continue;
// initialize list into which we will store atomic actions
List<Map> actions = new ArrayList<Map>();
for (KeyValue kv : edit.getKeyValues()) {
for (Cell cell : edit.getCells()) {
// add atomic operation to txn
Map<String, Object> op =
new HashMap<String, Object>(kv.toStringMap());
if (outputValues)
op.put("value", Bytes.toStringBinary(kv.getValue()));
Map<String, Object> op = new HashMap<String, Object>(toStringMap(cell));
if (outputValues) op.put("value", Bytes.toStringBinary(cell.getValue()));
// check row output filter
if (row == null || ((String) op.get("row")).equals(row))
actions.add(op);
@ -306,6 +307,31 @@ public class HLogPrettyPrinter {
}
}
private static Map<String, Object> toStringMap(Cell cell) {
Map<String, Object> stringMap = new HashMap<String, Object>();
stringMap.put("row",
Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
stringMap.put("family", Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength()));
stringMap.put("qualifier",
Bytes.toStringBinary(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength()));
stringMap.put("timestamp", cell.getTimestamp());
stringMap.put("vlen", cell.getValueLength());
if (cell.getTagsLength() > 0) {
List<String> tagsString = new ArrayList<String>();
Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
cell.getTagsLength());
while (tagsIterator.hasNext()) {
Tag tag = tagsIterator.next();
tagsString.add((tag.getType()) + ":"
+ Bytes.toStringBinary(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength()));
}
stringMap.put("tag", tagsString);
}
return stringMap;
}
public static void main(String[] args) throws IOException {
run(args);
}

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
@ -1501,16 +1500,16 @@ public class HLogSplitter {
boolean needSkip = false;
HRegionLocation loc = null;
String locKey = null;
List<KeyValue> kvs = edit.getKeyValues();
List<KeyValue> skippedKVs = new ArrayList<KeyValue>();
List<Cell> cells = edit.getCells();
List<Cell> skippedCells = new ArrayList<Cell>();
HConnection hconn = this.getConnectionByTableName(table);
for (KeyValue kv : kvs) {
byte[] row = kv.getRow();
byte[] family = kv.getFamily();
for (Cell cell : cells) {
byte[] row = cell.getRow();
byte[] family = cell.getFamily();
boolean isCompactionEntry = false;
if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) {
CompactionDescriptor compaction = WALEdit.getCompaction(kv);
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
CompactionDescriptor compaction = WALEdit.getCompaction(cell);
if (compaction != null && compaction.hasRegionName()) {
try {
byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
@ -1520,11 +1519,11 @@ public class HLogSplitter {
isCompactionEntry = true;
} catch (Exception ex) {
LOG.warn("Unexpected exception received, ignoring " + ex);
skippedKVs.add(kv);
skippedCells.add(cell);
continue;
}
} else {
skippedKVs.add(kv);
skippedCells.add(cell);
continue;
}
}
@ -1571,7 +1570,7 @@ public class HLogSplitter {
Long maxStoreSeqId = maxStoreSequenceIds.get(family);
if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
// skip current kv if column family doesn't exist anymore or already flushed
skippedKVs.add(kv);
skippedCells.add(cell);
continue;
}
}
@ -1581,8 +1580,8 @@ public class HLogSplitter {
// skip the edit
if (loc == null || needSkip) continue;
if (!skippedKVs.isEmpty()) {
kvs.removeAll(skippedKVs);
if (!skippedCells.isEmpty()) {
cells.removeAll(skippedCells);
}
synchronized (serverToBufferQueueMap) {
@ -2018,7 +2017,7 @@ public class HLogSplitter {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
if (val != null) val.add(KeyValueUtil.ensureKeyValue(cell));
if (val != null) val.add(cell);
boolean isNewRowOrType =
previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
@ -2040,13 +2039,13 @@ public class HLogSplitter {
}
}
if (CellUtil.isDelete(cell)) {
((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
((Delete) m).addDeleteMarker(cell);
} else {
Cell tmpNewCell = cell;
if (addLogReplayTag) {
tmpNewCell = tagReplayLogSequenceNumber(replaySeqId, cell);
}
((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
((Put) m).add(tmpNewCell);
}
if (m != null) {
m.setDurability(durability);

View File

@ -28,8 +28,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
@ -114,9 +114,9 @@ public class ProtobufLogWriter extends WriterBase {
entry.setCompressionContext(compressionContext);
entry.getKey().getBuilder(compressor).
setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
for (KeyValue kv : entry.getEdit().getKeyValues()) {
for (Cell cell : entry.getEdit().getCells()) {
// cellEncoder must assume little about the stream, since we write PB and cells in turn.
cellEncoder.write(kv);
cellEncoder.write(cell);
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
@ -78,7 +79,8 @@ import org.apache.hadoop.io.Writable;
* is an old style KeyValue or the new style WALEdit.
*
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
HBaseInterfaceAudience.COPROC })
public class WALEdit implements Writable, HeapSize {
public static final Log LOG = LogFactory.getLog(WALEdit.class);
@ -92,7 +94,7 @@ public class WALEdit implements Writable, HeapSize {
private final int VERSION_2 = -1;
private final boolean isReplay;
private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(1);
private final ArrayList<Cell> cells = new ArrayList<Cell>(1);
public static final WALEdit EMPTY_WALEDIT = new WALEdit();
@ -134,21 +136,21 @@ public class WALEdit implements Writable, HeapSize {
this.compressionContext = compressionContext;
}
public WALEdit add(KeyValue kv) {
this.kvs.add(kv);
public WALEdit add(Cell cell) {
this.cells.add(cell);
return this;
}
public boolean isEmpty() {
return kvs.isEmpty();
return cells.isEmpty();
}
public int size() {
return kvs.size();
return cells.size();
}
public ArrayList<KeyValue> getKeyValues() {
return kvs;
public ArrayList<Cell> getCells() {
return cells;
}
public NavigableMap<byte[], Integer> getAndRemoveScopes() {
@ -159,7 +161,7 @@ public class WALEdit implements Writable, HeapSize {
@Override
public void readFields(DataInput in) throws IOException {
kvs.clear();
cells.clear();
if (scopes != null) {
scopes.clear();
}
@ -197,9 +199,11 @@ public class WALEdit implements Writable, HeapSize {
public void write(DataOutput out) throws IOException {
LOG.warn("WALEdit is being serialized to writable - only expected in test code");
out.writeInt(VERSION_2);
out.writeInt(kvs.size());
out.writeInt(cells.size());
// We interleave the two lists for code simplicity
for (KeyValue kv : kvs) {
for (Cell cell : cells) {
// This is not used in any of the core code flows so it is just fine to convert to KV
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (compressionContext != null) {
KeyValueCompression.writeKV(out, kv, compressionContext);
} else{
@ -224,23 +228,19 @@ public class WALEdit implements Writable, HeapSize {
* @return Number of KVs read.
*/
public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException {
kvs.clear();
kvs.ensureCapacity(expectedCount);
while (kvs.size() < expectedCount && cellDecoder.advance()) {
Cell cell = cellDecoder.current();
if (!(cell instanceof KeyValue)) {
throw new IOException("WAL edit only supports KVs as cells");
}
kvs.add((KeyValue)cell);
cells.clear();
cells.ensureCapacity(expectedCount);
while (cells.size() < expectedCount && cellDecoder.advance()) {
cells.add(cellDecoder.current());
}
return kvs.size();
return cells.size();
}
@Override
public long heapSize() {
long ret = ClassSize.ARRAYLIST;
for (KeyValue kv : kvs) {
ret += kv.heapSize();
for (Cell cell : cells) {
ret += CellUtil.estimatedHeapSizeOf(cell);
}
if (scopes != null) {
ret += ClassSize.TREEMAP;
@ -254,9 +254,9 @@ public class WALEdit implements Writable, HeapSize {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("[#edits: " + kvs.size() + " = <");
for (KeyValue kv : kvs) {
sb.append(kv.toString());
sb.append("[#edits: " + cells.size() + " = <");
for (Cell cell : cells) {
sb.append(cell);
sb.append("; ");
}
if (scopes != null) {

View File

@ -29,11 +29,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
@ -230,10 +230,10 @@ public class WALEditsReplaySink {
boolean skip = false;
for (HLog.Entry entry : this.entries) {
WALEdit edit = entry.getEdit();
List<KeyValue> kvs = edit.getKeyValues();
for (KeyValue kv : kvs) {
List<Cell> cells = edit.getCells();
for (Cell cell : cells) {
// filtering HLog meta entries
setLocation(conn.locateRegion(tableName, kv.getRow()));
setLocation(conn.locateRegion(tableName, cell.getRow()));
skip = true;
break;
}

View File

@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.NavigableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
/**
@ -38,19 +38,19 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
if (scopes == null || scopes.isEmpty()) {
return null;
}
ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
int size = kvs.size();
ArrayList<Cell> cells = entry.getEdit().getCells();
int size = cells.size();
for (int i = size - 1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
Cell cell = cells.get(i);
// The scope will be null or empty if
// there's nothing to replicate in that WALEdit
if (!scopes.containsKey(kv.getFamily())
|| scopes.get(kv.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) {
kvs.remove(i);
if (!scopes.containsKey(cell.getFamily())
|| scopes.get(cell.getFamily()) == HConstants.REPLICATION_SCOPE_LOCAL) {
cells.remove(i);
}
}
if (kvs.size() < size / 2) {
kvs.trimToSize();
if (cells.size() < size / 2) {
cells.trimToSize();
}
return entry;
}

View File

@ -21,9 +21,10 @@ package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;
@ -39,7 +40,7 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
@Override
public Entry filter(Entry entry) {
String tabName = entry.getKey().getTablename().getNameAsString();
ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
ArrayList<Cell> cells = entry.getEdit().getCells();
Map<String, List<String>> tableCFs = null;
try {
@ -48,7 +49,7 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
", degenerate as if it's not configured by keeping tableCFs==null");
}
int size = kvs.size();
int size = cells.size();
// return null(prevent replicating) if logKey's table isn't in this peer's
// replicable table list (empty tableCFs means all table are replicable)
@ -57,16 +58,16 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
} else {
List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
for (int i = size - 1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
Cell cell = cells.get(i);
// ignore(remove) kv if its cf isn't in the replicable cf list
// (empty cfs means all cfs of this table are replicable)
if ((cfs != null && !cfs.contains(Bytes.toString(kv.getFamily())))) {
kvs.remove(i);
if ((cfs != null && !cfs.contains(Bytes.toString(cell.getFamily())))) {
cells.remove(i);
}
}
}
if (kvs.size() < size/2) {
kvs.trimToSize();
if (cells.size() < size/2) {
cells.trimToSize();
}
return entry;
}

View File

@ -270,12 +270,12 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.getEntryBuffer();
if (entries.isEmpty() || entries.get(0).getEdit().getKeyValues().isEmpty()) {
if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
return;
}
sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
entries.get(0).getEdit().getKeyValues().get(0).getRow(), entries);
entries.get(0).getEdit().getCells().get(0).getRow(), entries);
}
@Override

View File

@ -37,12 +37,12 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
@ -245,10 +245,10 @@ public class Replication implements WALActionsListener,
NavigableMap<byte[], Integer> scopes =
new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
byte[] family;
for (KeyValue kv : logEdit.getKeyValues()) {
family = kv.getFamily();
for (Cell cell : logEdit.getCells()) {
family = cell.getFamily();
// This is expected and the KV should not be replicated
if (CellUtil.matchingFamily(kv, WALEdit.METAFAMILY)) continue;
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
// Unexpected, has a tendency to happen in unit tests
assert htd.getFamily(family) != null;

View File

@ -36,10 +36,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -634,16 +634,16 @@ public class ReplicationSource extends Thread
/**
* Count the number of different row keys in the given edit because of
* mini-batching. We assume that there's at least one KV in the WALEdit.
* mini-batching. We assume that there's at least one Cell in the WALEdit.
* @param edit edit to count row keys from
* @return number of different row keys
*/
private int countDistinctRowKeys(WALEdit edit) {
List<KeyValue> kvs = edit.getKeyValues();
List<Cell> cells = edit.getCells();
int distinctRowKeys = 1;
KeyValue lastKV = kvs.get(0);
Cell lastCell = cells.get(0);
for (int i = 0; i < edit.size(); i++) {
if (!CellUtil.matchingRow(kvs.get(i), lastKV)) {
if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
distinctRowKeys++;
}
}

View File

@ -26,6 +26,7 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -93,28 +94,28 @@ implements WALObserver {
preWALWriteCalled = true;
// here we're going to remove one keyvalue from the WALEdit, and add
// another one to it.
List<KeyValue> kvs = logEdit.getKeyValues();
KeyValue deletedKV = null;
for (KeyValue kv : kvs) {
List<Cell> cells = logEdit.getCells();
Cell deletedCell = null;
for (Cell cell : cells) {
// assume only one kv from the WALEdit matches.
byte[] family = kv.getFamily();
byte[] qulifier = kv.getQualifier();
byte[] family = cell.getFamily();
byte[] qulifier = cell.getQualifier();
if (Arrays.equals(family, ignoredFamily) &&
Arrays.equals(qulifier, ignoredQualifier)) {
LOG.debug("Found the KeyValue from WALEdit which should be ignored.");
deletedKV = kv;
deletedCell = cell;
}
if (Arrays.equals(family, changedFamily) &&
Arrays.equals(qulifier, changedQualifier)) {
LOG.debug("Found the KeyValue from WALEdit which should be changed.");
kv.getValueArray()[kv.getValueOffset()] += 1;
cell.getValueArray()[cell.getValueOffset()] += 1;
}
}
kvs.add(new KeyValue(row, addedFamily, addedQualifier));
if (deletedKV != null) {
cells.add(new KeyValue(row, addedFamily, addedQualifier));
if (deletedCell != null) {
LOG.debug("About to delete a KeyValue from WALEdit.");
kvs.remove(deletedKV);
cells.remove(deletedCell);
}
return bypass;
}

View File

@ -19,30 +19,10 @@
package org.apache.hadoop.hbase.coprocessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@ -51,7 +31,40 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Tests invocation of the
@ -167,17 +180,17 @@ public class TestWALObserver {
boolean foundFamily2 = false;
boolean modifiedFamily1 = false;
List<KeyValue> kvs = edit.getKeyValues();
List<Cell> cells = edit.getCells();
for (KeyValue kv : kvs) {
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
for (Cell cell : cells) {
if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
foundFamily0 = true;
}
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
foundFamily2 = true;
}
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
modifiedFamily1 = true;
}
}
@ -194,15 +207,15 @@ public class TestWALObserver {
foundFamily0 = false;
foundFamily2 = false;
modifiedFamily1 = false;
for (KeyValue kv : kvs) {
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
for (Cell cell : cells) {
if (Arrays.equals(cell.getFamily(), TEST_FAMILY[0])) {
foundFamily0 = true;
}
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
if (Arrays.equals(cell.getFamily(), TEST_FAMILY[2])) {
foundFamily2 = true;
}
if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
if (Arrays.equals(cell.getFamily(), TEST_FAMILY[1])) {
if (!Arrays.equals(cell.getValue(), TEST_VALUE[1])) {
modifiedFamily1 = true;
}
}
@ -350,7 +363,7 @@ public class TestWALObserver {
for (List<Cell> edits : familyMap.values()) {
for (Cell cell : edits) {
// KeyValue v1 expectation. Cast for now until we go all Cell all the time. TODO.
walEdit.add((KeyValue)cell);
walEdit.add(cell);
}
}
}

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -234,10 +235,10 @@ public class TestHLogRecordReader {
for (byte[] column : columns) {
assertTrue(reader.nextKeyValue());
KeyValue kv = reader.getCurrentValue().getKeyValues().get(0);
if (!Bytes.equals(column, kv.getQualifier())) {
Cell cell = reader.getCurrentValue().getCells().get(0);
if (!Bytes.equals(column, cell.getQualifier())) {
assertTrue("expected [" + Bytes.toString(column) + "], actual ["
+ Bytes.toString(kv.getQualifier()) + "]", false);
+ Bytes.toString(cell.getQualifier()) + "]", false);
}
}
assertFalse(reader.nextKeyValue());

View File

@ -20,21 +20,25 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
@ -56,9 +60,6 @@ import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/**
* Basic test for the WALPlayer M/R tool
*/
@ -143,12 +144,12 @@ public class TestWALPlayer {
when(context.getConfiguration()).thenReturn(configuration);
WALEdit value = mock(WALEdit.class);
ArrayList<KeyValue> values = new ArrayList<KeyValue>();
ArrayList<Cell> values = new ArrayList<Cell>();
KeyValue kv1 = mock(KeyValue.class);
when(kv1.getFamily()).thenReturn(Bytes.toBytes("family"));
when(kv1.getRow()).thenReturn(Bytes.toBytes("row"));
values.add(kv1);
when(value.getKeyValues()).thenReturn(values);
when(value.getCells()).thenReturn(values);
mapper.setup(context);
doAnswer(new Answer<Void>() {

View File

@ -1509,7 +1509,7 @@ public class TestDistributedLogSplitting {
HLog.Reader in = HLogFactory.createReader(fs, log, conf);
HLog.Entry e;
while ((e = in.next()) != null) {
if (!WALEdit.isMetaEditFamily(e.getEdit().getKeyValues().get(0))) {
if (!WALEdit.isMetaEditFamily(e.getEdit().getCells().get(0))) {
count++;
}
}

View File

@ -846,7 +846,7 @@ public class TestHRegion {
if (entry == null) {
break;
}
Cell cell = entry.getEdit().getKeyValues().get(0);
Cell cell = entry.getEdit().getCells().get(0);
if (WALEdit.isMetaEditFamily(cell)) {
FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
assertNotNull(flushDesc);
@ -912,14 +912,14 @@ public class TestHRegion {
}
@Override
public boolean matches(Object edit) {
List<KeyValue> kvs = ((WALEdit)edit).getKeyValues();
if (kvs.isEmpty()) {
List<Cell> cells = ((WALEdit)edit).getCells();
if (cells.isEmpty()) {
return false;
}
if (WALEdit.isMetaEditFamily(kvs.get(0))) {
if (WALEdit.isMetaEditFamily(cells.get(0))) {
FlushDescriptor desc = null;
try {
desc = WALEdit.getFlushDescriptor(kvs.get(0));
desc = WALEdit.getFlushDescriptor(cells.get(0));
} catch (IOException e) {
LOG.warn(e);
return false;
@ -5527,9 +5527,9 @@ public class TestHRegion {
WALEdit edit = editCaptor.getValue();
assertNotNull(edit);
assertNotNull(edit.getKeyValues());
assertEquals(1, edit.getKeyValues().size());
RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getKeyValues().get(0));
assertNotNull(edit.getCells());
assertEquals(1, edit.getCells().size());
RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
assertNotNull(desc);
LOG.info("RegionEventDescriptor from WAL: " + desc);
@ -5591,9 +5591,9 @@ public class TestHRegion {
WALEdit edit = editCaptor.getAllValues().get(1);
assertNotNull(edit);
assertNotNull(edit.getKeyValues());
assertEquals(1, edit.getKeyValues().size());
RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getKeyValues().get(0));
assertNotNull(edit.getCells());
assertEquals(1, edit.getCells().size());
RegionEventDescriptor desc = WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
assertNotNull(desc);
LOG.info("RegionEventDescriptor from WAL: " + desc);

View File

@ -42,8 +42,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting;
@ -54,15 +52,15 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.htrace.impl.ProbabilitySampler;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.reporting.ConsoleReporter;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.htrace.impl.ProbabilitySampler;
/**
* This class runs performance benchmarks for {@link HLog}.
@ -534,8 +532,7 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
WALEdit walEdit) {
for (List<Cell> edits : familyMap.values()) {
for (Cell cell : edits) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
walEdit.add(kv);
walEdit.add(cell);
}
}
}

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
@ -56,7 +57,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@ -216,7 +216,7 @@ public class TestHLog {
for (Put p: puts) {
CellScanner cs = p.cellScanner();
while (cs.advance()) {
edits.add(KeyValueUtil.ensureKeyValue(cs.current()));
edits.add(cs.current());
}
}
// Add any old cluster id.
@ -567,7 +567,7 @@ public class TestHLog {
while (reader.next(entry) != null) {
count++;
assertTrue("Should be one KeyValue per WALEdit",
entry.getEdit().getKeyValues().size() == 1);
entry.getEdit().getCells().size() == 1);
}
assertEquals(total, count);
reader.close();
@ -623,9 +623,9 @@ public class TestHLog {
WALEdit val = entry.getEdit();
assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
assertTrue(tableName.equals(key.getTablename()));
KeyValue kv = val.getKeyValues().get(0);
assertTrue(Bytes.equals(row, kv.getRow()));
assertEquals((byte)(i + '0'), kv.getValue()[0]);
Cell cell = val.getCells().get(0);
assertTrue(Bytes.equals(row, cell.getRow()));
assertEquals((byte)(i + '0'), cell.getValue()[0]);
System.out.println(key + " " + val);
}
} finally {
@ -675,7 +675,7 @@ public class TestHLog {
HLog.Entry entry = reader.next();
assertEquals(COL_COUNT, entry.getEdit().size());
int idx = 0;
for (KeyValue val : entry.getEdit().getKeyValues()) {
for (Cell val : entry.getEdit().getCells()) {
assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
entry.getKey().getEncodedRegionName()));
assertTrue(tableName.equals(entry.getKey().getTablename()));
@ -917,7 +917,7 @@ public class TestHLog {
assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
assertEquals(tableName, entry.getKey().getTablename());
int idx = 0;
for (KeyValue val : entry.getEdit().getKeyValues()) {
for (Cell val : entry.getEdit().getCells()) {
assertTrue(Bytes.equals(row, val.getRow()));
String value = i + "" + idx;
assertArrayEquals(Bytes.toBytes(value), val.getValue());
@ -1007,7 +1007,7 @@ public class TestHLog {
assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName());
assertEquals(tableName, entry.getKey().getTablename());
int idx = 0;
for (KeyValue val : entry.getEdit().getKeyValues()) {
for (Cell val : entry.getEdit().getCells()) {
assertTrue(Bytes.equals(row, val.getRow()));
String value = i + "" + idx;
assertArrayEquals(Bytes.toBytes(value), val.getValue());

View File

@ -42,11 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.hbase.TableName;
import org.apache.log4j.Level;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -54,14 +49,14 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@ -74,14 +69,17 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
@ -1021,12 +1019,12 @@ public class TestHLogSplit {
}
HLog.Entry entry = (Entry) invocation.getArguments()[0];
WALEdit edit = entry.getEdit();
List<KeyValue> keyValues = edit.getKeyValues();
assertEquals(1, keyValues.size());
KeyValue kv = keyValues.get(0);
List<Cell> cells = edit.getCells();
assertEquals(1, cells.size());
Cell cell = cells.get(0);
// Check that the edits come in the right order.
assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
assertEquals(expectedIndex, Bytes.toInt(cell.getRow()));
expectedIndex++;
return null;
}

View File

@ -38,12 +38,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
@ -539,9 +539,9 @@ public class TestLogRolling {
TEST_UTIL.getConfiguration());
HLog.Entry entry;
while ((entry = reader.next()) != null) {
LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
for (KeyValue kv : entry.getEdit().getKeyValues()) {
loggedRows.add(Bytes.toStringBinary(kv.getRow()));
LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getCells());
for (Cell cell : entry.getEdit().getCells()) {
loggedRows.add(Bytes.toStringBinary(cell.getRow()));
}
}
} catch (EOFException e) {

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -110,14 +111,14 @@ public class TestSecureHLog {
HLog.Entry entry = new HLog.Entry();
while (reader.next(entry) != null) {
count++;
List<KeyValue> kvs = entry.getEdit().getKeyValues();
assertTrue("Should be one KV per WALEdit", kvs.size() == 1);
for (KeyValue kv: kvs) {
byte[] thisRow = kv.getRow();
List<Cell> cells = entry.getEdit().getCells();
assertTrue("Should be one KV per WALEdit", cells.size() == 1);
for (Cell cell: cells) {
byte[] thisRow = cell.getRow();
assertTrue("Incorrect row", Bytes.equals(thisRow, row));
byte[] thisFamily = kv.getFamily();
byte[] thisFamily = cell.getFamily();
assertTrue("Incorrect family", Bytes.equals(thisFamily, family));
byte[] thisValue = kv.getValue();
byte[] thisValue = cell.getValue();
assertTrue("Incorrect value", Bytes.equals(thisValue, value));
}
}

View File

@ -530,8 +530,8 @@ public class TestWALReplay {
final AtomicInteger countOfRestoredEdits = new AtomicInteger(0);
HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) {
@Override
protected boolean restoreEdit(Store s, KeyValue kv) {
boolean b = super.restoreEdit(s, kv);
protected boolean restoreEdit(Store s, Cell cell) {
boolean b = super.restoreEdit(s, cell);
countOfRestoredEdits.incrementAndGet();
return b;
}

View File

@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HTable;
@ -173,10 +173,10 @@ public class TestReplicationEndpoint extends TestReplicationBase {
return; // first call
}
Assert.assertEquals(1, ReplicationEndpointForTest.lastEntries.size());
List<KeyValue> kvs = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getKeyValues();
Assert.assertEquals(1, kvs.size());
Assert.assertTrue(Bytes.equals(kvs.get(0).getRowArray(), kvs.get(0).getRowOffset(),
kvs.get(0).getRowLength(), row, 0, row.length));
List<Cell> cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
Assert.assertEquals(1, cells.size());
Assert.assertTrue(Bytes.equals(cells.get(0).getRowArray(), cells.get(0).getRowOffset(),
cells.get(0).getRowLength(), row, 0, row.length));
}
public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
@ -255,13 +255,13 @@ public class TestReplicationEndpoint extends TestReplicationBase {
return new ChainWALEntryFilter(super.getWALEntryfilter(), new WALEntryFilter() {
@Override
public Entry filter(Entry entry) {
ArrayList<KeyValue> kvs = entry.getEdit().getKeyValues();
int size = kvs.size();
ArrayList<Cell> cells = entry.getEdit().getCells();
int size = cells.size();
for (int i = size-1; i >= 0; i--) {
KeyValue kv = kvs.get(i);
if (!Bytes.equals(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
Cell cell = cells.get(i);
if (!Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
row, 0, row.length)) {
kvs.remove(i);
cells.remove(i);
}
}
return entry;

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -265,11 +266,11 @@ public class TestReplicationWALEntryFilters {
if (e1.getEdit() == null) {
return;
}
List<KeyValue> kvs1 = e1.getEdit().getKeyValues();
List<KeyValue> kvs2 = e2.getEdit().getKeyValues();
Assert.assertEquals(kvs1.size(), kvs2.size());
for (int i = 0; i < kvs1.size(); i++) {
KeyValue.COMPARATOR.compare(kvs1.get(i), kvs2.get(i));
List<Cell> cells1 = e1.getEdit().getCells();
List<Cell> cells2 = e2.getEdit().getCells();
Assert.assertEquals(cells1.size(), cells2.size());
for (int i = 0; i < cells1.size(); i++) {
KeyValue.COMPARATOR.compare(cells1.get(i), cells2.get(i));
}
}

View File

@ -183,7 +183,7 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
throws IOException, RuntimeException {
HLog.Entry entry;
while ((entry = entries.poll()) != null) {
byte[] row = entry.getEdit().getKeyValues().get(0).getRow();
byte[] row = entry.getEdit().getCells().get(0).getRow();
RegionLocations locations = connection.locateRegion(tableName, row, true, true);
RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
RpcControllerFactory.instantiate(connection.getConfiguration()),