HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS

This commit is contained in:
stack 2015-06-08 10:55:53 -07:00
parent 9f43a3bea6
commit 5c16b34e32
19 changed files with 907 additions and 479 deletions

View File

@ -414,7 +414,7 @@ public class RegionStates {
ServerName oldServerName = regionAssignments.put(hri, serverName); ServerName oldServerName = regionAssignments.put(hri, serverName);
if (!serverName.equals(oldServerName)) { if (!serverName.equals(oldServerName)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName + " " + hri); LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
} else { } else {
LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
} }
@ -599,7 +599,7 @@ public class RegionStates {
// Region is open on this region server, but in transition. // Region is open on this region server, but in transition.
// This region must be moving away from this server, or splitting/merging. // This region must be moving away from this server, or splitting/merging.
// SSH will handle it, either skip assigning, or re-assign. // SSH will handle it, either skip assigning, or re-assign.
LOG.info("Transitioning " + state + " will be handled by SSH for " + sn); LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn);
} else if (sn.equals(state.getServerName())) { } else if (sn.equals(state.getServerName())) {
// Region is in transition on this region server, and this // Region is in transition on this region server, and this
// region is not open on this server. So the region must be // region is not open on this server. So the region must be
@ -610,7 +610,8 @@ public class RegionStates {
// tried several times to open it while this region server is not reachable) // tried several times to open it while this region server is not reachable)
if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN, if (isOneOfStates(state, State.OPENING, State.PENDING_OPEN,
State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) { State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn); LOG.info("Found region in " + state +
" to be reassigned by ServerCrashProcedure for " + sn);
rits.add(hri); rits.add(hri);
} else if (isOneOfStates(state, State.SPLITTING_NEW)) { } else if (isOneOfStates(state, State.SPLITTING_NEW)) {
regionsToCleanIfNoMetaEntry.add(state.getRegion()); regionsToCleanIfNoMetaEntry.add(state.getRegion());

View File

@ -120,9 +120,15 @@ public class ServerManager {
// Set if we are to shutdown the cluster. // Set if we are to shutdown the cluster.
private volatile boolean clusterShutdown = false; private volatile boolean clusterShutdown = false;
/**
* The last flushed sequence id for a region.
*/
private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion = private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR); new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
/**
* The last flushed sequence id for a store in a region.
*/
private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>> private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
storeFlushedSequenceIdsByRegion = storeFlushedSequenceIdsByRegion =
new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR); new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);
@ -291,6 +297,10 @@ public class ServerManager {
Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName); Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
long l = entry.getValue().getCompleteSequenceId(); long l = entry.getValue().getCompleteSequenceId();
// Don't let smaller sequence ids override greater sequence ids. // Don't let smaller sequence ids override greater sequence ids.
if (LOG.isTraceEnabled()) {
LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
", completeSequenceId=" + l);
}
if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) { if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
flushedSequenceIdByRegion.put(encodedRegionName, l); flushedSequenceIdByRegion.put(encodedRegionName, l);
} else if (l != HConstants.NO_SEQNUM && l < existingValue) { } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
@ -304,6 +314,10 @@ public class ServerManager {
byte[] family = storeSeqId.getFamilyName().toByteArray(); byte[] family = storeSeqId.getFamilyName().toByteArray();
existingValue = storeFlushedSequenceId.get(family); existingValue = storeFlushedSequenceId.get(family);
l = storeSeqId.getSequenceId(); l = storeSeqId.getSequenceId();
if (LOG.isTraceEnabled()) {
LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
", existingValue=" + existingValue + ", completeSequenceId=" + l);
}
// Don't let smaller sequence ids override greater sequence ids. // Don't let smaller sequence ids override greater sequence ids.
if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) { if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
storeFlushedSequenceId.put(family, l); storeFlushedSequenceId.put(family, l);

View File

@ -76,9 +76,9 @@ public class FlushLargeStoresPolicy extends FlushPolicy {
private boolean shouldFlush(Store store) { private boolean shouldFlush(Store store) {
if (store.getMemStoreSize() > this.flushSizeLowerBound) { if (store.getMemStoreSize() > this.flushSizeLowerBound) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
+ " will be flushed because of memstoreSize(" + store.getMemStoreSize() region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
+ ") is larger than lower bound(" + this.flushSizeLowerBound + ")"); store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound);
} }
return true; return true;
} }

View File

