HBASE-11777 Find a way to set sequenceId on Cells on the server.

This commit is contained in:
anoopsjohn 2014-09-01 15:10:03 +05:30
parent 844f3dfb6a
commit 58b5bce172
16 changed files with 141 additions and 89 deletions

View File

@ -515,4 +515,19 @@ public final class CellUtil {
&& (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2, && (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2,
end1) < 0); end1) < 0);
} }
/**
* Sets the given seqId to the cell.
* @param cell
* @param seqId
* @throws IOException when the passed cell is not of type {@link SettableSequenceId}
*/
public static void setSequenceId(Cell cell, long seqId) throws IOException {
if (cell instanceof SettableSequenceId) {
((SettableSequenceId) cell).setSequenceId(seqId);
} else {
throw new IOException(new UnsupportedOperationException("Cell is not of type "
+ SettableSequenceId.class.getName()));
}
}
} }

View File

@ -78,7 +78,7 @@ import com.google.common.annotations.VisibleForTesting;
* and actual tag bytes length. * and actual tag bytes length.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class KeyValue implements Cell, HeapSize, Cloneable { public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId {
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>(); private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
static final Log LOG = LogFactory.getLog(KeyValue.class); static final Log LOG = LogFactory.getLog(KeyValue.class);
@ -1387,15 +1387,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0); return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
} }
/**
* @return Column (family + qualifier) length
*/
private int getTotalColumnLength(int rlength, int foffset) {
int flength = getFamilyLength(foffset);
int qlength = getQualifierLength(rlength,flength);
return flength + qlength;
}
/** /**
* @return Timestamp offset * @return Timestamp offset
*/ */

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Using this Interface one can mark a Cell as Sequence stampable. <br>
* Note : Make sure to make Cell implementation of this type in server side.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
public interface SettableSequenceId {
/**
* Sets with the given seqId.
* @param seqId
*/
void setSequenceId(long seqId);
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SettableSequenceId;
import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator; import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValue.Type;
@ -320,7 +321,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
* Note that the value byte[] part is still pointing to the currentBuffer and the * Note that the value byte[] part is still pointing to the currentBuffer and the
* represented by the valueOffset and valueLength * represented by the valueOffset and valueLength
*/ */
protected static class ClonedSeekerState implements Cell { // We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
// there. So this has to be an instance of SettableSequenceId. SeekerState need not be
// SettableSequenceId as we never return that to top layers. When we have to, we make
// ClonedSeekerState from it.
protected static class ClonedSeekerState implements Cell, SettableSequenceId {
private byte[] keyOnlyBuffer; private byte[] keyOnlyBuffer;
private ByteBuffer currentBuffer; private ByteBuffer currentBuffer;
private short rowLength; private short rowLength;
@ -335,12 +340,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
private int tagsLength; private int tagsLength;
private int tagsOffset; private int tagsOffset;
private byte[] cloneTagsBuffer; private byte[] cloneTagsBuffer;
private long memstoreTS; private long seqId;
private TagCompressionContext tagCompressionContext; private TagCompressionContext tagCompressionContext;
protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength, protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength, int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
long timeStamp, byte typeByte, int valueLen, int valueOffset, long memStoreTS, long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext, int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
byte[] tagsBuffer) { byte[] tagsBuffer) {
this.currentBuffer = currentBuffer; this.currentBuffer = currentBuffer;
@ -355,7 +360,6 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
this.typeByte = typeByte; this.typeByte = typeByte;
this.valueLength = valueLen; this.valueLength = valueLen;
this.valueOffset = valueOffset; this.valueOffset = valueOffset;
this.memstoreTS = memStoreTS;
this.tagsOffset = tagsOffset; this.tagsOffset = tagsOffset;
this.tagsLength = tagsLength; this.tagsLength = tagsLength;
System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength); System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
@ -363,6 +367,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
this.cloneTagsBuffer = new byte[tagsLength]; this.cloneTagsBuffer = new byte[tagsLength];
System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength); System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
} }
setSequenceId(seqId);
} }
@Override @Override
@ -421,13 +426,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
} }
@Override @Override
@Deprecated
public long getMvccVersion() { public long getMvccVersion() {
return memstoreTS; return getSequenceId();
} }
@Override @Override
public long getSequenceId() { public long getSequenceId() {
return memstoreTS; return seqId;
} }
@Override @Override
@ -498,6 +504,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
} }
return kv.toString(); return kv.toString();
} }
@Override
public void setSequenceId(long seqId) {
this.seqId = seqId;
}
} }
protected abstract static class protected abstract static class

