HBASE-11777 Find a way to set sequenceId on Cells on the server.
This commit is contained in:
parent
478bdc8ea5
commit
125f5619b9
|
@ -515,4 +515,19 @@ public final class CellUtil {
|
|||
&& (end1.length == 0 || start2.length == 0 || Bytes.compareTo(start2,
|
||||
end1) < 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the given seqId to the cell.
|
||||
* @param cell
|
||||
* @param seqId
|
||||
* @throws IOException when the passed cell is not of type {@link SettableSequenceId}
|
||||
*/
|
||||
public static void setSequenceId(Cell cell, long seqId) throws IOException {
|
||||
if (cell instanceof SettableSequenceId) {
|
||||
((SettableSequenceId) cell).setSequenceId(seqId);
|
||||
} else {
|
||||
throw new IOException(new UnsupportedOperationException("Cell is not of type "
|
||||
+ SettableSequenceId.class.getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
* and actual tag bytes length.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class KeyValue implements Cell, HeapSize, Cloneable {
|
||||
public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId {
|
||||
private static final ArrayList<Tag> EMPTY_ARRAY_LIST = new ArrayList<Tag>();
|
||||
|
||||
static final Log LOG = LogFactory.getLog(KeyValue.class);
|
||||
|
@ -1387,15 +1387,6 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
|
|||
return getKeyLength() - (int) getKeyDataStructureSize(rlength, flength, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Column (family + qualifier) length
|
||||
*/
|
||||
private int getTotalColumnLength(int rlength, int foffset) {
|
||||
int flength = getFamilyLength(foffset);
|
||||
int qlength = getQualifierLength(rlength,flength);
|
||||
return flength + qlength;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Timestamp offset
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,34 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Using this Interface one can mark a Cell as Sequence stampable. <br>
|
||||
* Note : Make sure to make Cell implementation of this type in server side.
|
||||
*/
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
|
||||
public interface SettableSequenceId {
|
||||
|
||||
/**
|
||||
* Sets with the given seqId.
|
||||
* @param seqId
|
||||
*/
|
||||
void setSequenceId(long seqId);
|
||||
}
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.SettableSequenceId;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.KeyValue.SamePrefixComparator;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
|
@ -320,7 +321,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
* Note that the value byte[] part is still pointing to the currentBuffer and the
|
||||
* represented by the valueOffset and valueLength
|
||||
*/
|
||||
protected static class ClonedSeekerState implements Cell {
|
||||
// We return this as a Cell to the upper layers of read flow and might try setting a new SeqId
|
||||
// there. So this has to be an instance of SettableSequenceId. SeekerState need not be
|
||||
// SettableSequenceId as we never return that to top layers. When we have to, we make
|
||||
// ClonedSeekerState from it.
|
||||
protected static class ClonedSeekerState implements Cell, SettableSequenceId {
|
||||
private byte[] keyOnlyBuffer;
|
||||
private ByteBuffer currentBuffer;
|
||||
private short rowLength;
|
||||
|
@ -335,12 +340,12 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
private int tagsLength;
|
||||
private int tagsOffset;
|
||||
private byte[] cloneTagsBuffer;
|
||||
private long memstoreTS;
|
||||
private long seqId;
|
||||
private TagCompressionContext tagCompressionContext;
|
||||
|
||||
protected ClonedSeekerState(ByteBuffer currentBuffer, byte[] keyBuffer, short rowLength,
|
||||
int familyOffset, byte familyLength, int keyLength, int qualOffset, int qualLength,
|
||||
long timeStamp, byte typeByte, int valueLen, int valueOffset, long memStoreTS,
|
||||
long timeStamp, byte typeByte, int valueLen, int valueOffset, long seqId,
|
||||
int tagsOffset, int tagsLength, TagCompressionContext tagCompressionContext,
|
||||
byte[] tagsBuffer) {
|
||||
this.currentBuffer = currentBuffer;
|
||||
|
@ -355,7 +360,6 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
this.typeByte = typeByte;
|
||||
this.valueLength = valueLen;
|
||||
this.valueOffset = valueOffset;
|
||||
this.memstoreTS = memStoreTS;
|
||||
this.tagsOffset = tagsOffset;
|
||||
this.tagsLength = tagsLength;
|
||||
System.arraycopy(keyBuffer, 0, keyOnlyBuffer, 0, keyLength);
|
||||
|
@ -363,6 +367,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
this.cloneTagsBuffer = new byte[tagsLength];
|
||||
System.arraycopy(tagsBuffer, 0, this.cloneTagsBuffer, 0, tagsLength);
|
||||
}
|
||||
setSequenceId(seqId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -421,13 +426,14 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public long getMvccVersion() {
|
||||
return memstoreTS;
|
||||
return getSequenceId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSequenceId() {
|
||||
return memstoreTS;
|
||||
return seqId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -498,6 +504,11 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder {
|
|||
}
|
||||
return kv.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSequenceId(long seqId) {
|
||||
this.seqId = seqId;
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract static class
|
||||
|
|
|
@ -1994,7 +1994,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
*/
|
||||
private long getNextSequenceId(final HLog wal) throws IOException {
|
||||
HLogKey key = this.appendNoSyncNoAppend(wal, null);
|
||||
return key.getSequenceNumber();
|
||||
return key.getSequenceId();
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -2501,7 +2501,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
|
||||
// reference family maps directly so coprocessors can mutate them if desired
|
||||
Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
|
||||
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
|
||||
List<Cell> memstoreCells = new ArrayList<Cell>();
|
||||
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
|
||||
int firstIndex = batchOp.nextIndexToProcess;
|
||||
int lastIndexExclusive = firstIndex;
|
||||
|
@ -3081,9 +3081,10 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* @param output newly added KVs into memstore
|
||||
* @return the additional memory usage of the memstore caused by the
|
||||
* new entries.
|
||||
* @throws IOException
|
||||
*/
|
||||
private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
|
||||
long mvccNum, List<KeyValue> memstoreCells) {
|
||||
long mvccNum, List<Cell> memstoreCells) throws IOException {
|
||||
long size = 0;
|
||||
|
||||
for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
|
||||
|
@ -3092,9 +3093,8 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
|
||||
Store store = getStore(family);
|
||||
for (Cell cell: cells) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
kv.setSequenceId(mvccNum);
|
||||
Pair<Long, Cell> ret = store.add(kv);
|
||||
CellUtil.setSequenceId(cell, mvccNum);
|
||||
Pair<Long, Cell> ret = store.add(cell);
|
||||
size += ret.getFirst();
|
||||
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
|
||||
}
|
||||
|
@ -3108,13 +3108,13 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* called when a Put/Delete has updated memstore but subsequently fails to update
|
||||
* the wal. This method is then invoked to rollback the memstore.
|
||||
*/
|
||||
private void rollbackMemstore(List<KeyValue> memstoreCells) {
|
||||
private void rollbackMemstore(List<Cell> memstoreCells) {
|
||||
int kvsRolledback = 0;
|
||||
|
||||
for (KeyValue kv : memstoreCells) {
|
||||
byte[] family = kv.getFamily();
|
||||
for (Cell cell : memstoreCells) {
|
||||
byte[] family = cell.getFamily();
|
||||
Store store = getStore(family);
|
||||
store.rollback(kv);
|
||||
store.rollback(cell);
|
||||
kvsRolledback++;
|
||||
}
|
||||
LOG.debug("rollbackMemstore rolled back " + kvsRolledback);
|
||||
|
@ -5043,7 +5043,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
List<RowLock> acquiredRowLocks;
|
||||
long addedSize = 0;
|
||||
List<Mutation> mutations = new ArrayList<Mutation>();
|
||||
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
|
||||
List<Cell> memstoreCells = new ArrayList<Cell>();
|
||||
Collection<byte[]> rowsToLock = processor.getRowsToLock();
|
||||
long mvccNum = 0;
|
||||
HLogKey walKey = null;
|
||||
|
@ -5075,16 +5075,16 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
// 7. Apply to memstore
|
||||
for (Mutation m : mutations) {
|
||||
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
|
||||
kv.setSequenceId(mvccNum);
|
||||
Store store = getStore(kv);
|
||||
Cell cell = cellScanner.current();
|
||||
CellUtil.setSequenceId(cell, mvccNum);
|
||||
Store store = getStore(cell);
|
||||
if (store == null) {
|
||||
checkFamily(CellUtil.cloneFamily(kv));
|
||||
checkFamily(CellUtil.cloneFamily(cell));
|
||||
// unreachable
|
||||
}
|
||||
Pair<Long, Cell> ret = store.add(kv);
|
||||
Pair<Long, Cell> ret = store.add(cell);
|
||||
addedSize += ret.getFirst();
|
||||
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
|
||||
memstoreCells.add(ret.getSecond());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5242,7 +5242,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
WriteEntry w = null;
|
||||
HLogKey walKey = null;
|
||||
RowLock rowLock = null;
|
||||
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
|
||||
List<Cell> memstoreCells = new ArrayList<Cell>();
|
||||
boolean doRollBackMemstore = false;
|
||||
try {
|
||||
rowLock = getRowLock(row);
|
||||
|
@ -5354,14 +5354,14 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if (store.getFamily().getMaxVersions() == 1) {
|
||||
// upsert if VERSIONS for this CF == 1
|
||||
size += store.upsert(entry.getValue(), getSmallestReadPoint());
|
||||
memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
|
||||
memstoreCells.addAll(entry.getValue());
|
||||
} else {
|
||||
// otherwise keep older versions around
|
||||
for (Cell cell: entry.getValue()) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
Pair<Long, Cell> ret = store.add(kv);
|
||||
size += ret.getFirst();
|
||||
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
|
||||
memstoreCells.add(ret.getSecond());
|
||||
doRollBackMemstore = true;
|
||||
}
|
||||
}
|
||||
|
@ -5380,7 +5380,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
} else {
|
||||
recordMutationWithoutWal(append.getFamilyCellMap());
|
||||
}
|
||||
if(walKey == null){
|
||||
if (walKey == null) {
|
||||
// Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
|
||||
walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
|
||||
}
|
||||
|
@ -5459,7 +5459,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
WriteEntry w = null;
|
||||
HLogKey walKey = null;
|
||||
long mvccNum = 0;
|
||||
List<KeyValue> memstoreCells = new ArrayList<KeyValue>();
|
||||
List<Cell> memstoreCells = new ArrayList<Cell>();
|
||||
boolean doRollBackMemstore = false;
|
||||
try {
|
||||
rowLock = getRowLock(row);
|
||||
|
@ -5575,14 +5575,14 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
if (store.getFamily().getMaxVersions() == 1) {
|
||||
// upsert if VERSIONS for this CF == 1
|
||||
size += store.upsert(entry.getValue(), getSmallestReadPoint());
|
||||
memstoreCells.addAll(KeyValueUtil.ensureKeyValues(entry.getValue()));
|
||||
memstoreCells.addAll(entry.getValue());
|
||||
} else {
|
||||
// otherwise keep older versions around
|
||||
for (Cell cell : entry.getValue()) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
|
||||
Pair<Long, Cell> ret = store.add(kv);
|
||||
size += ret.getFirst();
|
||||
memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond()));
|
||||
memstoreCells.add(ret.getSecond());
|
||||
doRollBackMemstore = true;
|
||||
}
|
||||
}
|
||||
|
@ -6373,12 +6373,12 @@ public class HRegion implements HeapSize { // , Writable{
|
|||
* Append a faked WALEdit in order to get a long sequence number and log syncer will just ignore
|
||||
* the WALEdit append later.
|
||||
* @param wal
|
||||
* @param cells list of KeyValues inserted into memstore. Those KeyValues are passed in order to
|
||||
* be updated with right mvcc values(their log sequence nu
|
||||
* @param cells list of Cells inserted into memstore. Those Cells are passed in order to
|
||||
* be updated with right mvcc values(their log sequence number)
|
||||
* @return Return the key used appending with no sync and no append.
|
||||
* @throws IOException
|
||||
*/
|
||||
private HLogKey appendNoSyncNoAppend(final HLog wal, List<KeyValue> cells) throws IOException {
|
||||
private HLogKey appendNoSyncNoAppend(final HLog wal, List<Cell> cells) throws IOException {
|
||||
HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable(),
|
||||
HLog.NO_SEQUENCE_ID, 0, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
|
||||
// Call append but with an empty WALEdit. The returned seqeunce id will not be associated
|
||||
|
|
|
@ -631,10 +631,10 @@ public class HStore implements Store {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Pair<Long, Cell> add(final KeyValue kv) {
|
||||
public Pair<Long, Cell> add(final Cell cell) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
return this.memstore.add(kv);
|
||||
return this.memstore.add(cell);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
@ -661,10 +661,10 @@ public class HStore implements Store {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void rollback(final KeyValue kv) {
|
||||
public void rollback(final Cell cell) {
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
this.memstore.rollback(kv);
|
||||
this.memstore.rollback(cell);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
|
|
@ -114,11 +114,11 @@ public class MultiVersionConsistencyControl {
|
|||
* visible to MVCC readers.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum)
|
||||
public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
|
||||
throws IOException {
|
||||
if(e == null) return;
|
||||
if (seqNum != null) {
|
||||
e.setWriteNumber(seqNum.getSequenceNumber());
|
||||
if (seqId != null) {
|
||||
e.setWriteNumber(seqId.getSequenceId());
|
||||
} else {
|
||||
// set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
|
||||
// function beginMemstoreInsertWithSeqNum in case of failures
|
||||
|
|
|
@ -23,9 +23,9 @@ import java.io.IOException;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Interface which abstracts implementations on log sequence number assignment
|
||||
* Interface which abstracts implementations on log sequenceId assignment
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface SequenceNumber {
|
||||
public long getSequenceNumber() throws IOException;
|
||||
public interface SequenceId {
|
||||
public long getSequenceId() throws IOException;
|
||||
}
|
|
@ -122,10 +122,10 @@ public interface Store extends HeapSize, StoreConfigInformation {
|
|||
|
||||
/**
|
||||
* Adds a value to the memstore
|
||||
* @param kv
|
||||
* @param cell
|
||||
* @return memstore size delta & newly added KV which maybe different than the passed in KV
|
||||
*/
|
||||
Pair<Long, Cell> add(KeyValue kv);
|
||||
Pair<Long, Cell> add(Cell cell);
|
||||
|
||||
/**
|
||||
* When was the last edit done in the memstore
|
||||
|
@ -133,11 +133,11 @@ public interface Store extends HeapSize, StoreConfigInformation {
|
|||
long timeOfOldestEdit();
|
||||
|
||||
/**
|
||||
* Removes a kv from the memstore. The KeyValue is removed only if its key & memstoreTS match the
|
||||
* key & memstoreTS value of the kv parameter.
|
||||
* @param kv
|
||||
* Removes a Cell from the memstore. The Cell is removed only if its key & memstoreTS match the
|
||||
* key & memstoreTS value of the cell parameter.
|
||||
* @param cell
|
||||
*/
|
||||
void rollback(final KeyValue kv);
|
||||
void rollback(final Cell cell);
|
||||
|
||||
/**
|
||||
* Find the key that matches <i>row</i> exactly, or the one that immediately precedes it. WARNING:
|
||||
|
|
|
@ -198,12 +198,10 @@ public class StoreFileScanner implements KeyValueScanner {
|
|||
}
|
||||
}
|
||||
|
||||
protected void setCurrentCell(Cell newVal) {
|
||||
protected void setCurrentCell(Cell newVal) throws IOException {
|
||||
this.cur = newVal;
|
||||
if(this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
|
||||
KeyValue curKV = KeyValueUtil.ensureKeyValue(cur);
|
||||
curKV.setSequenceId(this.reader.getSequenceID());
|
||||
cur = curKV;
|
||||
if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
|
||||
CellUtil.setSequenceId(cur, this.reader.getSequenceID());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValueUtil;
|
||||
|
@ -241,7 +242,7 @@ public abstract class Compactor {
|
|||
for (Cell c : kvs) {
|
||||
KeyValue kv = KeyValueUtil.ensureKeyValue(c);
|
||||
if (cleanSeqId && kv.getSequenceId() <= smallestReadPoint) {
|
||||
kv.setSequenceId(0);
|
||||
CellUtil.setSequenceId(kv, 0);
|
||||
}
|
||||
writer.append(kv);
|
||||
++progress.currentCompactedKVs;
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.Syncable;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
@ -1115,9 +1116,9 @@ class FSHLog implements HLog, Syncable {
|
|||
@Override
|
||||
public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
|
||||
final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore,
|
||||
final List<KeyValue> memstoreKVs)
|
||||
final List<Cell> memstoreCells)
|
||||
throws IOException {
|
||||
return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreKVs);
|
||||
return append(htd, info, key, edits, sequenceId, false, inMemstore, memstoreCells);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1132,7 +1133,7 @@ class FSHLog implements HLog, Syncable {
|
|||
* @param sync shall we sync after we call the append?
|
||||
* @param inMemstore
|
||||
* @param sequenceId The region sequence id reference.
|
||||
* @param memstoreKVs
|
||||
* @param memstoreCells
|
||||
* @return txid of this transaction or if nothing to do, the last txid
|
||||
* @throws IOException
|
||||
*/
|
||||
|
@ -1140,7 +1141,7 @@ class FSHLog implements HLog, Syncable {
|
|||
justification="Will never be null")
|
||||
private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
|
||||
WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore,
|
||||
List<KeyValue> memstoreKVs)
|
||||
List<Cell> memstoreCells)
|
||||
throws IOException {
|
||||
if (!this.enabled) return this.highestUnsyncedSequence;
|
||||
if (this.closed) throw new IOException("Cannot append; log is closed");
|
||||
|
@ -1158,7 +1159,7 @@ class FSHLog implements HLog, Syncable {
|
|||
// Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
|
||||
// edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
|
||||
// latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
|
||||
entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreKVs);
|
||||
entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri, memstoreCells);
|
||||
truck.loadPayload(entry, scope.detach());
|
||||
} finally {
|
||||
this.disruptor.getRingBuffer().publish(sequence);
|
||||
|
|
|
@ -18,13 +18,14 @@
|
|||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
/**
|
||||
* A WAL Entry for {@link FSHLog} implementation. Immutable.
|
||||
|
@ -43,18 +44,18 @@ class FSWALEntry extends HLog.Entry {
|
|||
private final transient boolean inMemstore;
|
||||
private final transient HTableDescriptor htd;
|
||||
private final transient HRegionInfo hri;
|
||||
private final transient List<KeyValue> memstoreKVs;
|
||||
private final transient List<Cell> memstoreCells;
|
||||
|
||||
FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
|
||||
final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
|
||||
final HTableDescriptor htd, final HRegionInfo hri, List<KeyValue> memstoreKVs) {
|
||||
final HTableDescriptor htd, final HRegionInfo hri, List<Cell> memstoreCells) {
|
||||
super(key, edit);
|
||||
this.regionSequenceIdReference = referenceToRegionSequenceId;
|
||||
this.inMemstore = inMemstore;
|
||||
this.htd = htd;
|
||||
this.hri = hri;
|
||||
this.sequence = sequence;
|
||||
this.memstoreKVs = memstoreKVs;
|
||||
this.memstoreCells = memstoreCells;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
|
@ -87,13 +88,14 @@ class FSWALEntry extends HLog.Entry {
|
|||
* WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this
|
||||
* method to be called.
|
||||
* @return The region edit/sequence id we set for this edit.
|
||||
* @throws IOException
|
||||
* @see #getRegionSequenceId()
|
||||
*/
|
||||
long stampRegionSequenceId() {
|
||||
long stampRegionSequenceId() throws IOException {
|
||||
long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
|
||||
if (!this.getEdit().isReplay() && memstoreKVs != null && !memstoreKVs.isEmpty()) {
|
||||
for(KeyValue kv : this.memstoreKVs){
|
||||
kv.setSequenceId(regionSequenceId);
|
||||
if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) {
|
||||
for (Cell cell : this.memstoreCells) {
|
||||
CellUtil.setSequenceId(cell, regionSequenceId);
|
||||
}
|
||||
}
|
||||
HLogKey key = getKey();
|
||||
|
|
|
@ -34,10 +34,10 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
@ -366,14 +366,13 @@ public interface HLog {
|
|||
* @param inMemstore Always true except for case where we are writing a compaction completion
|
||||
* record into the WAL; in this case the entry is just so we can finish an unfinished compaction
|
||||
* -- it is not an edit for memstore.
|
||||
* @param memstoreKVs list of KVs added into memstore
|
||||
* @param memstoreCells list of Cells added into memstore
|
||||
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
|
||||
* in it.
|
||||
* @throws IOException
|
||||
*/
|
||||
long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
|
||||
AtomicLong sequenceId, boolean inMemstore, List<KeyValue> memstoreKVs)
|
||||
throws IOException;
|
||||
AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreCells) throws IOException;
|
||||
|
||||
// TODO: Do we need all these versions of sync?
|
||||
void hsync() throws IOException;
|
||||
|
|
|
@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
|
||||
import org.apache.hadoop.hbase.regionserver.SequenceNumber;
|
||||
import org.apache.hadoop.hbase.regionserver.SequenceId;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
|
@ -69,7 +69,7 @@ import com.google.protobuf.ByteString;
|
|||
// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
|
||||
// purposes. They need to be merged into HLogEntry.
|
||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
|
||||
public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
|
||||
public class HLogKey implements WritableComparable<HLogKey>, SequenceId {
|
||||
public static final Log LOG = LogFactory.getLog(HLogKey.class);
|
||||
|
||||
// should be < 0 (@see #readFields(DataInput))
|
||||
|
@ -278,7 +278,7 @@ public class HLogKey implements WritableComparable<HLogKey>, SequenceNumber {
|
|||
* @throws InterruptedException
|
||||
*/
|
||||
@Override
|
||||
public long getSequenceNumber() throws IOException {
|
||||
public long getSequenceId() throws IOException {
|
||||
try {
|
||||
this.seqNumAssignedLatch.await();
|
||||
} catch (InterruptedException ie) {
|
||||
|
|
|
@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.HBaseTestCase;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
|
||||
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
|
@ -970,7 +969,7 @@ public class TestHRegion {
|
|||
// throw exceptions if the WalEdit is a start flush action
|
||||
when(hlog.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
|
||||
(WALEdit)argThat(isFlushWALMarker), (AtomicLong)any(), Mockito.anyBoolean(),
|
||||
(List<KeyValue>)any()))
|
||||
(List<Cell>)any()))
|
||||
.thenThrow(new IOException("Fail to append flush marker"));
|
||||
|
||||
// start cache flush will throw exception
|
||||
|
@ -4499,7 +4498,7 @@ public class TestHRegion {
|
|||
//verify append called or not
|
||||
verify(log, expectAppend ? times(1) : never())
|
||||
.appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
|
||||
(WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<KeyValue>)any());
|
||||
(WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean(), (List<Cell>)any());
|
||||
|
||||
// verify sync called or not
|
||||
if (expectSync || expectSyncFromLogSyncer) {
|
||||
|
@ -5524,7 +5523,7 @@ public class TestHRegion {
|
|||
TEST_UTIL.getConfiguration(), rss, null);
|
||||
|
||||
verify(log, times(1)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any()
|
||||
, editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any());
|
||||
, editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
|
||||
|
||||
WALEdit edit = editCaptor.getValue();
|
||||
assertNotNull(edit);
|
||||
|
@ -5588,7 +5587,7 @@ public class TestHRegion {
|
|||
|
||||
// 2 times, one for region open, the other close region
|
||||
verify(log, times(2)).appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
|
||||
editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<KeyValue>)any());
|
||||
editCaptor.capture(), (AtomicLong)any(), anyBoolean(), (List<Cell>)any());
|
||||
|
||||
WALEdit edit = editCaptor.getAllValues().get(1);
|
||||
assertNotNull(edit);
|
||||
|
|
Loading…
Reference in New Issue