@ -22,7 +22,6 @@ import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.text.ParseException; import java.text.ParseException;
import java.util.AbstractList; import java.util.AbstractList;
@ -216,13 +215,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final AtomicBoolean closing = new AtomicBoolean(false); final AtomicBoolean closing = new AtomicBoolean(false);
/** /**
* The max sequence id of flushed data on this region. Used doing some rough calculations on * The max sequence id of flushed data on this region. There is no edit in memory that is
* whether time to flush or not. * less that this sequence id.
*/ */
private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM; private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
/** /**
* Record the sequence id of last flush operation. * Record the sequence id of last flush operation. Can be in advance of
* {@link #maxFlushedSeqId} when flushing a single column family. In this case,
* {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
*/ */
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM; private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
/** /**
@ -604,6 +605,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* is new), then read them from the supplied path. * is new), then read them from the supplied path.
* @param htd the table descriptor * @param htd the table descriptor
* @param rsServices reference to {@link RegionServerServices} or null * @param rsServices reference to {@link RegionServerServices} or null
* @deprecated Use other constructors.
*/ */
@Deprecated @Deprecated
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs, public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
@ -1612,16 +1614,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes(); byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
regionLoadBldr.clearStoreCompleteSequenceId(); regionLoadBldr.clearStoreCompleteSequenceId();
for (byte[] familyName : this.stores.keySet()) { for (byte[] familyName : this.stores.keySet()) {
long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName); long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
// no oldestUnflushedSeqId means no data has written to the store after last flush, so we use // Subtract - 1 to go earlier than the current oldest, unflushed edit in memstore; this will
// lastFlushOpSeqId as complete sequence id for the store. // give us a sequence id that is for sure flushed. We want edit replay to start after this
regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId // sequence id in this region. If NO_SEQNUM, use the regions maximum flush id.
.newBuilder() long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1;
.setFamilyName(ByteString.copyFrom(familyName)) regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.
.setSequenceId( newBuilder().setFamilyName(ByteString.copyFrom(familyName)).setSequenceId(csid).build());
oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
} }
return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId); return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -1907,27 +1908,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* returns true which will make a lot of flush requests. * returns true which will make a lot of flush requests.
*/ */
boolean shouldFlushStore(Store store) { boolean shouldFlushStore(Store store) {
long maxFlushedSeqId = long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store store.getFamily().getName()) - 1;
.getFamily().getName()) - 1; if (earliest > 0 && earliest + flushPerChanges < sequenceId.get()) {
if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
+ " will be flushed because its max flushed seqId(" + maxFlushedSeqId getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
+ ") is far away from current(" + sequenceId.get() + "), max allowed is " " is > " + this.flushPerChanges + " from current=" + sequenceId.get());
+ flushPerChanges);
} }
return true; return true;
} }
if (flushCheckInterval <= 0) { if (this.flushCheckInterval <= 0) {
return false; return false;
} }
long now = EnvironmentEdgeManager.currentTime(); long now = EnvironmentEdgeManager.currentTime();
if (store.timeOfOldestEdit() < now - flushCheckInterval) { if (store.timeOfOldestEdit() < now - this.flushCheckInterval) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " +
+ " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit() getRegionInfo().getEncodedName() + " because time of oldest edit=" +
+ ") is far away from now(" + now + "), max allowed is " + flushCheckInterval); store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now);
} }
return true; return true;
} }
@ -2081,18 +2080,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info("Started memstore flush for " + this + ", current region memstore size " // Log a fat line detailing what is being flushed.
+ StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/" StringBuilder perCfExtras = null;
+ stores.size() + " column families' memstores are being flushed." if (!isAllFamilies(storesToFlush)) {
+ ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid)); perCfExtras = new StringBuilder();
// only log when we are not flushing all stores.
if (this.stores.size() > storesToFlush.size()) {
for (Store store: storesToFlush) { for (Store store: storesToFlush) {
LOG.info("Flushing Column Family: " + store.getColumnFamilyName() perCfExtras.append("; ");
+ " which was occupying " perCfExtras.append(store.getColumnFamilyName());
+ StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore."); perCfExtras.append("=");
perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize()));
} }
} }
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
" column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
} }
// Stop updates while we snapshot the memstore of all of these regions' stores. We only have // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
// to do this for a moment. It is quick. We also set the memstore size to zero here before we // to do this for a moment. It is quick. We also set the memstore size to zero here before we
@ -2118,10 +2120,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>( TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR); Bytes.BYTES_COMPARATOR);
// The sequence id of this flush operation which is used to log FlushMarker and pass to // The sequence id of this flush operation which is used to log FlushMarker and pass to
// createFlushContext to use as the store file's sequence id. // createFlushContext to use as the store file's sequence id. It can be in advance of edits
// still in the memstore, edits that are in other column families yet to be flushed.
long flushOpSeqId = HConstants.NO_SEQNUM; long flushOpSeqId = HConstants.NO_SEQNUM;
// The max flushed sequence id after this flush operation. Used as completeSequenceId which is // The max flushed sequence id after this flush operation completes. All edits in memstore
// passed to HMaster. // will be in advance of this sequence id.
long flushedSeqId = HConstants.NO_SEQNUM; long flushedSeqId = HConstants.NO_SEQNUM;
byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
@ -2130,21 +2133,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try { try {
w = mvcc.beginMemstoreInsert(); w = mvcc.beginMemstoreInsert();
if (wal != null) { if (wal != null) {
if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) { Long earliestUnflushedSequenceIdForTheRegion =
// This should never happen. wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
String msg = "Flush will not be started for [" if (earliestUnflushedSequenceIdForTheRegion == null) {
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; // This should never happen. This is how startCacheFlush signals flush cannot proceed.
String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
status.setStatus(msg); status.setStatus(msg);
return new PrepareFlushResult( return new PrepareFlushResult(
new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false), new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
myseqid); myseqid);
} }
flushOpSeqId = getNextSequenceId(wal); flushOpSeqId = getNextSequenceId(wal);
long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName); // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
// no oldestUnflushedSeqId means we flushed all stores. flushedSeqId =
// or the unflushed stores are all empty. earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
: oldestUnflushedSeqId - 1;
} else { } else {
// use the provided sequence Id as WAL is not being used for this flush. // use the provided sequence Id as WAL is not being used for this flush.
flushedSeqId = flushOpSeqId = myseqid; flushedSeqId = flushOpSeqId = myseqid;
@ -2224,6 +2227,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
flushedSeqId, totalFlushableSizeOfFlushableStores); flushedSeqId, totalFlushableSizeOfFlushableStores);
} }
/**
* @param families
* @return True if passed Set is all families in the region.
*/
private boolean isAllFamilies(final Collection<Store> families) {
return families == null || this.stores.size() == families.size();
}
/** /**
* Writes a marker to WAL indicating a flush is requested but cannot be complete due to various * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
* reasons. Ignores exceptions from WAL. Returns whether the write succeeded. * reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
@ -2339,10 +2350,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.lastStoreFlushTimeMap.put(store, startTime); this.lastStoreFlushTimeMap.put(store, startTime);
} }
// Update the oldest unflushed sequence id for region.
this.maxFlushedSeqId = flushedSeqId; this.maxFlushedSeqId = flushedSeqId;
// Record flush operation sequence id.
this.lastFlushOpSeqId = flushOpSeqId; this.lastFlushOpSeqId = flushOpSeqId;
// C. Finally notify anyone waiting on memstore to clear: // C. Finally notify anyone waiting on memstore to clear:
@ -3696,7 +3704,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Make request outside of synchronize block; HBASE-818. // Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false); this.rsServices.getFlushRequester().requestFlush(this, false);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this); LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
} }
} }
@ -4430,7 +4438,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ seqId + " is greater than current seqId:" + currentSeqId); + seqId + " is greater than current seqId:" + currentSeqId);
// Prepare flush (take a snapshot) and then abort (drop the snapshot) // Prepare flush (take a snapshot) and then abort (drop the snapshot)
if (store == null ) { if (store == null) {
for (Store s : stores.values()) { for (Store s : stores.values()) {
totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId); totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
} }
@ -5425,11 +5433,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
} else if (scannerContext.checkSizeLimit(limitScope)) { } else if (scannerContext.checkSizeLimit(limitScope)) {
ScannerContext.NextState state = ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues(); return scannerContext.setScannerState(state).hasMoreValues();
} else if (scannerContext.checkTimeLimit(limitScope)) { } else if (scannerContext.checkTimeLimit(limitScope)) {
ScannerContext.NextState state = ScannerContext.NextState state =
moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues(); return scannerContext.setScannerState(state).hasMoreValues();
} }
} while (moreCellsInRow); } while (moreCellsInRow);
@ -5869,10 +5877,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException * @throws IOException
*/ */
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Configuration conf, final Configuration conf, final HTableDescriptor hTableDescriptor,
final HTableDescriptor hTableDescriptor, final WAL wal, final boolean initialize)
final WAL wal,
final boolean initialize)
throws IOException { throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString() LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir + + " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@ -6227,6 +6233,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param tabledir qualified path for table * @param tabledir qualified path for table
* @param name ENCODED region name * @param name ENCODED region name
* @return Path of HRegion directory * @return Path of HRegion directory
* @deprecated For tests only; to be removed.
*/ */
@Deprecated @Deprecated
public static Path getRegionDir(final Path tabledir, final String name) { public static Path getRegionDir(final Path tabledir, final String name) {
@ -6239,6 +6246,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param rootdir qualified path of HBase root directory * @param rootdir qualified path of HBase root directory
* @param info HRegionInfo for the region * @param info HRegionInfo for the region
* @return qualified path of region directory * @return qualified path of region directory
* @deprecated For tests only; to be removed.
*/ */
@Deprecated @Deprecated
@VisibleForTesting @VisibleForTesting
@ -7058,7 +7066,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
newTags.add(itr.next()); newTags.add(itr.next());
} }
} }
if (i < ( edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
idx++; idx++;
} }
@ -7646,6 +7654,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// sync the WAL edit (SYNC and FSYNC treated the same for now) // sync the WAL edit (SYNC and FSYNC treated the same for now)
this.wal.sync(txid); this.wal.sync(txid);
break; break;
default:
throw new RuntimeException("Unknown durability " + durability);
} }
} }
} }
@ -7734,8 +7744,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override @Override
public long getOldestSeqIdOfStore(byte[] familyName) { public long getOldestSeqIdOfStore(byte[] familyName) {
return wal.getEarliestMemstoreSeqNum(getRegionInfo() return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);
.getEncodedNameAsBytes(), familyName);
} }
@Override @Override

View File

@ -2845,24 +2845,14 @@ public class HRegionServer extends HasThread implements
@Override @Override
public boolean removeFromOnlineRegions(final Region r, ServerName destination) { public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName()); Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
if (destination != null) { if (destination != null) {
try { long closeSeqNum = r.getMaxFlushedSeqId();
WAL wal = getWAL(r.getRegionInfo());
long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
if (closeSeqNum == HConstants.NO_SEQNUM) { if (closeSeqNum == HConstants.NO_SEQNUM) {
// No edits in WAL for this region; get the sequence number when the region was opened. // No edits in WAL for this region; get the sequence number when the region was opened.
closeSeqNum = r.getOpenSeqNum(); closeSeqNum = r.getOpenSeqNum();
if (closeSeqNum == HConstants.NO_SEQNUM) { if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
closeSeqNum = 0;
}
} }
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum); addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
} catch (IOException exception) {
LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
"; not adding to moved regions.");
LOG.debug("Exception details for failure to get wal", exception);
}
} }
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName()); this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
return toReturn != null; return toReturn != null;

View File

