HBASE-17407: Correct update of maxFlushedSeqId in HRegion

Signed-off-by: zhangduo <zhangduo@apache.org>
eshcar authored 2017-01-19 01:11:58 +02:00, committed by zhangduo
parent 3abd13dacb
commit f254e278ec
10 changed files with 75 additions and 23 deletions

CompactingMemStore.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -124,13 +125,20 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   /**
-   * This method is called when it is clear that the flush to disk is completed.
-   * The store may do any post-flush actions at this point.
-   * One example is to update the WAL with sequence number that is known only at the store level.
+   * This method is called before the flush is executed.
+   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
+   * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
    */
   @Override
-  public void finalizeFlush() {
-    updateLowestUnflushedSequenceIdInWAL(false);
+  public long preFlushSeqIDEstimation() {
+    if(compositeSnapshot) {
+      return HConstants.NO_SEQNUM;
+    }
+    Segment segment = getLastSegment();
+    if(segment == null) {
+      return HConstants.NO_SEQNUM;
+    }
+    return segment.getMinSequenceId();
   }
 
   @Override
@@ -364,6 +372,12 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
+  private Segment getLastSegment() {
+    Segment localActive = getActive();
+    Segment tail = pipeline.getTail();
+    return tail == null ? localActive : tail;
+  }
+
   private byte[] getFamilyNameInBytes() {
     return store.getFamily().getName();
   }
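When a composite snapshot will empty the whole memstore, preFlushSeqIDEstimation() simply answers HConstants.NO_SEQNUM; otherwise the flush drains only the oldest portion of the memstore, so the minimum sequence id of the segment returned by getLastSegment() is a safe, if conservative, lower bound for everything that stays in memory. A self-contained sketch of that reasoning in plain Java, with invented sequence ids in place of real segments:

import java.util.ArrayDeque;
import java.util.Deque;

public class PreFlushEstimationSketch {
  static final long NO_SEQNUM = -1L; // stands in for HConstants.NO_SEQNUM

  public static void main(String[] args) {
    // Each pipeline segment is reduced to its minimum sequence id; head = newest, tail = oldest.
    Deque<Long> pipeline = new ArrayDeque<>();
    pipeline.addFirst(10L); // oldest segment in the pipeline
    pipeline.addFirst(25L); // a newer segment
    long activeMinSeqId = 40L; // minimum sequence id of the active segment

    // Mirrors getLastSegment(): the pipeline tail if there is one, otherwise the active segment.
    long estimate = pipeline.isEmpty() ? activeMinSeqId : pipeline.peekLast();

    // Every edit that can remain in the memstore after the flush has a sequence id >= 10,
    // so reporting 10 only makes the WAL's bookkeeping more conservative, never unsafe.
    System.out.println("pre-flush lower bound = " + estimate); // prints 10
  }
}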

CompactionPipeline.java

@@ -267,6 +267,14 @@ public class CompactionPipeline {
     if(segment != null) pipeline.addLast(segment);
   }
 
+  public Segment getTail() {
+    List<? extends Segment> localCopy = getSegments();
+    if(localCopy.isEmpty()) {
+      return null;
+    }
+    return localCopy.get(localCopy.size()-1);
+  }
+
   private boolean addFirst(ImmutableSegment segment) {
     pipeline.addFirst(segment);
     return true;

DefaultMemStore.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -169,7 +170,8 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   @Override
-  public void finalizeFlush() {
+  public long preFlushSeqIDEstimation() {
+    return HConstants.NO_SEQNUM;
   }
 
   @Override public boolean isSloppy() {

HRegion.java

@@ -2412,9 +2412,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
     MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
 
-    Set<byte[]> flushedFamilyNames = new HashSet<byte[]>();
+    Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
     for (Store store: storesToFlush) {
-      flushedFamilyNames.add(store.getFamily().getName());
+      flushedFamilyNamesToSeq.put(store.getFamily().getName(),
+        ((HStore) store).preFlushSeqIDEstimation());
     }
 
     TreeMap<byte[], StoreFlushContext> storeFlushCtxs
@@ -2434,7 +2435,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       if (wal != null) {
         Long earliestUnflushedSequenceIdForTheRegion =
-            wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
+            wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq);
         if (earliestUnflushedSequenceIdForTheRegion == null) {
           // This should never happen. This is how startCacheFlush signals flush cannot proceed.
           String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
@@ -2677,9 +2678,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     // If we get to here, the HStores have been written.
-    for(Store storeToFlush :storesToFlush) {
-      ((HStore) storeToFlush).finalizeFlush();
-    }
     if (wal != null) {
       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
     }
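Dropping the post-flush finalizeFlush() pass is possible because the WAL now receives a per-family estimate up front, when the flush starts. This is where maxFlushedSeqId comes in: the region derives it from the earliest sequence id the WAL still reports as unflushed, so losing a family's entry while part of its memstore stays in memory can let that value climb past edits that were never persisted. A plain-Java sketch of the effect, with invented numbers rather than the actual HRegion bookkeeping:

import java.util.HashMap;
import java.util.Map;

public class MaxFlushedSeqIdSketch {

  // Earliest sequence id still unflushed for the region, or nextSeqId if nothing is tracked.
  static long earliestUnflushed(Map<String, Long> lowestUnflushedByFamily, long nextSeqId) {
    return lowestUnflushedByFamily.values().stream().min(Long::compare).orElse(nextSeqId);
  }

  public static void main(String[] args) {
    // Family "cf1" holds edits 10..40 in its memstore; the flush persists 10..24 and
    // keeps 25..40 in the in-memory pipeline. 41 is the next sequence id to be issued.
    long nextSeqId = 41L;

    // Old scheme: starting the flush removed cf1's entry, so the region looked fully
    // flushed and its flushed sequence id could advance past the in-memory edits.
    Map<String, Long> oldAccounting = new HashMap<>();
    long maxFlushedOld = earliestUnflushed(oldAccounting, nextSeqId) - 1; // 40 -- too high

    // New scheme: the entry is replaced by the pre-flush estimate, keeping the flushed
    // sequence id below every edit that is still only in memory.
    Map<String, Long> newAccounting = new HashMap<>();
    newAccounting.put("cf1", 25L);
    long maxFlushedNew = earliestUnflushed(newAccounting, nextSeqId) - 1; // 24

    System.out.println("old=" + maxFlushedOld + ", new=" + maxFlushedNew);
  }
}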

HStore.java

@@ -2509,8 +2509,8 @@ public class HStore implements Store {
     }
   }
 
-  public void finalizeFlush() {
-    memstore.finalizeFlush();
+  public Long preFlushSeqIDEstimation() {
+    return memstore.preFlushSeqIDEstimation();
   }
 
   @Override

MemStore.java

@@ -119,12 +119,12 @@ public interface MemStore {
   MemstoreSize size();
 
   /**
-   * This method is called when it is clear that the flush to disk is completed.
-   * The store may do any post-flush actions at this point.
-   * One example is to update the wal with sequence number that is known only at the store level.
+   * This method is called before the flush is executed.
+   * @return an estimation (lower bound) of the unflushed sequence id in memstore after the flush
+   * is executed. if memstore will be cleared returns {@code HConstants.NO_SEQNUM}.
    */
-  void finalizeFlush();
+  long preFlushSeqIDEstimation();
 
-  /* Return true if the memstore may need some extra memory space*/
+  /* Return true if the memstore may use some extra memory space*/
   boolean isSloppy();
 }
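The interface now states, per implementation, what the flush will leave behind: DefaultMemStore always answers HConstants.NO_SEQNUM because its flush clears the whole memstore, while CompactingMemStore answers the lower bound computed above. A minimal plain-Java sketch of that contract (hypothetical types, made-up value):

public class PreFlushContractSketch {
  static final long NO_SEQNUM = -1L;

  interface MemStoreLike {
    long preFlushSeqIDEstimation();
  }

  public static void main(String[] args) {
    MemStoreLike defaultStyle = () -> NO_SEQNUM; // DefaultMemStore: flush clears everything
    MemStoreLike compactingStyle = () -> 25L;    // CompactingMemStore: oldest retained id (made up)

    for (MemStoreLike m : new MemStoreLike[] { defaultStyle, compactingStyle }) {
      long estimate = m.preFlushSeqIDEstimation();
      System.out.println(estimate == NO_SEQNUM
          ? "flush empties the memstore; drop the family entry"
          : "keep the family entry at lower bound " + estimate);
    }
  }
}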

AbstractFSWAL.java

@@ -428,6 +428,15 @@ public abstract class AbstractFSWAL<W> implements WAL {
     return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
   }
 
+  @Override
+  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
+    if (!closeBarrier.beginOp()) {
+      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
+      return null;
+    }
+    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
+  }
+
   @Override
   public void completeCacheFlush(byte[] encodedRegionName) {
     this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);

SequenceIdAccounting.java

@@ -264,6 +264,14 @@ class SequenceIdAccounting {
    * oldest/lowest outstanding edit.
    */
   Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
+    Map<byte[],Long> familytoSeq = new HashMap<>();
+    for (byte[] familyName : families){
+      familytoSeq.put(familyName,HConstants.NO_SEQNUM);
+    }
+    return startCacheFlush(encodedRegionName,familytoSeq);
+  }
+
+  Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {
     Map<ImmutableByteArray, Long> oldSequenceIds = null;
     Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
     synchronized (tieLock) {
@@ -273,9 +281,14 @@ class SequenceIdAccounting {
         // circumstance because another concurrent thread now may add sequenceids for this family
         // (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it
         // is fine because updates are blocked when this method is called. Make sure!!!
-        for (byte[] familyName : families) {
-          ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);
-          Long seqId = m.remove(familyNameWrapper);
+        for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {
+          ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap((byte[]) entry.getKey());
+          Long seqId = null;
+          if(entry.getValue() == HConstants.NO_SEQNUM) {
+            seqId = m.remove(familyNameWrapper);
+          } else {
+            seqId = m.replace(familyNameWrapper, entry.getValue());
+          }
           if (seqId != null) {
             if (oldSequenceIds == null) {
               oldSequenceIds = new HashMap<>();
@@ -344,7 +357,7 @@ class SequenceIdAccounting {
     if (flushing != null) {
       for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {
         Long currentId = tmpMap.get(e.getKey());
-        if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
+        if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
           String errorStr = Bytes.toString(encodedRegionName) + " family "
               + e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
              + currentId + ", previous oldest unflushed id=" + e.getValue();
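The essential behavioural change is in the per-family loop above: a family flagged with HConstants.NO_SEQNUM is treated exactly as before (its lowest-unflushed entry is removed for the duration of the flush), while a family carrying a real estimate keeps its entry, now set to that estimate, so the WAL never loses track of it. That is also why the check in the last hunk is relaxed from <= to <: with replace() the live value for a family can legitimately equal the value captured at flush start, so only a strictly smaller current value still counts as out-of-order. A self-contained plain-Java sketch of the remove-versus-replace decision, with invented families and sequence ids:

import java.util.HashMap;
import java.util.Map;

public class StartCacheFlushSketch {
  static final long NO_SEQNUM = -1L;

  // Mirrors the per-family decision made in startCacheFlush: NO_SEQNUM means the memstore
  // will be emptied, anything else is a lower bound for the edits that stay in memory.
  static Long beginFlush(Map<String, Long> lowestUnflushed, String family, long estimate) {
    return estimate == NO_SEQNUM
        ? lowestUnflushed.remove(family)             // classic behaviour: entry disappears
        : lowestUnflushed.replace(family, estimate); // entry survives, holding the estimate
  }

  public static void main(String[] args) {
    Map<String, Long> lowestUnflushed = new HashMap<>();
    lowestUnflushed.put("cf1", 10L);

    // DefaultMemStore-style flush: everything goes to disk, the entry is dropped.
    Map<String, Long> a = new HashMap<>(lowestUnflushed);
    System.out.println(beginFlush(a, "cf1", NO_SEQNUM) + " / " + a.get("cf1")); // 10 / null

    // CompactingMemStore-style flush: edits >= 25 stay in memory, so the entry is kept
    // as a conservative lower bound instead of vanishing.
    Map<String, Long> b = new HashMap<>(lowestUnflushed);
    System.out.println(beginFlush(b, "cf1", 25L) + " / " + b.get("cf1")); // 10 / 25
  }
}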

DisabledWALProvider.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,6 +196,11 @@ class DisabledWALProvider implements WALProvider {
       sync();
     }
 
+    public Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long>
+        flushedFamilyNamesToSeq) {
+      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
+    }
+
     @Override
     public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
       if (closed.get()) return null;

WAL.java

@@ -161,6 +161,8 @@ public interface WAL extends Closeable {
    */
   Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
 
+  Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> familyToSeq);
+
   /**
    * Complete the cache flush.
    * @param encodedRegionName Encoded region name.