View File

@ -2000,7 +2000,7 @@ public class HRegion implements HeapSize { // , Writable{
*/ */
private long getNextSequenceId(final HLog wal) throws IOException { private long getNextSequenceId(final HLog wal) throws IOException {
HLogKey key = this.appendNoSyncNoAppend(wal, null); HLogKey key = this.appendNoSyncNoAppend(wal, null);
return key.getSequenceNumber(); return key.getSequenceId();
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -2507,7 +2507,7 @@ public class HRegion implements HeapSize { // , Writable{
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired // reference family maps directly so coprocessors can mutate them if desired
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length]; Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); List<Cell> memstoreCells = new ArrayList<Cell>();
// We try to set up a batch in the range [firstIndex,lastIndexExclusive) // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess; int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex; int lastIndexExclusive = firstIndex;
@ -3087,9 +3087,10 @@ public class HRegion implements HeapSize { // , Writable{
* @param output newly added KVs into memstore * @param output newly added KVs into memstore
* @return the additional memory usage of the memstore caused by the * @return the additional memory usage of the memstore caused by the
* new entries. * new entries.
* @throws IOException
*/ */
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap, private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
long mvccNum, List<KeyValue> memstoreCells) { long mvccNum, List<Cell> memstoreCells) throws IOException {
long size = 0; long size = 0;
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) { for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@ -3098,9 +3099,8 @@ public class HRegion implements HeapSize { // , Writable{
Store store = getStore(family); Store store = getStore(family);
for (Cell cell: cells) { for (Cell cell: cells) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell); CellUtil.setSequenceId(cell, mvccNum);
kv.setSequenceId(mvccNum); Pair<Long, Cell> ret = store.add(cell);
Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst(); size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
} }
@ -3114,13 +3114,13 @@ public class HRegion implements HeapSize { // , Writable{
* called when a Put/Delete has updated memstore but subsequently fails to update * called when a Put/Delete has updated memstore but subsequently fails to update
* the wal. This method is then invoked to rollback the memstore. * the wal. This method is then invoked to rollback the memstore.
*/ */
private void rollbackMemstore(List<KeyValue> memstoreCells) { private void rollbackMemstore(List<Cell> memstoreCells) {
int kvsRolledback = 0; int kvsRolledback = 0;
for (KeyValue kv : memstoreCells) { for (Cell cell : memstoreCells) {
byte[] family = kv.getFamily(); byte[] family = cell.getFamily();
Store store = getStore(family); Store store = getStore(family);
store.rollback(kv); store.rollback(cell);
kvsRolledback++; kvsRolledback++;
} }
LOG.debug("rollbackMemstore rolled back " + kvsRolledback); LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
@ -5049,7 +5049,7 @@ public class HRegion implements HeapSize { // , Writable{
List<RowLock> acquiredRowLocks; List<RowLock> acquiredRowLocks;
long addedSize = 0; long addedSize = 0;
List<Mutation> mutations = new ArrayList<Mutation>(); List<Mutation> mutations = new ArrayList<Mutation>();
List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); List<Cell> memstoreCells = new ArrayList<Cell>();
Collection<byte[]> rowsToLock = processor.getRowsToLock(); Collection<byte[]> rowsToLock = processor.getRowsToLock();
long mvccNum = 0; long mvccNum = 0;
HLogKey walKey = null; HLogKey walKey = null;
@ -5081,16 +5081,16 @@ public class HRegion implements HeapSize { // , Writable{
// 7. Apply to memstore // 7. Apply to memstore
for (Mutation m : mutations) { for (Mutation m : mutations) {
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current()); Cell cell = cellScanner.current();
kv.setSequenceId(mvccNum); CellUtil.setSequenceId(cell, mvccNum);
Store store = getStore(kv); Store store = getStore(cell);
if (store == null) { if (store == null) {
checkFamily(CellUtil.cloneFamily(kv)); checkFamily(CellUtil.cloneFamily(cell));
// unreachable // unreachable
} }
Pair<Long, Cell> ret = store.add(kv); Pair<Long, Cell> ret = store.add(cell);
addedSize += ret.getFirst(); addedSize += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); memstoreCells.add(ret.getSecond());
} }
} }
@ -5248,7 +5248,7 @@ public class HRegion implements HeapSize { // , Writable{
WriteEntry w = null; WriteEntry w = null;
HLogKey walKey = null; HLogKey walKey = null;
RowLock rowLock = null; RowLock rowLock = null;
List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); List<Cell> memstoreCells = new ArrayList<Cell>();
boolean doRollBackMemstore = false; boolean doRollBackMemstore = false;
try { try {
rowLock = getRowLock(row); rowLock = getRowLock(row);
@ -5361,14 +5361,14 @@ public class HRegion implements HeapSize { // , Writable{
if (store.getFamily().getMaxVersions() == 1) { if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1 // upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint()); size += store.upsert(entry.getValue(), getSmallestReadPoint());
memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue())); memstoreCells.addAll(entry.getValue());
} else { } else {
// otherwise keep older versions around // otherwise keep older versions around
for (Cell cell: entry.getValue()) { for (Cell cell: entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
Pair<Long, Cell> ret = store.add(kv); Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst(); size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); memstoreCells.add(ret.getSecond());
doRollBackMemstore = true; doRollBackMemstore = true;
} }
} }
@ -5387,7 +5387,7 @@ public class HRegion implements HeapSize { // , Writable{
} else { } else {
recordMutationWithoutWal(append.getFamilyCellMap()); recordMutationWithoutWal(append.getFamilyCellMap());
} }
if(walKey == null){ if (walKey == null) {
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
} }
@ -5466,7 +5466,7 @@ public class HRegion implements HeapSize { // , Writable{
WriteEntry w = null; WriteEntry w = null;
HLogKey walKey = null; HLogKey walKey = null;
long mvccNum = 0; long mvccNum = 0;
List<KeyValue> memstoreCells = new ArrayList<KeyValue>(); List<Cell> memstoreCells = new ArrayList<Cell>();
boolean doRollBackMemstore = false; boolean doRollBackMemstore = false;
try { try {
rowLock = getRowLock(row); rowLock = getRowLock(row);
@ -5582,14 +5582,14 @@ public class HRegion implements HeapSize { // , Writable{
if (store.getFamily().getMaxVersions() == 1) { if (store.getFamily().getMaxVersions() == 1) {
// upsert if VERSIONS for this CF == 1 // upsert if VERSIONS for this CF == 1
size += store.upsert(entry.getValue(), getSmallestReadPoint()); size += store.upsert(entry.getValue(), getSmallestReadPoint());
memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue())); memstoreCells.addAll(entry.getValue());
} else { } else {
// otherwise keep older versions around // otherwise keep older versions around
for (Cell cell : entry.getValue()) { for (Cell cell : entry.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell); KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
Pair<Long, Cell> ret = store.add(kv); Pair<Long, Cell> ret = store.add(kv);
size += ret.getFirst(); size += ret.getFirst();
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); memstoreCells.add(ret.getSecond());
doRollBackMemstore = true; doRollBackMemstore = true;
} }
} }
@ -6373,12 +6373,12 @@ public class HRegion implements HeapSize { // , Writable{
* Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore * Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
* the WALEdit append later. * the WALEdit append later.
* @param wal * @param wal
* @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to * @param cells list of Cells inserted into memstore. Those Cells are passed in order to
* be updated with right mvcc values(their log sequence nu * be updated with right mvcc values(their log sequence number)
* @return Return the key used appending with no sync and no append. * @return Return the key used appending with no sync and no append.
* @throws IOException * @throws IOException
*/ */
private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException { private HLogKey appendNoSyncNoAppend(final HLog wal, List<Cell> cells) throws IOException {
HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(), HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE); HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
// Call append but with an empty WALEdit. The returned seqeunce id will not be associated // Call append but with an empty WALEdit. The returned seqeunce id will not be associated

View File

@ -631,10 +631,10 @@ public class HStore implements Store {
} }
@Override @Override
public Pair<Long, Cell> add(final KeyValue kv) { public Pair<Long, Cell> add(final Cell cell) {
lock.readLock().lock(); lock.readLock().lock();
try { try {
return this.memstore.add(kv); return this.memstore.add(cell);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
@ -661,10 +661,10 @@ public class HStore implements Store {
} }
@Override @Override
public void rollback(final KeyValue kv) { public void rollback(final Cell cell) {
lock.readLock().lock(); lock.readLock().lock();
try { try {
this.memstore.rollback(kv); this.memstore.rollback(cell);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }

View File

@ -114,11 +114,11 @@ public class MultiVersionConsistencyControl {
* visible to MVCC readers. * visible to MVCC readers.
* @throws IOException * @throws IOException
*/ */
public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum) public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
throws IOException { throws IOException {
if(e == null) return; if(e == null) return;
if (seqNum != null) { if (seqId != null) {
e.setWriteNumber(seqNum.getSequenceNumber()); e.setWriteNumber(seqId.getSequenceId());
} else { } else {
// set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
// function beginMemstoreInsertWithSeqNum in case of failures // function beginMemstoreInsertWithSeqNum in case of failures

View File

@ -23,9 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
/** /**
* Interface which abstracts implementations on log sequence number assignment * Interface which abstracts implementations on log sequenceId assignment
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface SequenceNumber { public interface SequenceId {
public long getSequenceNumber() throws IOException; public long getSequenceId() throws IOException;
} }

View File

@ -122,10 +122,10 @@ public interface Store extends HeapSize, StoreConfigInformation {
/** /**
* Adds a value to the memstore * Adds a value to the memstore
* @param kv * @param cell
* @return memstore size delta & newly added KV which maybe different than the passed in KV * @return memstore size delta & newly added KV which maybe different than the passed in KV
*/ */
Pair<Long, Cell> add(KeyValue kv); Pair<Long, Cell> add(Cell cell);
/** /**
* When was the last edit done in the memstore * When was the last edit done in the memstore
@ -133,11 +133,11 @@ public interface Store extends HeapSize, StoreConfigInformation {
long timeOfOldestEdit(); long timeOfOldestEdit();
/** /**
* Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the * Removes a Cell from the memstore. The Cell is removed only if its key & memstoreTS match the
* key & memstoreTS value of the kv parameter. * key & memstoreTS value of the cell parameter.
* @param kv * @param cell
*/ */
void rollback(final KeyValue kv); void rollback(final Cell cell);
/** /**
* Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. WARNING: * Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. WARNING:

View File

@ -198,12 +198,10 @@ public class StoreFileScanner implements KeyValueScanner {
} }
} }
protected void setCurrentCell(Cell newVal) { protected void setCurrentCell(Cell newVal) throws IOException {
this.cur = newVal; this.cur = newVal;
if(this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) { if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
KeyValue curKV = KeyValueUtil.ensureKeyValue(cur); CellUtil.setSequenceId(cur, this.reader.getSequenceID());
curKV.setSequenceId(this.reader.getSequenceID());
cur = curKV;
} }
} }

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
@ -241,7 +242,7 @@ public abstract class Compactor {
for (Cell c : kvs) { for (Cell c : kvs) {
KeyValue kv = KeyValueUtil.ensureKeyValue(c); KeyValue kv = KeyValueUtil.ensureKeyValue(c);
if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) { if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
kv.setSequenceId(0); CellUtil.setSequenceId(kv, 0);
} }
writer.append(kv); writer.append(kv);
++progress.currentCompactedKVs; ++progress.currentCompactedKVs;

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
@ -1107,9 +1108,9 @@ class FSHLog implements HLog, Syncable {
@Override @Override
public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key, public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore, final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
final List<KeyValue> memstoreKVs) final List<Cell> memstoreCells)
throws IOException { throws IOException {
return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreKVs); return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreCells);
} }
/** /**
@ -1124,7 +1125,7 @@ class FSHLog implements HLog, Syncable {
* @param sync shall we sync after we call the append? * @param sync shall we sync after we call the append?
* @param inMemstore * @param inMemstore
* @param sequenceId The region sequence id reference. * @param sequenceId The region sequence id reference.
* @param memstoreKVs * @param memstoreCells
* @return txid of this transaction or if nothing to do, the last txid * @return txid of this transaction or if nothing to do, the last txid
* @throws IOException * @throws IOException
*/ */
@ -1132,7 +1133,7 @@ class FSHLog implements HLog, Syncable {
justification="Will never be null") justification="Will never be null")
private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key, private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore, WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore,
List<KeyValue> memstoreKVs) List<Cell> memstoreCells)
throws IOException { throws IOException {
if (!this.enabled) return this.highestUnsyncedSequence; if (!this.enabled) return this.highestUnsyncedSequence;
if (this.closed) throw new IOException("Cannot append; log is closed"); if (this.closed) throw new IOException("Cannot append; log is closed");
@ -1150,7 +1151,7 @@ class FSHLog implements HLog, Syncable {
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
// edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
// latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append. // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreKVs); entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
truck.loadPayload(entry, scope.detach()); truck.loadPayload(entry, scope.detach());
} finally { } finally {
this.disruptor.getRingBuffer().publish(sequence); this.disruptor.getRingBuffer().publish(sequence);

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
/** /**
* A WAL Entry for {@link FSHLog} implementation. Immutable. * A WAL Entry for {@link FSHLog} implementation. Immutable.
@ -43,18 +44,18 @@ class FSWALEntry extends HLog.Entry {
private final transient boolean inMemstore; private final transient boolean inMemstore;
private final transient HTableDescriptor htd; private final transient HTableDescriptor htd;
private final transient HRegionInfo hri; private final transient HRegionInfo hri;
private final transient List<KeyValue> memstoreKVs; private final transient List<Cell> memstoreCells;
FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit, FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
final HTableDescriptor htd, final HRegionInfo hri, List<KeyValue> memstoreKVs) { final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
super(key, edit); super(key, edit);
this.regionSequenceIdReference = referenceToRegionSequenceId; this.regionSequenceIdReference = referenceToRegionSequenceId;
this.inMemstore = inMemstore; this.inMemstore = inMemstore;
this.htd = htd; this.htd = htd;
this.hri = hri; this.hri = hri;
this.sequence = sequence; this.sequence = sequence;
this.memstoreKVs = memstoreKVs; this.memstoreCells = memstoreCells;
} }
public String toString() { public String toString() {
@ -87,13 +88,14 @@ class FSWALEntry extends HLog.Entry {
* WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this * WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this
* method to be called. * method to be called.
* @return The region edit/sequence id we set for this edit. * @return The region edit/sequence id we set for this edit.
* @throws IOException
* @see #getRegionSequenceId() * @see #getRegionSequenceId()
*/ */
long stampRegionSequenceId() { long stampRegionSequenceId() throws IOException {
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet(); long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) { if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) {
for(KeyValue kv : this.memstoreKVs){ for (Cell cell : this.memstoreCells) {
kv.setSequenceId(regionSequenceId); CellUtil.setSequenceId(cell, regionSequenceId);
} }
} }
HLogKey key = getKey(); HLogKey key = getKey();

View File

@ -34,10 +34,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
@ -366,14 +366,13 @@ public interface HLog {
* @param inMemstore Always true except for case where we are writing a compaction completion * @param inMemstore Always true except for case where we are writing a compaction completion
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
* -- it is not an edit for memstore. * -- it is not an edit for memstore.
* @param memstoreKVs list of KVs added into memstore * @param memstoreCells list of Cells added into memstore
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it. * in it.
* @throws IOException * @throws IOException
*/ */
long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits, long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
AtomicLong sequenceId, boolean inMemstore, List<KeyValue> memstoreKVs) AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreCells) throws IOException;
throws IOException;
// TODO: Do we need all these versions of sync? // TODO: Do we need all these versions of sync?
void hsync() throws IOException; void hsync() throws IOException;

View File

@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.regionserver.SequenceNumber; import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparable;
@ -69,7 +69,7 @@ import com.google.protobuf.ByteString;
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
// purposes. They need to be merged into HLogEntry. // purposes. They need to be merged into HLogEntry.
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber { public class HLogKey implements WritableComparable<HLogKey>, SequenceId {
public static final Log LOG = LogFactory.getLog(HLogKey.class); public static final Log LOG = LogFactory.getLog(HLogKey.class);
// should be < 0 (@see #readFields(DataInput)) // should be < 0 (@see #readFields(DataInput))
@ -278,7 +278,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
* @throws InterruptedException * @throws InterruptedException
*/ */
@Override @Override
public long getSequenceNumber() throws IOException { public long getSequenceId() throws IOException {
try { try {
this.seqNumAssignedLatch.await(); this.seqNumAssignedLatch.await();
} catch (InterruptedException ie) { } catch (InterruptedException ie) {

View File

@ -969,7 +969,7 @@ public class TestHRegion {
// throw exceptions if the WalEdit is a start flush action // throw exceptions if the WalEdit is a start flush action
when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
(WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(), (WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(),
(List<KeyValue>)any())) (List<Cell>)any()))
.thenThrow(new IOException("Fail to append flush marker")); .thenThrow(new IOException("Fail to append flush marker"));
// start cache flush will throw exception // start cache flush will throw exception
@ -4498,7 +4498,7 @@ public class TestHRegion {
//verify append called or not //verify append called or not
verify(log, expectAppend ? times(1) : never()) verify(log, expectAppend ? times(1) : never())
.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), .appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
(WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<KeyValue>)any()); (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<Cell>)any());
// verify sync called or not // verify sync called or not
if (expectSync || expectSyncFromLogSyncer) { if (expectSync || expectSyncFromLogSyncer) {
@ -5523,7 +5523,7 @@ public class TestHRegion {
TEST_UTIL.getConfiguration(), rss, null); TEST_UTIL.getConfiguration(), rss, null);
verify(log, times(1)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any() verify(log, times(1)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any()
, editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any()); , editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
WALEdit edit = editCaptor.getValue(); WALEdit edit = editCaptor.getValue();
assertNotNull(edit); assertNotNull(edit);
@ -5587,7 +5587,7 @@ public class TestHRegion {
// 2 times, one for region open, the other close region // 2 times, one for region open, the other close region
verify(log, times(2)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(), verify(log, times(2)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any()); editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
WALEdit edit = editCaptor.getAllValues().get(1); WALEdit edit = editCaptor.getAllValues().get(1);
assertNotNull(edit); assertNotNull(edit);