@ -112,16 +112,6 @@ import com.google.common.collect.Sets;
* services is compaction services where files are aggregated once they pass * services is compaction services where files are aggregated once they pass
* a configurable threshold. * a configurable threshold.
* *
* <p>The only thing having to do with logs that Store needs to deal with is
* the reconstructionLog. This is a segment of an HRegion's log that might
* NOT be present upon startup. If the param is NULL, there's nothing to do.
* If the param is non-NULL, we need to process the log to reconstruct
* a TreeMap that might not have been written to disk before the process
* died.
*
* <p>It's assumed that after this constructor returns, the reconstructionLog
* file will be deleted (by whoever has instantiated the Store).
*
* <p>Locking and transactions are handled at a higher level. This API should * <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager. * not be called directly but by an HRegion manager.
*/ */
@ -899,8 +889,7 @@ public class HStore implements Store {
} }
/** /**
* Write out current snapshot. Presumes {@link #snapshot()} has been called * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
* previously.
* @param logCacheFlushId flush sequence number * @param logCacheFlushId flush sequence number
* @param snapshot * @param snapshot
* @param status * @param status

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.Service; import com.google.protobuf.Service;
@ -124,10 +125,19 @@ public interface Region extends ConfigurationObserver {
/** @return the latest sequence number that was read from storage when this region was opened */ /** @return the latest sequence number that was read from storage when this region was opened */
long getOpenSeqNum(); long getOpenSeqNum();
/** @return the max sequence id of flushed data on this region */ /** @return the max sequence id of flushed data on this region; no edit in memory will have
* a sequence id that is less that what is returned here.
*/
long getMaxFlushedSeqId(); long getMaxFlushedSeqId();
/** @return the oldest sequence id found in the store for the given family */ /** @return the oldest flushed sequence id for the given family; can be beyond
* {@link #getMaxFlushedSeqId()} in case where we've flushed a subset of a regions column
* families
* @deprecated Since version 1.2.0. Exposes too much about our internals; shutting it down.
* Do not use.
*/
@VisibleForTesting
@Deprecated
public long getOldestSeqIdOfStore(byte[] familyName); public long getOldestSeqIdOfStore(byte[] familyName);
/** /**

View File

@ -29,16 +29,13 @@ import java.net.URLEncoder;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.Set; import java.util.Set;
import java.util.TreeMap;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -90,7 +87,6 @@ import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler; import com.lmax.disruptor.ExceptionHandler;
@ -167,7 +163,7 @@ public class FSHLog implements WAL {
private final Disruptor<RingBufferTruck> disruptor; private final Disruptor<RingBufferTruck> disruptor;
/** /**
* An executorservice that runs the disrutpor AppendEventHandler append executor. * An executorservice that runs the disruptor AppendEventHandler append executor.
*/ */
private final ExecutorService appendExecutor; private final ExecutorService appendExecutor;
@ -209,6 +205,7 @@ public class FSHLog implements WAL {
* WAL directory, where all WAL files would be placed. * WAL directory, where all WAL files would be placed.
*/ */
private final Path fullPathLogDir; private final Path fullPathLogDir;
/** /**
* dir path where old logs are kept. * dir path where old logs are kept.
*/ */
@ -240,6 +237,7 @@ public class FSHLog implements WAL {
* conf object * conf object
*/ */
protected final Configuration conf; protected final Configuration conf;
/** Listeners that are called on WAL events. */ /** Listeners that are called on WAL events. */
private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>(); private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
@ -257,6 +255,7 @@ public class FSHLog implements WAL {
public WALCoprocessorHost getCoprocessorHost() { public WALCoprocessorHost getCoprocessorHost() {
return coprocessorHost; return coprocessorHost;
} }
/** /**
* FSDataOutputStream associated with the current SequenceFile.writer * FSDataOutputStream associated with the current SequenceFile.writer
*/ */
@ -287,6 +286,13 @@ public class FSHLog implements WAL {
// Enable it if the replications recover. // Enable it if the replications recover.
private volatile boolean lowReplicationRollEnabled = true; private volatile boolean lowReplicationRollEnabled = true;
/**
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding
* sequence id as yet not flushed as well as the most recent edit sequence id appended to the
* WAL. Has facility for answering questions such as "Is it safe to GC a WAL?".
*/
private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
/** /**
* Current log file. * Current log file.
*/ */
@ -333,52 +339,6 @@ public class FSHLog implements WAL {
private final AtomicInteger closeErrorCount = new AtomicInteger(); private final AtomicInteger closeErrorCount = new AtomicInteger();
// Region sequence id accounting across flushes and for knowing when we can GC a WAL. These
// sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
// done above in failedSequence, highest sequence, etc.
/**
* This lock ties all operations on lowestFlushingStoreSequenceIds and
* oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into
* oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions
* sequence id, or to find regions with old sequence ids to force flush; we are interested in
* old stuff not the new additions (TODO: IS THIS SAFE? CHECK!).
*/
private final Object regionSequenceIdLock = new Object();
/**
* Map of encoded region names and family names to their OLDEST -- i.e. their first,
* the longest-lived -- sequence id in memstore. Note that this sequence id is the region
* sequence id. This is not related to the id we use above for {@link #highestSyncedSequence}
* and {@link #highestUnsyncedSequence} which is the sequence from the disruptor
* ring buffer.
*/
private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
= new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
Bytes.BYTES_COMPARATOR);
/**
* Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in
* memstore currently being flushed out to hfiles. Entries are moved here from
* {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held
* (so movement between the Maps is atomic). This is not related to the id we use above for
* {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from
* the disruptor ring buffer, an internal detail.
*/
private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
/**
* Map of region encoded names to the latest region sequence id. Updated on each append of
* WALEdits to the WAL. We create one map for each WAL file at the time it is rolled.
* <p>When deciding whether to archive a WAL file, we compare the sequence IDs in this map to
* {@link #lowestFlushingRegionSequenceIds} and {@link #oldestUnflushedRegionSequenceIds}.
* See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info.
* <p>
* This map uses byte[] as the key, and uses reference equality. It works in our use case as we
* use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
* the same array.
*/
private Map<byte[], Long> highestRegionSequenceIds = new HashMap<byte[], Long>();
/** /**
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. * WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
@ -395,7 +355,7 @@ public class FSHLog implements WAL {
}; };
/** /**
* Map of wal log file to the latest sequence ids of all regions it has entries of. * Map of WAL log file to the latest sequence ids of all regions it has entries of.
* The map is sorted by the log file creation timestamp (contained in the log file name). * The map is sorted by the log file creation timestamp (contained in the log file name).
*/ */
private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds = private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
@ -541,7 +501,7 @@ public class FSHLog implements WAL {
(long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); (long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication", this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
FSUtils.getDefaultReplication(fs, this.fullPathLogDir)); FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
this.lowReplicationRollLimit = this.lowReplicationRollLimit =
conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5); conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
@ -744,128 +704,37 @@ public class FSHLog implements WAL {
return DefaultWALProvider.createWriter(conf, fs, path, false); return DefaultWALProvider.createWriter(conf, fs, path, false);
} }
private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
long result = HConstants.NO_SEQNUM;
for (Long seqNum: seqIdMap.values()) {
if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) {
result = seqNum.longValue();
}
}
return result;
}
private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
Map<byte[], T> mapToCopy) {
Map<byte[], Long> copied = Maps.newHashMap();
for (Map.Entry<byte[], T> entry: mapToCopy.entrySet()) {
long lowestSeqId = getLowestSeqId(entry.getValue());
if (lowestSeqId != HConstants.NO_SEQNUM) {
copied.put(entry.getKey(), lowestSeqId);
}
}
return copied;
}
/** /**
* Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
* have been flushed to hfiles.
* <p>
* For each log file, it compares its region to sequenceId map
* (@link {@link FSHLog#highestRegionSequenceIds} with corresponding region entries in
* {@link FSHLog#lowestFlushingRegionSequenceIds} and
* {@link FSHLog#oldestUnflushedRegionSequenceIds}. If all the regions in the map are flushed
* past of their value, then the wal is eligible for archiving.
* @throws IOException * @throws IOException
*/ */
private void cleanOldLogs() throws IOException { private void cleanOldLogs() throws IOException {
Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null; List<Path> logsToArchive = null;
Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null; // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
List<Path> logsToArchive = new ArrayList<Path>(); // are older than what is currently in memory, the WAL can be GC'd.
// make a local copy so as to avoid locking when we iterate over these maps. for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) {
synchronized (regionSequenceIdLock) {
lowestFlushingRegionSequenceIdsLocal =
copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
oldestUnflushedRegionSequenceIdsLocal =
copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
}
for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
// iterate over the log file.
Path log = e.getKey(); Path log = e.getKey();
Map<byte[], Long> sequenceNums = e.getValue(); Map<byte[], Long> sequenceNums = e.getValue();
// iterate over the map for this log file, and tell whether it should be archive or not. if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal, if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
oldestUnflushedRegionSequenceIdsLocal)) {
logsToArchive.add(log); logsToArchive.add(log);
LOG.debug("WAL file ready for archiving " + log); if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
} }
} }
if (logsToArchive != null) {
for (Path p : logsToArchive) { for (Path p : logsToArchive) {
this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen()); this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
archiveLogFile(p); archiveLogFile(p);
this.byWalRegionSequenceIds.remove(p); this.byWalRegionSequenceIds.remove(p);
} }
} }
/**
* Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived.
* It compares the region entries present in the passed sequenceNums map with the local copy of
* {@link #oldestUnflushedRegionSequenceIds} and {@link #lowestFlushingRegionSequenceIds}. If,
* for all regions, the value is lesser than the minimum of values present in the
* oldestFlushing/UnflushedSeqNums, then the wal file is eligible for archiving.
* @param sequenceNums for a WAL, at the time when it was rolled.
* @param oldestFlushingMap
* @param oldestUnflushedMap
* @return true if wal is eligible for archiving, false otherwise.
*/
static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
// find region entries in the flushing/unflushed map. If there is no entry, it meansj
// a region doesn't have any unflushed entry.
long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
// do a minimum to be sure to contain oldest sequence Id
long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
if (minSeqNum <= regionSeqIdEntry.getValue()) return false;// can't archive
}
return true;
} }
/** /**
* Iterates over the given map of regions, and compares their sequence numbers with corresponding * If the number of un-archived WAL files is greater than maximum allowed, check the first
* entries in {@link #oldestUnflushedRegionSequenceIds}. If the sequence number is greater or * (oldest) WAL file, and returns those regions which should be flushed so that it can
* equal, the region is eligible to flush, otherwise, there is no benefit to flush (from the
* perspective of passed regionsSequenceNums map), because the region has already flushed the
* entries present in the WAL file for which this method is called for (typically, the oldest
* wal file).
* @param regionsSequenceNums
* @return regions which should be flushed (whose sequence numbers are larger than their
* corresponding un-flushed entries.
*/
private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
List<byte[]> regionsToFlush = null;
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (regionSequenceIdLock) {
for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
if (regionsToFlush == null)
regionsToFlush = new ArrayList<byte[]>();
regionsToFlush.add(e.getKey());
}
}
}
return regionsToFlush == null ? null : regionsToFlush
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}
/**
* If the number of un-archived WAL files is greater than maximum allowed, it checks
* the first (oldest) WAL file, and returns the regions which should be flushed so that it could
* be archived. * be archived.
* @return regions to flush in order to archive oldest wal file. * @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
* @throws IOException * @throws IOException
*/ */
byte[][] findRegionsToForceFlush() throws IOException { byte[][] findRegionsToForceFlush() throws IOException {
@ -874,7 +743,7 @@ public class FSHLog implements WAL {
if (logCount > this.maxLogs && logCount > 0) { if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, Map<byte[], Long>> firstWALEntry = Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
this.byWalRegionSequenceIds.firstEntry(); this.byWalRegionSequenceIds.firstEntry();
regions = findEligibleMemstoresToFlush(firstWALEntry.getValue()); regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
} }
if (regions != null) { if (regions != null) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -882,9 +751,8 @@ public class FSHLog implements WAL {
if (i > 0) sb.append(", "); if (i > 0) sb.append(", ");
sb.append(Bytes.toStringBinary(regions[i])); sb.append(Bytes.toStringBinary(regions[i]));
} }
LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" + LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " + "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
sb.toString());
} }
return regions; return regions;
} }
@ -962,8 +830,7 @@ public class FSHLog implements WAL {
this.numEntries.set(0); this.numEntries.set(0);
final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
if (oldPath != null) { if (oldPath != null) {
this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds); this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
this.highestRegionSequenceIds = new HashMap<byte[], Long>();
long oldFileLen = this.fs.getFileStatus(oldPath).getLen(); long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
this.totalLogSize.addAndGet(oldFileLen); this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
@ -1107,7 +974,7 @@ public class FSHLog implements WAL {
LOG.debug("Moved " + files.length + " WAL file(s) to " + LOG.debug("Moved " + files.length + " WAL file(s) to " +
FSUtils.getPath(this.fullPathArchiveDir)); FSUtils.getPath(this.fullPathArchiveDir));
} }
LOG.info("Closed WAL: " + toString() ); LOG.info("Closed WAL: " + toString());
} }
@Override @Override
@ -1630,108 +1497,24 @@ public class FSHLog implements WAL {
} }
@Override @Override
public boolean startCacheFlush(final byte[] encodedRegionName, public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) {
Set<byte[]> flushedFamilyNames) {
Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
if (!closeBarrier.beginOp()) { if (!closeBarrier.beginOp()) {
LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) + LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
" - because the server is closing."); return null;
return false;
} }
synchronized (regionSequenceIdLock) { return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
oldestUnflushedStoreSequenceIds.get(encodedRegionName);
if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
for (byte[] familyName: flushedFamilyNames) {
Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
if (seqId != null) {
oldStoreSeqNum.put(familyName, seqId);
}
}
if (!oldStoreSeqNum.isEmpty()) {
Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
encodedRegionName, oldStoreSeqNum);
assert oldValue == null: "Flushing map not cleaned up for "
+ Bytes.toString(encodedRegionName);
}
if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
// Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever
// even if the region is already moved to other server.
// Do not worry about data racing, we held write lock of region when calling
// startCacheFlush, so no one can add value to the map we removed.
oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
}
}
}
if (oldStoreSeqNum.isEmpty()) {
// TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
// the region is already flushing (which would make this call invalid), or there
// were no appends after last flush, so why are we starting flush? Maybe we should
// assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
// For now preserve old logic.
LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+ Bytes.toString(encodedRegionName) + "]");
}
return true;
} }
@Override @Override
public void completeCacheFlush(final byte [] encodedRegionName) { public void completeCacheFlush(final byte [] encodedRegionName) {
synchronized (regionSequenceIdLock) { this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
}
closeBarrier.endOp(); closeBarrier.endOp();
} }
private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
byte[] encodedRegionName) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
oldestUnflushedStoreSequenceIds.get(encodedRegionName);
if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
return oldestUnflushedStoreSequenceIdsOfRegion;
}
oldestUnflushedStoreSequenceIdsOfRegion =
new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
ConcurrentMap<byte[], Long> alreadyPut =
oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName,
oldestUnflushedStoreSequenceIdsOfRegion);
return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
}
@Override @Override
public void abortCacheFlush(byte[] encodedRegionName) { public void abortCacheFlush(byte[] encodedRegionName) {
Map<byte[], Long> storeSeqNumsBeforeFlushStarts; this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (regionSequenceIdLock) {
storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
encodedRegionName);
if (storeSeqNumsBeforeFlushStarts != null) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
.entrySet()) {
currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
familyNameAndSeqId.getValue()));
}
}
}
closeBarrier.endOp(); closeBarrier.endOp();
if (storeSeqNumsBeforeFlushStarts != null) {
for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
if (currentSeqNum != null
&& currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
String errorStr =
"Region " + Bytes.toString(encodedRegionName) + " family "
+ Bytes.toString(familyNameAndSeqId.getKey())
+ " acquired edits out of order current memstore seq=" + currentSeqNum
+ ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
LOG.error(errorStr);
Runtime.getRuntime().halt(1);
}
}
}
} }
@VisibleForTesting @VisibleForTesting
@ -1761,23 +1544,21 @@ public class FSHLog implements WAL {
@Override @Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = // Used by tests. Deprecated as too subtle for general usage.
this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
return oldestUnflushedStoreSequenceIdsOfRegion != null ?
getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
} }
@Override @Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
byte[] familyName) { // This method is used by tests and for figuring if we should flush or not because our
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion = // sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); // figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
if (oldestUnflushedStoreSequenceIdsOfRegion != null) { // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); // currently flushing sequence ids, and if anything found there, it is returning these. This is
return result != null ? result.longValue() : HConstants.NO_SEQNUM; // the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
} else { // we crash during the flush. For figuring what to flush, we might get requeued if our sequence
return HConstants.NO_SEQNUM; // id is old even though we are currently flushing. This may mean we do too much flushing.
} return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
} }
/** /**
@ -1819,10 +1600,10 @@ public class FSHLog implements WAL {
/** /**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained. * For Thread A to call when it is ready to wait on the 'safe point' to be attained.
* Thread A will be held in here until Thread B calls {@link #safePointAttained()} * Thread A will be held in here until Thread B calls {@link #safePointAttained()}
* @throws InterruptedException
* @throws ExecutionException
* @param syncFuture We need this as barometer on outstanding syncs. If it comes home with * @param syncFuture We need this as barometer on outstanding syncs. If it comes home with
* an exception, then something is up w/ our syncing. * an exception, then something is up w/ our syncing.
* @throws InterruptedException
* @throws ExecutionException
* @return The passed <code>syncFuture</code> * @return The passed <code>syncFuture</code>
* @throws FailedSyncBeforeLogCloseException * @throws FailedSyncBeforeLogCloseException
*/ */
@ -2013,15 +1794,6 @@ public class FSHLog implements WAL {
} }
} }
private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
Set<byte[]> familyNameSet, Long lRegionSequenceId) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
for (byte[] familyName : familyNameSet) {
oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
}
}
/** /**
* Append to the WAL. Does all CP and WAL listener calls. * Append to the WAL. Does all CP and WAL listener calls.
* @param entry * @param entry
@ -2066,13 +1838,8 @@ public class FSHLog implements WAL {
writer.append(entry); writer.append(entry);
assert highestUnsyncedSequence < entry.getSequence(); assert highestUnsyncedSequence < entry.getSequence();
highestUnsyncedSequence = entry.getSequence(); highestUnsyncedSequence = entry.getSequence();
Long lRegionSequenceId = Long.valueOf(regionSequenceId); sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); entry.isInMemstore());
if (entry.isInMemstore()) {
updateOldestUnflushedSequenceIds(encodedRegionName,
entry.getFamilyNames(), lRegionSequenceId);
}
coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics. // Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start); postAppend(entry, EnvironmentEdgeManager.currentTime() - start);

View File

@ -0,0 +1,363 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.collect.Maps;
/**
* Accounting of sequence ids per region and then by column family. So we can our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance
* can keep abreast of the state of sequence id persistence. Also call update per append.
*/
class SequenceIdAccounting {
private static final Log LOG = LogFactory.getLog(SequenceIdAccounting.class);
/**
* This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
* {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
* lowest outstanding sequence ids EXCEPT when flushing. When we flush, the current
* lowest set for the region/column family are moved (atomically because of this lock) to
* {@link #flushingSequenceIds}.
*
* <p>The two Maps are tied by this locking object EXCEPT when we go to update the lowest
* entry; see {@link #lowest(byte[], Set, Long)}. In here is a putIfAbsent call on
* {@link #lowestUnflushedSequenceIds}. In this latter case, we will add this lowest
* sequence id if we find that there is no entry for the current column family. There will be no
* entry only if we just came up OR we have moved aside current set of lowest sequence ids
* because the current set are being flushed (by putting them into {@link #flushingSequenceIds}).
* This is how we pick up the next 'lowest' sequence id per region per column family to be used
* figuring what is in the next flush.
*/
private final Object tieLock = new Object();
/**
* Map of encoded region names and family names to their OLDEST -- i.e. their first,
* the longest-lived, their 'earliest', the 'lowest' -- sequence id.
*
* <p>When we flush, the current lowest sequence ids get cleared and added to
* {@link #flushingSequenceIds}. The next append that comes in, is then added
* here to {@link #lowestUnflushedSequenceIds} as the next lowest sequenceid.
*
* <p>If flush fails, currently server is aborted so no need to restore previous sequence ids.
* <p>Needs to be concurrent Maps because we use putIfAbsent updating oldest.
*/
private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> lowestUnflushedSequenceIds
= new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
Bytes.BYTES_COMPARATOR);
/**
* Map of encoded region names and family names to their lowest or OLDEST sequence/edit id
* currently being flushed out to hfiles. Entries are moved here from
* {@link #lowestUnflushedSequenceIds} while the lock {@link #tieLock} is held
* (so movement between the Maps is atomic).
*/
private final Map<byte[], Map<byte[], Long>> flushingSequenceIds =
new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
/**
* Map of region encoded names to the latest/highest region sequence id. Updated on each
* call to append.
* <p>
* This map uses byte[] as the key, and uses reference equality. It works in our use case as we
* use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
* the same array.
*/
private Map<byte[], Long> highestSequenceIds = new HashMap<byte[], Long>();
/**
* Returns the lowest unflushed sequence id for the region.
* @param encodedRegionName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will
* return {@link HConstants#NO_SEQNUM} when none.
*/
long getLowestSequenceId(final byte [] encodedRegionName) {
synchronized (this.tieLock) {
Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
long flushingLowest = m != null? getLowestSequenceId(m): Long.MAX_VALUE;
m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
long unflushedLowest = m != null? getLowestSequenceId(m): HConstants.NO_SEQNUM;
return Math.min(flushingLowest, unflushedLowest);
}
}
/**
* @param encodedRegionName
* @param familyName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionname</code> and
* <code>familyName</code>. Returned sequenceid may be for an edit currently being flushed.
*/
long getLowestSequenceId(final byte [] encodedRegionName, final byte [] familyName) {
synchronized (this.tieLock) {
Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
if (m != null) {
Long lowest = m.get(familyName);
if (lowest != null) return lowest;
}
m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
if (m != null) {
Long lowest = m.get(familyName);
if (lowest != null) return lowest;
}
}
return HConstants.NO_SEQNUM;
}
/**
* Reset the accounting of highest sequenceid by regionname.
* @return Return the previous accounting Map of regions to the last sequence id written into
* each.
*/
Map<byte[], Long> resetHighest() {
Map<byte[], Long> old = this.highestSequenceIds;
this.highestSequenceIds = new HashMap<byte[], Long>();
return old;
}
/**
* We've been passed a new sequenceid for the region. Set it as highest seen for this region and
* if we are to record oldest, or lowest sequenceids, save it as oldest seen if nothing
* currently older.
* @param encodedRegionName
* @param families
* @param sequenceid
* @param lowest Whether to keep running account of oldest sequence id.
*/
void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
final boolean lowest) {
Long l = Long.valueOf(sequenceid);
this.highestSequenceIds.put(encodedRegionName, l);
if (lowest) {
ConcurrentMap<byte[], Long> m = getOrCreateLowestSequenceIds(encodedRegionName);
for (byte[] familyName : families) {
m.putIfAbsent(familyName, l);
}
}
}
ConcurrentMap<byte[], Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {
// Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append.
ConcurrentMap<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
if (m != null) return m;
m = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
// Another thread may have added it ahead of us.
ConcurrentMap<byte[], Long> alreadyPut =
this.lowestUnflushedSequenceIds.putIfAbsent(encodedRegionName, m);
return alreadyPut == null? m : alreadyPut;
}
/**
* @param sequenceids Map to search for lowest value.
* @return Lowest value found in <code>sequenceids</code>.
*/
static long getLowestSequenceId(Map<byte[], Long> sequenceids) {
long lowest = HConstants.NO_SEQNUM;
for (Long sid: sequenceids.values()) {
if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) {
lowest = sid.longValue();
}
}
return lowest;
}
/**
* @param src
* @return New Map that has same keys as <code>src</code> but instead of a Map for a value, it
* instead has found the smallest sequence id and it returns that as the value instead.
*/
private <T extends Map<byte[], Long>> Map<byte[], Long> flattenToLowestSequenceId(
Map<byte[], T> src) {
if (src == null || src.isEmpty()) return null;
Map<byte[], Long> tgt = Maps.newHashMap();
for (Map.Entry<byte[], T> entry: src.entrySet()) {
long lowestSeqId = getLowestSequenceId(entry.getValue());
if (lowestSeqId != HConstants.NO_SEQNUM) {
tgt.put(entry.getKey(), lowestSeqId);
}
}
return tgt;
}
/**
* @param encodedRegionName Region to flush.
* @param families Families to flush. May be a subset of all families in the region.
* @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if
* we are flushing a subset of all families but there are no edits in those families not
* being flushed; in other words, this is effectively same as a flush of all of the region
* though we were passed a subset of regions. Otherwise, it returns the sequence id of the
* oldest/lowest outstanding edit.
*/
Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
Map<byte[], Long> oldSequenceIds = null;
Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
synchronized (tieLock) {
Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
if (m != null) {
// NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled
// circumstance because another concurrent thread now may add sequenceids for this family
// (see above in getOrCreateLowestSequenceId). Make sure you are ok with this. Usually it
// is fine because updates are blocked when this method is called. Make sure!!!
for (byte[] familyName: families) {
Long seqId = m.remove(familyName);
if (seqId != null) {
if (oldSequenceIds == null) oldSequenceIds = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
oldSequenceIds.put(familyName, seqId);
}
}
if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) {
if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) {
LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) +
", sequenceid=" + oldSequenceIds);
}
}
if (m.isEmpty()) {
// Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever
// even if the region is already moved to other server.
// Do not worry about data racing, we held write lock of region when calling
// startCacheFlush, so no one can add value to the map we removed.
this.lowestUnflushedSequenceIds.remove(encodedRegionName);
} else {
// Flushing a subset of the region families. Return the sequence id of the oldest entry.
lowestUnflushedInRegion = Collections.min(m.values());
}
}
}
// Do this check outside lock.
if (oldSequenceIds != null && oldSequenceIds.isEmpty()) {
// TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
// the region is already flushing (which would make this call invalid), or there
// were no appends after last flush, so why are we starting flush? Maybe we should
// assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
// For now preserve old logic.
LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName));
}
return lowestUnflushedInRegion;
}
void completeCacheFlush(final byte [] encodedRegionName) {
synchronized (tieLock) {
this.flushingSequenceIds.remove(encodedRegionName);
}
}
void abortCacheFlush(final byte[] encodedRegionName) {
// Method is called when we are crashing down because failed write flush AND it is called
// if we fail prepare. The below is for the fail prepare case; we restore the old sequence ids.
Map<byte[], Long> flushing = null;
Map<byte[], Long> tmpMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
// Here we are moving sequenceids from flushing back to unflushed; doing opposite of what
// happened in startCacheFlush. During prepare phase, we have update lock on the region so
// no edits should be coming in via append.
synchronized (tieLock) {
flushing = this.flushingSequenceIds.remove(encodedRegionName);
if (flushing != null) {
Map<byte[], Long> unflushed = getOrCreateLowestSequenceIds(encodedRegionName);
for (Map.Entry<byte[], Long> e: flushing.entrySet()) {
// Set into unflushed the 'old' oldest sequenceid and if any value in flushed with this
// value, it will now be in tmpMap.
tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue()));
}
}
}
// Here we are doing some 'test' to see if edits are going in out of order. What is it for?
// Carried over from old code.
if (flushing != null) {
for (Map.Entry<byte[], Long> e : flushing.entrySet()) {
Long currentId = tmpMap.get(e.getKey());
if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
String errorStr = Bytes.toString(encodedRegionName) + " family " +
Bytes.toString(e.getKey()) + " acquired edits out of order current memstore seq=" +
currentId + ", previous oldest unflushed id=" + e.getValue();
LOG.error(errorStr);
Runtime.getRuntime().halt(1);
}
}
}
}
/**
* See if passed <code>sequenceids</code> are lower -- i.e. earlier -- than any outstanding
* sequenceids, sequenceids we are holding on to in this accounting instance.
* @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make
* sense for it to be null).
* @return true if all sequenceids are lower, older than, the old sequenceids in this instance.
*/
boolean areAllLower(Map<byte[], Long> sequenceids) {
Map<byte[], Long> flushing = null;
Map<byte[], Long> unflushed = null;
synchronized (this.tieLock) {
// Get a flattened -- only the oldest sequenceid -- copy of current flushing and unflushed
// data structures to use in tests below.
flushing = flattenToLowestSequenceId(this.flushingSequenceIds);
unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);
}
for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
long oldestFlushing = Long.MAX_VALUE;
long oldestUnflushed = Long.MAX_VALUE;
if (flushing != null) {
if (flushing.containsKey(e.getKey())) oldestFlushing = flushing.get(e.getKey());
}
if (unflushed != null) {
if (unflushed.containsKey(e.getKey())) oldestUnflushed = unflushed.get(e.getKey());
}
long min = Math.min(oldestFlushing, oldestUnflushed);
if (min <= e.getValue()) return false;
}
return true;
}
/**
* Iterates over the given Map and compares sequence ids with corresponding
* entries in {@link #oldestUnflushedRegionSequenceIds}. If a region in
* {@link #oldestUnflushedRegionSequenceIds} has a sequence id less than that passed
* in <code>sequenceids</code> then return it.
* @param sequenceids Sequenceids keyed by encoded region name.
* @return regions found in this instance with sequence ids less than those passed in.
*/
byte[][] findLower(Map<byte[], Long> sequenceids) {
List<byte[]> toFlush = null;
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (tieLock) {
for (Map.Entry<byte[], Long> e: sequenceids.entrySet()) {
Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(e.getKey());
if (m == null) continue;
// The lowest sequence id outstanding for this region.
long lowest = getLowestSequenceId(m);
if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
if (toFlush == null) toFlush = new ArrayList<byte[]>();
toFlush.add(e.getKey());
}
}
}
return toFlush == null? null: toFlush.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}
}

