diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index ed7d2747a77..48dc88035bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -124,13 +125,21 @@ public class CompactingMemStore extends AbstractMemStore {
   }
 
   /**
-   * This method is called when it is clear that the flush to disk is completed.
-   * The store may do any post-flush actions at this point.
-   * One example is to update the WAL with sequence number that is known only at the store level.
+   * Called before a flush is executed.
+   * @return a lower-bound estimate of the lowest sequence id left unflushed in this memstore once
+   *   the flush completes, or {@code HConstants.NO_SEQNUM} if the flush will empty the memstore.
    */
   @Override
-  public void finalizeFlush() {
-    updateLowestUnflushedSequenceIdInWAL(false);
+  public long preFlushSeqIDEstimation() {
+    if (compositeSnapshot) {
+      // NOTE(review): presumably a composite snapshot flushes every segment - confirm.
+      return HConstants.NO_SEQNUM;
+    }
+    Segment segment = getLastSegment();
+    if (segment == null) {
+      return HConstants.NO_SEQNUM;
+    }
+    return segment.getMinSequenceId();
   }
 
   @Override
@@ -364,6 +373,18 @@ public class CompactingMemStore extends AbstractMemStore {
     }
   }
 
+  /**
+   * @return the tail of the compaction pipeline, or the active segment when the pipeline is empty.
+   */
+  private Segment getLastSegment() {
+    // The active segment is read before the pipeline tail; keep this order.
+    // NOTE(review): presumably so that a segment moved from active into the pipeline between
+    // the two reads is still observed - confirm before reordering.
+    Segment localActive = getActive();
+    Segment tail = pipeline.getTail();
+    return tail == null ? localActive : tail;
+  }
+
   private byte[] getFamilyNameInBytes() {
     return store.getFamily().getName();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index e533bd0c35c..9a844e67e0f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -267,6 +267,17 @@ public class CompactionPipeline {
     if(segment != null) pipeline.addLast(segment);
   }
 
+  /**
+   * @return the last element of the list returned by {@code getSegments()}, or {@code null} if
+   *   that list is empty.
+   */
+  public Segment getTail() {
+    List<? extends Segment> localCopy = getSegments();
+    if (localCopy.isEmpty()) {
+      return null;
+    }
+    return localCopy.get(localCopy.size() - 1);
+  }
+
   private boolean addFirst(ImmutableSegment segment) {
     pipeline.addFirst(segment);
     return true;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index d4e6e12b89b..63af5704580 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -169,7 +170,9 @@ public class DefaultMemStore extends AbstractMemStore {
   }
 
   @Override
-  public void finalizeFlush() {
+  public long preFlushSeqIDEstimation() {
+    // Always NO_SEQNUM, the value documented for "the flush will clear the memstore".
+    return HConstants.NO_SEQNUM;
   }
 
   @Override public boolean isSloppy() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f35d788300a..ef6239d6f11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2412,9 +2412,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName());
     MemstoreSize totalSizeOfFlushableStores = new MemstoreSize();
 
-    Set flushedFamilyNames = new HashSet();
+    Map<byte[], Long> flushedFamilyNamesToSeq = new HashMap<>();
     for (Store store: storesToFlush) {
-      flushedFamilyNames.add(store.getFamily().getName());
+      flushedFamilyNamesToSeq.put(store.getFamily().getName(),
+          ((HStore) store).preFlushSeqIDEstimation());
     }
 
     TreeMap storeFlushCtxs
@@ -2434,7 +2435,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     try {
       if (wal != null) {
         Long earliestUnflushedSequenceIdForTheRegion =
-            wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
+            wal.startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq);
         if (earliestUnflushedSequenceIdForTheRegion == null) {
           // This should never happen. This is how startCacheFlush signals flush cannot proceed.
           String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
@@ -2677,9 +2678,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     // If we get to here, the HStores have been written.
-    for(Store storeToFlush :storesToFlush) {
-      ((HStore) storeToFlush).finalizeFlush();
-    }
     if (wal != null) {
       wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 425667a9288..ad23ce03663 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2509,8 +2509,12 @@ public class HStore implements Store {
     }
   }
 
-  public void finalizeFlush() {
-    memstore.finalizeFlush();
+  /**
+   * @return this store's memstore estimate, see {@link MemStore#preFlushSeqIDEstimation()};
+   *   never {@code null}, since the delegate returns a primitive.
+   */
+  public Long preFlushSeqIDEstimation() {
+    return memstore.preFlushSeqIDEstimation();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index b09447629c4..38d3e447a78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -119,12 +119,12 @@ public interface MemStore {
   MemstoreSize size();
 
   /**
-   * This method is called when it is clear that the flush to disk is completed.
-   * The store may do any post-flush actions at this point.
-   * One example is to update the wal with sequence number that is known only at the store level.
+   * Called before a flush is executed.
+   * @return a lower-bound estimate of the lowest sequence id left unflushed in this memstore once
+   *   the flush completes, or {@code HConstants.NO_SEQNUM} if the flush will empty the memstore.
    */
-  void finalizeFlush();
+  long preFlushSeqIDEstimation();
 
-  /* Return true if the memstore may need some extra memory space*/
+  /* Return true if the memstore may use some extra memory space */
   boolean isSloppy();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 316e2f63e9c..7e3bd59f161 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -428,6 +428,15 @@ public abstract class AbstractFSWAL implements WAL {
     return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
   }
 
+  @Override
+  public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyToSeq) {
+    if (!closeBarrier.beginOp()) {
+      LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
+      return null;
+    }
+    return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, familyToSeq);
+  }
+
   @Override
   public void completeCacheFlush(byte[] encodedRegionName) {
     this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index 6e7ad9bc00c..8226b82d28c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -264,6 +264,24 @@ class SequenceIdAccounting {
    * oldest/lowest outstanding edit.
    */
   Long startCacheFlush(final byte[] encodedRegionName, final Set families) {
+    // Set-based entry point: no per-family estimate is supplied, so every family is mapped to
+    // NO_SEQNUM before delegating to the map-based overload.
+    Map<byte[], Long> familyToSeq = new HashMap<>();
+    for (byte[] familyName : families) {
+      familyToSeq.put(familyName, HConstants.NO_SEQNUM);
+    }
+    return startCacheFlush(encodedRegionName, familyToSeq);
+  }
+
+  /**
+   * Variant of the set-based {@code startCacheFlush} that takes, per flushed family, the
+   * caller's estimate of the lowest sequence id that will remain unflushed after the flush.
+   * @param encodedRegionName Region being flushed.
+   * @param familyToSeq Map from family name to that estimate. {@code HConstants.NO_SEQNUM}
+   *   removes the family's entry; any other value replaces the entry if one is present.
+   * @return same contract as the set-based overload.
+   */
+  Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {
     Map oldSequenceIds = null;
     Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
     synchronized (tieLock) {
@@ -273,9 +291,16 @@ class SequenceIdAccounting {
         // circumstance because another concurrent thread now may add sequenceids for this family
         // (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it
         // is fine because updates are blocked when this method is called. Make sure!!!
-        for (byte[] familyName : families) {
-          ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);
-          Long seqId = m.remove(familyNameWrapper);
+        for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {
+          ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(entry.getKey());
+          // NO_SEQNUM is the "memstore will be cleared" value, so the family's entry is dropped;
+          // any other value replaces the entry with the caller's estimate.
+          Long seqId;
+          if (entry.getValue().longValue() == HConstants.NO_SEQNUM) {
+            seqId = m.remove(familyNameWrapper);
+          } else {
+            seqId = m.replace(familyNameWrapper, entry.getValue());
+          }
           if (seqId != null) {
             if (oldSequenceIds == null) {
               oldSequenceIds = new HashMap<>();
@@ -344,7 +369,9 @@ class SequenceIdAccounting {
       if (flushing != null) {
         for (Map.Entry e : flushing.entrySet()) {
           Long currentId = tmpMap.get(e.getKey());
-          if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
+          // NOTE(review): strict '<' - presumably an id equal to the pre-flush one is now legal
+          // because startCacheFlush may replace a family's entry with its estimate; confirm.
+          if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
             String errorStr = Bytes.toString(encodedRegionName) + " family "
                 + e.getKey().toStringUtf8() + " acquired edits out of order current memstore seq="
                 + currentId + ", previous oldest unflushed id=" + e.getValue();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index 337f2b488ab..8f224fcd9dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -195,6 +196,13 @@ class DisabledWALProvider implements WALProvider {
       sync();
     }
 
+    @Override
+    public Long startCacheFlush(final byte[] encodedRegionName,
+        Map<byte[], Long> flushedFamilyNamesToSeq) {
+      // Only the family names are used here; the per-family estimates are ignored.
+      return startCacheFlush(encodedRegionName, flushedFamilyNamesToSeq.keySet());
+    }
+
     @Override
     public Long startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) {
       if (closed.get()) return null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 030d8b66d6e..b7adc60cc52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -161,6 +161,16 @@ public interface WAL extends Closeable {
    */
   Long startCacheFlush(final byte[] encodedRegionName, Set families);
 
+  /**
+   * Variant of {@link #startCacheFlush(byte[], Set)} for callers that can estimate, per flushed
+   * family, the lowest sequence id that will remain unflushed once the flush completes.
+   * @param encodedRegionName Encoded region name.
+   * @param familyToSeq Map from family name to that estimate; {@code HConstants.NO_SEQNUM} means
+   *   the family's memstore is expected to be emptied by the flush.
+   * @return see {@link #startCacheFlush(byte[], Set)}
+   */
+  Long startCacheFlush(final byte[] encodedRegionName, Map<byte[], Long> familyToSeq);
+
   /**
    * Complete the cache flush.
    * @param encodedRegionName Encoded region name.