View File

@ -186,8 +186,9 @@ class DisabledWALProvider implements WALProvider {
} }
@Override @Override
public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) { public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
return !(closed.get()); if (closed.get()) return null;
return HConstants.NO_SEQNUM;
} }
@Override @Override

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
@ -39,6 +40,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import com.google.common.annotations.VisibleForTesting;
/** /**
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides * A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
* APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc). * APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
@ -140,31 +143,36 @@ public interface WAL {
void sync(long txid) throws IOException; void sync(long txid) throws IOException;
/** /**
* WAL keeps track of the sequence numbers that were not yet flushed from memstores * WAL keeps track of the sequence numbers that are as yet not flushed im memstores
* in order to be able to do cleanup. This method tells WAL that some region is about * in order to be able to do accounting to figure which WALs can be let go. This method tells WAL
* to flush memstore. * that some region is about to flush. The flush can be the whole region or for a column family
* of the region only.
* *
* <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this * <p>Currently, it is expected that the update lock is held for the region; i.e. no
* region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit, * concurrent appends while we set up cache flush.
* AtomicLong, boolean, List)} as new oldest seqnum. * @param families Families to flush. May be a subset of all families in the region.
* In case of flush being aborted, we put the stashed value back; in case of flush succeeding, * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if
* the seqNum of that first edit after start becomes the valid oldest seqNum for this region. * we are flushing a subset of all families but there are no edits in those families not
* * being flushed; in other words, this is effectively same as a flush of all of the region
* @return true if the flush can proceed, false in case wal is closing (ususally, when server is * though we were passed a subset of regions. Otherwise, it returns the sequence id of the
* closing) and flush couldn't be started. * oldest/lowest outstanding edit.
* @see #completeCacheFlush(byte[])
* @see #abortCacheFlush(byte[])
*/ */
boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames); Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
/** /**
* Complete the cache flush. * Complete the cache flush.
* @param encodedRegionName Encoded region name. * @param encodedRegionName Encoded region name.
* @see #startCacheFlush(byte[], Set)
* @see #abortCacheFlush(byte[])
*/ */
void completeCacheFlush(final byte[] encodedRegionName); void completeCacheFlush(final byte[] encodedRegionName);
/** /**
* Abort a cache flush. Call if the flush fails. Note that the only recovery * Abort a cache flush. Call if the flush fails. Note that the only recovery
* for an aborted flush currently is a restart of the regionserver so the * for an aborted flush currently is a restart of the regionserver so the
* snapshot content dropped by the failure gets restored to the memstore.v * snapshot content dropped by the failure gets restored to the memstore.
* @param encodedRegionName Encoded region name. * @param encodedRegionName Encoded region name.
*/ */
void abortCacheFlush(byte[] encodedRegionName); void abortCacheFlush(byte[] encodedRegionName);
@ -174,19 +182,22 @@ public interface WAL {
*/ */
WALCoprocessorHost getCoprocessorHost(); WALCoprocessorHost getCoprocessorHost();
/**
/** Gets the earliest sequence number in the memstore for this particular region. * Gets the earliest unflushed sequence id in the memstore for the region.
* This can serve as best-effort "recent" WAL number for this region.
* @param encodedRegionName The region to get the number for. * @param encodedRegionName The region to get the number for.
* @return The number if present, HConstants.NO_SEQNUM if absent. * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
* @deprecated Since version 1.2.0. Removing because not used and exposes subtle internal
* workings. Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])}
*/ */
@VisibleForTesting
@Deprecated
long getEarliestMemstoreSeqNum(byte[] encodedRegionName); long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
/** /**
* Gets the earliest sequence number in the memstore for this particular region and store. * Gets the earliest unflushed sequence id in the memstore for the store.
* @param encodedRegionName The region to get the number for. * @param encodedRegionName The region to get the number for.
* @param familyName The family to get the number for. * @param familyName The family to get the number for.
* @return The number if present, HConstants.NO_SEQNUM if absent. * @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
*/ */
long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName); long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);

View File

@ -123,6 +123,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
/** /**
* This class is responsible for splitting up a bunch of regionserver commit log * This class is responsible for splitting up a bunch of regionserver commit log
@ -315,15 +316,19 @@ public class WALSplitter {
failedServerName = (serverName == null) ? "" : serverName.getServerName(); failedServerName = (serverName == null) ? "" : serverName.getServerName();
while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) { while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName(); byte[] region = entry.getKey().getEncodedRegionName();
String key = Bytes.toString(region); String encodedRegionNameAsStr = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(key); lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) { if (lastFlushedSequenceId == null) {
if (this.distributedLogReplay) { if (this.distributedLogReplay) {
RegionStoreSequenceIds ids = RegionStoreSequenceIds ids =
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
key); encodedRegionNameAsStr);
if (ids != null) { if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId(); lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
TextFormat.shortDebugString(ids));
}
} }
} else if (sequenceIdChecker != null) { } else if (sequenceIdChecker != null) {
RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
@ -332,13 +337,17 @@ public class WALSplitter {
maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
storeSeqId.getSequenceId()); storeSeqId.getSequenceId());
} }
regionMaxSeqIdInStores.put(key, maxSeqIdInStores); regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
lastFlushedSequenceId = ids.getLastFlushedSequenceId(); lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
TextFormat.shortDebugString(ids));
}
} }
if (lastFlushedSequenceId == null) { if (lastFlushedSequenceId == null) {
lastFlushedSequenceId = -1L; lastFlushedSequenceId = -1L;
} }
lastFlushedSequenceIds.put(key, lastFlushedSequenceId); lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
} }
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
editsSkipped++; editsSkipped++;
@ -1063,7 +1072,7 @@ public class WALSplitter {
} }
private void doRun() throws IOException { private void doRun() throws IOException {
LOG.debug("Writer thread " + this + ": starting"); if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
while (true) { while (true) {
RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
if (buffer == null) { if (buffer == null) {
@ -1218,7 +1227,8 @@ public class WALSplitter {
} }
} }
controller.checkForErrors(); controller.checkForErrors();
LOG.info("Split writers finished"); LOG.info((this.writerThreads == null? 0: this.writerThreads.size()) +
" split writers finished; closing...");
return (!progress_failed); return (!progress_failed);
} }
@ -1309,12 +1319,14 @@ public class WALSplitter {
CompletionService<Void> completionService = CompletionService<Void> completionService =
new ExecutorCompletionService<Void>(closeThreadPool); new ExecutorCompletionService<Void>(closeThreadPool);
for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) { for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); if (LOG.isTraceEnabled()) {
LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
}
completionService.submit(new Callable<Void>() { completionService.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
LOG.debug("Closing " + wap.p); if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
try { try {
wap.w.close(); wap.w.close();
} catch (IOException ioe) { } catch (IOException ioe) {
@ -1322,8 +1334,8 @@ public class WALSplitter {
thrown.add(ioe); thrown.add(ioe);
return null; return null;
} }
LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in " LOG.info("Closed " + wap.p + "; wrote " + wap.editsWritten + " edit(s) in "
+ (wap.nanosSpent / 1000 / 1000) + "ms)"); + (wap.nanosSpent / 1000 / 1000) + "ms");
if (wap.editsWritten == 0) { if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file // just remove the empty recovered.edits file
@ -1482,8 +1494,8 @@ public class WALSplitter {
} }
} }
Writer w = createWriter(regionedits); Writer w = createWriter(regionedits);
LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region)); LOG.debug("Creating writer path=" + regionedits);
return (new WriterAndPath(regionedits, w)); return new WriterAndPath(regionedits, w);
} }
private void filterCellByStore(Entry logEntry) { private void filterCellByStore(Entry logEntry) {
@ -1497,6 +1509,7 @@ public class WALSplitter {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
byte[] family = CellUtil.cloneFamily(cell); byte[] family = CellUtil.cloneFamily(cell);
Long maxSeqId = maxSeqIdInStores.get(family); Long maxSeqId = maxSeqIdInStores.get(family);
LOG.info("CHANGE REMOVE " + Bytes.toString(family) + ", max=" + maxSeqId);
// Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
// or the master was crashed before and we can not get the information. // or the master was crashed before and we can not get the information.
if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) { if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) {
@ -1536,10 +1549,10 @@ public class WALSplitter {
filterCellByStore(logEntry); filterCellByStore(logEntry);
if (!logEntry.getEdit().isEmpty()) { if (!logEntry.getEdit().isEmpty()) {
wap.w.append(logEntry); wap.w.append(logEntry);
}
this.updateRegionMaximumEditLogSeqNum(logEntry); this.updateRegionMaximumEditLogSeqNum(logEntry);
editsCount++; editsCount++;
} }
}
// Pass along summary statistics // Pass along summary statistics
wap.incrementEdits(editsCount); wap.incrementEdits(editsCount);
wap.incrementNanoTime(System.nanoTime() - startTime); wap.incrementNanoTime(System.nanoTime() - startTime);

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.security.UserProvider;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;

View File

@ -91,6 +91,7 @@ public class TestGetLastFlushedSequenceId {
testUtil.getHBaseCluster().getMaster() testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()); .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId()); assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId());
// This will be the sequenceid just before that of the earliest edit in memstore.
long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId(); long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId();
assertTrue(storeSequenceId > 0); assertTrue(storeSequenceId > 0);
testUtil.getHBaseAdmin().flush(tableName); testUtil.getHBaseAdmin().flush(tableName);

View File

@ -230,6 +230,35 @@ public class TestHRegion {
return name.getMethodName(); return name.getMethodName();
} }
/**
* Test that I can use the max flushed sequence id after the close.
* @throws IOException
*/
@Test (timeout = 100000)
public void testSequenceId() throws IOException {
HRegion region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
// Weird. This returns 0 if no store files or no edits. Afraid to change it.
assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
region.close();
assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
// Open region again.
region = initHRegion(tableName, name.getMethodName(), CONF, COLUMN_FAMILY_BYTES);
byte [] value = Bytes.toBytes(name.getMethodName());
// Make a random put against our cf.
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, null, value);
region.put(put);
// No flush yet so init numbers should still be in place.
assertEquals(HConstants.NO_SEQNUM, region.getMaxFlushedSeqId());
assertEquals(0, (long)region.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
region.flush(true);
long max = region.getMaxFlushedSeqId();
region.close();
assertEquals(max, region.getMaxFlushedSeqId());
}
/** /**
* Test for Bug 2 of HBASE-10466. * Test for Bug 2 of HBASE-10466.
* "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize * "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.Collection;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mortbay.log.Log;
/**
* Testcase for https://issues.apache.org/jira/browse/HBASE-13811
*/
@Category({ MediumTests.class })
public class TestSplitWalDataLoss {
private final HBaseTestingUtility testUtil = new HBaseTestingUtility();
private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
.build();
private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");
private byte[] family = Bytes.toBytes("f");
private byte[] qualifier = Bytes.toBytes("q");
@Before
public void setUp() throws Exception {
testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
testUtil.startMiniCluster(2);
HBaseAdmin admin = testUtil.getHBaseAdmin();
admin.createNamespace(namespace);
admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
testUtil.waitTableAvailable(tableName);
}
@After
public void tearDown() throws Exception {
testUtil.shutdownMiniCluster();
}
@Test
public void test() throws IOException, InterruptedException {
final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
HRegion spiedRegion = spy(region);
final MutableBoolean flushed = new MutableBoolean(false);
final MutableBoolean reported = new MutableBoolean(false);
doAnswer(new Answer<FlushResult>() {
@Override
public FlushResult answer(InvocationOnMock invocation) throws Throwable {
synchronized (flushed) {
flushed.setValue(true);
flushed.notifyAll();
}
synchronized (reported) {
while (!reported.booleanValue()) {
reported.wait();
}
}
rs.getWAL(region.getRegionInfo()).abortCacheFlush(
region.getRegionInfo().getEncodedNameAsBytes());
throw new DroppedSnapshotException("testcase");
}
}).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
Matchers.<Collection<Store>> any());
rs.onlineRegions.put(rs.onlineRegions.keySet().iterator().next(), spiedRegion);
Connection conn = testUtil.getConnection();
try (Table table = conn.getTable(tableName)) {
table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
}
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
Log.info("CHANGE OLDEST " + oldestSeqIdOfStore);
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
rs.cacheFlusher.requestFlush(spiedRegion, false);
synchronized (flushed) {
while (!flushed.booleanValue()) {
flushed.wait();
}
}
try (Table table = conn.getTable(tableName)) {
table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
}
long now = EnvironmentEdgeManager.currentTime();
rs.tryRegionServerReport(now - 500, now);
synchronized (reported) {
reported.setValue(true);
reported.notifyAll();
}
while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
Thread.sleep(100);
}
try (Table table = conn.getTable(tableName)) {
Result result = table.get(new Get(Bytes.toBytes("row0")));
assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
}
}
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -27,9 +26,7 @@ import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -322,53 +319,6 @@ public class TestFSHLog {
} }
} }
/**
* Simulates WAL append ops for a region and tests
* {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
* It compares the region sequenceIds with oldestFlushing and oldestUnFlushed entries.
* If a region's entries are larger than min of (oldestFlushing, oldestUnFlushed), then the
* region should be flushed before archiving this WAL.
*/
@Test
public void testAllRegionsFlushed() {
LOG.debug("testAllRegionsFlushed");
Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
// create a table
TableName t1 = TableName.valueOf("t1");
// create a region
HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
// variables to mock region sequenceIds
final AtomicLong sequenceId1 = new AtomicLong(1);
// test empty map
assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// add entries in the region
seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
// should say region1 is not flushed.
assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// test with entries in oldestFlushing map.
oldestUnFlushedSeqNo.clear();
oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps
oldestFlushingSeqNo.clear();
oldestUnFlushedSeqNo.clear();
assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// insert some large values for region1
oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000l);
seqNo.put(hri1.getEncodedNameAsBytes(), 1500l);
assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// tests when oldestUnFlushed/oldestFlushing contains larger value.
// It means region is flushed.
oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200l);
oldestUnFlushedSeqNo.clear();
seqNo.put(hri1.getEncodedNameAsBytes(), 1199l);
assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
}
@Test(expected=IOException.class) @Test(expected=IOException.class)
public void testFailedToCreateWALIfParentRenamed() throws IOException { public void testFailedToCreateWALIfParentRenamed() throws IOException {
final String name = "testFailedToCreateWALIfParentRenamed"; final String name = "testFailedToCreateWALIfParentRenamed";

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestSequenceIdAccounting {
private static final byte [] ENCODED_REGION_NAME = Bytes.toBytes("r");
private static final byte [] FAMILY_NAME = Bytes.toBytes("cf");
private static final Set<byte[]> FAMILIES;
static {
FAMILIES = new HashSet<byte[]>();
FAMILIES.add(FAMILY_NAME);
}
@Test
public void testStartCacheFlush() {
SequenceIdAccounting sida = new SequenceIdAccounting();
sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
Map<byte[], Long> m = new HashMap<byte[], Long>();
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
sida.completeCacheFlush(ENCODED_REGION_NAME);
long sequenceid = 1;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
// Only one family so should return NO_SEQNUM still.
assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
sida.completeCacheFlush(ENCODED_REGION_NAME);
long currentSequenceId = sequenceid;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
final Set<byte[]> otherFamily = new HashSet<byte[]>(1);
otherFamily.add(Bytes.toBytes("otherCf"));
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
// Should return oldest sequence id in the region.
assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
sida.completeCacheFlush(ENCODED_REGION_NAME);
}
@Test
public void testAreAllLower() {
SequenceIdAccounting sida = new SequenceIdAccounting();
sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
Map<byte[], Long> m = new HashMap<byte[], Long>();
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
assertTrue(sida.areAllLower(m));
long sequenceid = 1;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
assertTrue(sida.areAllLower(m));
m.put(ENCODED_REGION_NAME, sequenceid);
assertFalse(sida.areAllLower(m));
long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
assertEquals("Lowest should be first sequence id inserted", 1, lowest);
m.put(ENCODED_REGION_NAME, lowest);
assertFalse(sida.areAllLower(m));
// Now make sure above works when flushing.
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
assertFalse(sida.areAllLower(m));
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
assertTrue(sida.areAllLower(m));
// Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
sida.completeCacheFlush(ENCODED_REGION_NAME);
m.put(ENCODED_REGION_NAME, sequenceid);
assertTrue(sida.areAllLower(m));
// Flush again but add sequenceids while we are flushing.
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
m.put(ENCODED_REGION_NAME, lowest);
assertFalse(sida.areAllLower(m));
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
// The cache flush will clear out all sequenceid accounting by region.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
sida.completeCacheFlush(ENCODED_REGION_NAME);
// No new edits have gone in so no sequenceid to work with.
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
// Make an edit behind all we'll put now into sida.
m.put(ENCODED_REGION_NAME, sequenceid);
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
assertTrue(sida.areAllLower(m));
}
@Test
public void testFindLower() {
SequenceIdAccounting sida = new SequenceIdAccounting();
sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
Map<byte[], Long> m = new HashMap<byte[], Long>();
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
long sequenceid = 1;
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
assertTrue(sida.findLower(m) == null);
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME));
assertTrue(sida.findLower(m).length == 1);
m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1);
assertTrue(sida.findLower(m) == null);
}
}