HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS

This commit is contained in:
stack 2015-06-08 10:39:08 -07:00
parent d34e9c5c5c
commit 2baf3bfc9f
19 changed files with 912 additions and 487 deletions

View File

@ -453,7 +453,7 @@ public class RegionStates {
ServerName oldServerName = regionAssignments.put(hri, serverName);
if (!serverName.equals(oldServerName)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName + " " + hri);
LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
} else {
LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
}
@ -644,7 +644,7 @@ public class RegionStates {
// Region is open on this region server, but in transition.
// This region must be moving away from this server, or splitting/merging.
// ServerCrashProcedure will handle it, either skip assigning, or re-assign.
LOG.info("Transitioning " + state + " will be handled by SSH for " + sn);
LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn);
} else if (sn.equals(state.getServerName())) {
// Region is in transition on this region server, and this
// region is not open on this server. So the region must be
@ -654,7 +654,8 @@ public class RegionStates {
// transition. The region could be in failed_close state too if we have
// tried several times to open it while this region server is not reachable)
if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) {
LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn);
LOG.info("Found region in " + state +
" to be reassigned by ServerCrashProcedure for " + sn);
rits.add(hri);
} else if(state.isSplittingNew()) {
regionsToCleanIfNoMetaEntry.add(state.getRegion());

View File

@ -122,9 +122,15 @@ public class ServerManager {
// Set if we are to shutdown the cluster.
private volatile boolean clusterShutdown = false;
/**
* The last flushed sequence id for a region.
*/
private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
/**
* The last flushed sequence id for a store in a region.
*/
private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
storeFlushedSequenceIdsByRegion =
new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);
@ -293,6 +299,10 @@ public class ServerManager {
Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
long l = entry.getValue().getCompleteSequenceId();
// Don't let smaller sequence ids override greater sequence ids.
if (LOG.isTraceEnabled()) {
LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
", completeSequenceId=" + l);
}
if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
flushedSequenceIdByRegion.put(encodedRegionName, l);
} else if (l != HConstants.NO_SEQNUM && l < existingValue) {
@ -306,6 +316,10 @@ public class ServerManager {
byte[] family = storeSeqId.getFamilyName().toByteArray();
existingValue = storeFlushedSequenceId.get(family);
l = storeSeqId.getSequenceId();
if (LOG.isTraceEnabled()) {
LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
", existingValue=" + existingValue + ", completeSequenceId=" + l);
}
// Don't let smaller sequence ids override greater sequence ids.
if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
storeFlushedSequenceId.put(family, l);

View File

@ -76,9 +76,9 @@ public class FlushLargeStoresPolicy extends FlushPolicy {
private boolean shouldFlush(Store store) {
if (store.getMemStoreSize() > this.flushSizeLowerBound) {
if (LOG.isDebugEnabled()) {
LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
+ " will be flushed because of memstoreSize(" + store.getMemStoreSize()
+ ") is larger than lower bound(" + this.flushSizeLowerBound + ")");
LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +
region.getRegionInfo().getEncodedName() + " because memstoreSize=" +
store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound);
}
return true;
}

View File

@ -22,7 +22,6 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.text.ParseException;
import java.util.AbstractList;
@ -217,13 +216,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final AtomicBoolean closing = new AtomicBoolean(false);
/**
* The max sequence id of flushed data on this region. Used doing some rough calculations on
* whether time to flush or not.
* The max sequence id of flushed data on this region. There is no edit in memory that is
* less than this sequence id.
*/
private volatile long maxFlushedSeqId = HConstants.NO_SEQNUM;
/**
* Record the sequence id of last flush operation.
* Record the sequence id of last flush operation. Can be in advance of
* {@link #maxFlushedSeqId} when flushing a single column family. In this case,
* {@link #maxFlushedSeqId} will be older than the oldest edit in memory.
*/
private volatile long lastFlushOpSeqId = HConstants.NO_SEQNUM;
/**
@ -608,6 +609,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* is new), then read them from the supplied path.
* @param htd the table descriptor
* @param rsServices reference to {@link RegionServerServices} or null
* @deprecated Use other constructors.
*/
@Deprecated
public HRegion(final Path tableDir, final WAL wal, final FileSystem fs,
@ -1610,16 +1612,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
byte[] encodedRegionName = this.getRegionInfo().getEncodedNameAsBytes();
regionLoadBldr.clearStoreCompleteSequenceId();
for (byte[] familyName : this.stores.keySet()) {
long oldestUnflushedSeqId = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
// no oldestUnflushedSeqId means no data has written to the store after last flush, so we use
// lastFlushOpSeqId as complete sequence id for the store.
regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId
.newBuilder()
.setFamilyName(ByteString.copyFrom(familyName))
.setSequenceId(
oldestUnflushedSeqId < 0 ? lastFlushOpSeqIdLocal : oldestUnflushedSeqId - 1).build());
long earliest = this.wal.getEarliestMemstoreSeqNum(encodedRegionName, familyName);
// Subtract 1 to go earlier than the current oldest unflushed edit in memstore; this will
// give us a sequence id that is for sure flushed. We want edit replay to start after this
// sequence id in this region. If NO_SEQNUM, use the id of the region's last flush operation.
long csid = (earliest == HConstants.NO_SEQNUM)? lastFlushOpSeqIdLocal: earliest - 1;
regionLoadBldr.addStoreCompleteSequenceId(StoreSequenceId.
newBuilder().setFamilyName(ByteString.copyFrom(familyName)).setSequenceId(csid).build());
}
return regionLoadBldr.setCompleteSequenceId(this.maxFlushedSeqId);
return regionLoadBldr.setCompleteSequenceId(getMaxFlushedSeqId());
}
//////////////////////////////////////////////////////////////////////////////
@ -1912,27 +1913,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* returns true which will make a lot of flush requests.
*/
boolean shouldFlushStore(Store store) {
long maxFlushedSeqId =
this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store
.getFamily().getName()) - 1;
if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) {
long earliest = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(),
store.getFamily().getName()) - 1;
if (earliest > 0 && earliest + flushPerChanges < sequenceId.get()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+ " will be flushed because its max flushed seqId(" + maxFlushedSeqId
+ ") is far away from current(" + sequenceId.get() + "), max allowed is "
+ flushPerChanges);
LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " +
getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + earliest +
" is > " + this.flushPerChanges + " from current=" + sequenceId.get());
}
return true;
}
if (flushCheckInterval <= 0) {
if (this.flushCheckInterval <= 0) {
return false;
}
long now = EnvironmentEdgeManager.currentTime();
if (store.timeOfOldestEdit() < now - flushCheckInterval) {
if (store.timeOfOldestEdit() < now - this.flushCheckInterval) {
if (LOG.isDebugEnabled()) {
LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+ " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit()
+ ") is far away from now(" + now + "), max allowed is " + flushCheckInterval);
LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " +
getRegionInfo().getEncodedName() + " because time of oldest edit=" +
store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now);
}
return true;
}
@ -2086,18 +2085,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
if (LOG.isInfoEnabled()) {
LOG.info("Started memstore flush for " + this + ", current region memstore size "
+ StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
+ stores.size() + " column families' memstores are being flushed."
+ ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid));
// only log when we are not flushing all stores.
if (this.stores.size() > storesToFlush.size()) {
// Log a fat line detailing what is being flushed.
StringBuilder perCfExtras = null;
if (!isAllFamilies(storesToFlush)) {
perCfExtras = new StringBuilder();
for (Store store: storesToFlush) {
LOG.info("Flushing Column Family: " + store.getColumnFamilyName()
+ " which was occupying "
+ StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore.");
perCfExtras.append("; ");
perCfExtras.append(store.getColumnFamilyName());
perCfExtras.append("=");
perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize()));
}
}
LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() +
" column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid));
}
// Stop updates while we snapshot the memstore of all of these regions' stores. We only have
// to do this for a moment. It is quick. We also set the memstore size to zero here before we
@ -2123,10 +2125,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
Bytes.BYTES_COMPARATOR);
// The sequence id of this flush operation which is used to log FlushMarker and pass to
// createFlushContext to use as the store file's sequence id.
// createFlushContext to use as the store file's sequence id. It can be in advance of edits
// still in the memstore, edits that are in other column families yet to be flushed.
long flushOpSeqId = HConstants.NO_SEQNUM;
// The max flushed sequence id after this flush operation. Used as completeSequenceId which is
// passed to HMaster.
// The max flushed sequence id after this flush operation completes. All edits in memstore
// will be in advance of this sequence id.
long flushedSeqId = HConstants.NO_SEQNUM;
byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes();
@ -2135,21 +2138,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
w = mvcc.beginMemstoreInsert();
if (wal != null) {
if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) {
// This should never happen.
String msg = "Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
Long earliestUnflushedSequenceIdForTheRegion =
wal.startCacheFlush(encodedRegionName, flushedFamilyNames);
if (earliestUnflushedSequenceIdForTheRegion == null) {
// This should never happen. This is how startCacheFlush signals flush cannot proceed.
String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing.";
status.setStatus(msg);
return new PrepareFlushResult(
new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false),
myseqid);
}
flushOpSeqId = getNextSequenceId(wal);
long oldestUnflushedSeqId = wal.getEarliestMemstoreSeqNum(encodedRegionName);
// no oldestUnflushedSeqId means we flushed all stores.
// or the unflushed stores are all empty.
flushedSeqId = (oldestUnflushedSeqId == HConstants.NO_SEQNUM) ? flushOpSeqId
: oldestUnflushedSeqId - 1;
// Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit
flushedSeqId =
earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM?
flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1;
} else {
// use the provided sequence Id as WAL is not being used for this flush.
flushedSeqId = flushOpSeqId = myseqid;
@ -2229,6 +2232,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
flushedSeqId, totalFlushableSizeOfFlushableStores);
}
/**
 * Tells whether the passed collection stands for every column family of this region.
 * The check is by count only: membership of the individual stores is not compared.
 * @param families the stores to test; {@code null} is treated as "all families"
 * @return True if {@code families} is null or holds as many entries as this region has stores.
 */
private boolean isAllFamilies(final Collection<Store> families) {
  if (families == null) {
    // No explicit selection was passed.
    return true;
  }
  return this.stores.size() == families.size();
}
/**
* Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
* reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
@ -2344,10 +2355,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.lastStoreFlushTimeMap.put(store, startTime);
}
// Update the oldest unflushed sequence id for region.
this.maxFlushedSeqId = flushedSeqId;
// Record flush operation sequence id.
this.lastFlushOpSeqId = flushOpSeqId;
// C. Finally notify anyone waiting on memstore to clear:
@ -3704,7 +3712,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this);
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
}
}
@ -4438,7 +4446,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
+ seqId + " is greater than current seqId:" + currentSeqId);
// Prepare flush (take a snapshot) and then abort (drop the snapshot)
if (store == null ) {
if (store == null) {
for (Store s : stores.values()) {
totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId);
}
@ -5432,11 +5440,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
} else if (scannerContext.checkSizeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
} else if (scannerContext.checkTimeLimit(limitScope)) {
ScannerContext.NextState state =
moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
moreCellsInRow? NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED;
return scannerContext.setScannerState(state).hasMoreValues();
}
} while (moreCellsInRow);
@ -5815,7 +5823,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.error(msg);
LOG.error("unable to refresh store files", e);
abortRegionServer(msg);
return new NotServingRegionException(getRegionInfo().getRegionNameAsString() +" is closing");
return new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " closing");
}
}
@ -5968,11 +5976,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return new HRegion
* @throws IOException
*/
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, final Path tableDir,
final Configuration conf,
final HTableDescriptor hTableDescriptor,
final WAL wal,
final boolean initialize, final boolean ignoreWAL)
public static HRegion createHRegion(final HRegionInfo info, final Path rootDir,
final Path tableDir, final Configuration conf, final HTableDescriptor hTableDescriptor,
final WAL wal, final boolean initialize, final boolean ignoreWAL)
throws IOException {
LOG.info("creating HRegion " + info.getTable().getNameAsString()
+ " HTD == " + hTableDescriptor + " RootDir = " + rootDir +
@ -6338,6 +6344,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param tabledir qualified path for table
* @param name ENCODED region name
* @return Path of HRegion directory
* @deprecated For tests only; to be removed.
*/
@Deprecated
public static Path getRegionDir(final Path tabledir, final String name) {
@ -6350,6 +6357,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param rootdir qualified path of HBase root directory
* @param info HRegionInfo for the region
* @return qualified path of region directory
* @deprecated For tests only; to be removed.
*/
@Deprecated
@VisibleForTesting
@ -7170,7 +7178,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
newTags.add(itr.next());
}
}
if (i < ( edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1)))
idx++;
}
@ -7752,6 +7760,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// sync the WAL edit (SYNC and FSYNC treated the same for now)
this.wal.sync(txid);
break;
default:
throw new RuntimeException("Unknown durability " + durability);
}
}
}
@ -7840,8 +7850,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public long getOldestSeqIdOfStore(byte[] familyName) {
return wal.getEarliestMemstoreSeqNum(getRegionInfo()
.getEncodedNameAsBytes(), familyName);
return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName);
}
@Override

View File

@ -2835,24 +2835,14 @@ public class HRegionServer extends HasThread implements
@Override
public boolean removeFromOnlineRegions(final Region r, ServerName destination) {
Region toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
if (destination != null) {
try {
WAL wal = getWAL(r.getRegionInfo());
long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
long closeSeqNum = r.getMaxFlushedSeqId();
if (closeSeqNum == HConstants.NO_SEQNUM) {
// No edits in WAL for this region; get the sequence number when the region was opened.
closeSeqNum = r.getOpenSeqNum();
if (closeSeqNum == HConstants.NO_SEQNUM) {
closeSeqNum = 0;
}
if (closeSeqNum == HConstants.NO_SEQNUM) closeSeqNum = 0;
}
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
} catch (IOException exception) {
LOG.error("Could not retrieve WAL information for region " + r.getRegionInfo() +
"; not adding to moved regions.");
LOG.debug("Exception details for failure to get wal", exception);
}
}
this.regionFavoredNodesMap.remove(r.getRegionInfo().getEncodedName());
return toReturn != null;

View File

@ -111,16 +111,6 @@ import com.google.common.collect.Sets;
* services is compaction services where files are aggregated once they pass
* a configurable threshold.
*
* <p>The only thing having to do with logs that Store needs to deal with is
* the reconstructionLog. This is a segment of an HRegion's log that might
* NOT be present upon startup. If the param is NULL, there's nothing to do.
* If the param is non-NULL, we need to process the log to reconstruct
* a TreeMap that might not have been written to disk before the process
* died.
*
* <p>It's assumed that after this constructor returns, the reconstructionLog
* file will be deleted (by whoever has instantiated the Store).
*
* <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager.
*/
@ -898,8 +888,7 @@ public class HStore implements Store {
}
/**
* Write out current snapshot. Presumes {@link #snapshot()} has been called
* previously.
* Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
* @param logCacheFlushId flush sequence number
* @param snapshot
* @param status

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
@ -123,10 +124,19 @@ public interface Region extends ConfigurationObserver {
/** @return the latest sequence number that was read from storage when this region was opened */
long getOpenSeqNum();
/** @return the max sequence id of flushed data on this region */
/** @return the max sequence id of flushed data on this region; no edit in memory will have
* a sequence id that is less than what is returned here.
*/
long getMaxFlushedSeqId();
/** @return the oldest sequence id found in the store for the given family */
/** @return the oldest flushed sequence id for the given family; can be beyond
* {@link #getMaxFlushedSeqId()} in the case where we've flushed a subset of a region's
* column families
* @deprecated Since version 1.2.0. Exposes too much about our internals; shutting it down.
* Do not use.
*/
@VisibleForTesting
@Deprecated
public long getOldestSeqIdOfStore(byte[] familyName);
/**

View File

@ -29,16 +29,13 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -66,7 +63,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
@ -91,7 +87,6 @@ import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
@ -168,7 +163,7 @@ public class FSHLog implements WAL {
private final Disruptor<RingBufferTruck> disruptor;
/**
* An executorservice that runs the disrutpor AppendEventHandler append executor.
* An executorservice that runs the disruptor AppendEventHandler append executor.
*/
private final ExecutorService appendExecutor;
@ -210,6 +205,7 @@ public class FSHLog implements WAL {
* WAL directory, where all WAL files would be placed.
*/
private final Path fullPathLogDir;
/**
* dir path where old logs are kept.
*/
@ -241,6 +237,7 @@ public class FSHLog implements WAL {
* conf object
*/
protected final Configuration conf;
/** Listeners that are called on WAL events. */
private final List<WALActionsListener> listeners = new CopyOnWriteArrayList<WALActionsListener>();
@ -258,6 +255,7 @@ public class FSHLog implements WAL {
public WALCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}
/**
* FSDataOutputStream associated with the current SequenceFile.writer
*/
@ -288,6 +286,13 @@ public class FSHLog implements WAL {
// Enable it if the replications recover.
private volatile boolean lowReplicationRollEnabled = true;
/**
* Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding
* sequence id as yet not flushed as well as the most recent edit sequence id appended to the
* WAL. Has facility for answering questions such as "Is it safe to GC a WAL?".
*/
private SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting();
/**
* Current log file.
*/
@ -334,52 +339,6 @@ public class FSHLog implements WAL {
private final AtomicInteger closeErrorCount = new AtomicInteger();
// Region sequence id accounting across flushes and for knowing when we can GC a WAL. These
// sequence id numbers are by region and unrelated to the ring buffer sequence number accounting
// done above in failedSequence, highest sequence, etc.
/**
* This lock ties all operations on lowestFlushingStoreSequenceIds and
* oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into
* oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions
* sequence id, or to find regions with old sequence ids to force flush; we are interested in
* old stuff not the new additions (TODO: IS THIS SAFE? CHECK!).
*/
private final Object regionSequenceIdLock = new Object();
/**
* Map of encoded region names and family names to their OLDEST -- i.e. their first,
* the longest-lived -- sequence id in memstore. Note that this sequence id is the region
* sequence id. This is not related to the id we use above for {@link #highestSyncedSequence}
* and {@link #highestUnsyncedSequence} which is the sequence from the disruptor
* ring buffer.
*/
private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> oldestUnflushedStoreSequenceIds
= new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
Bytes.BYTES_COMPARATOR);
/**
* Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in
* memstore currently being flushed out to hfiles. Entries are moved here from
* {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held
* (so movement between the Maps is atomic). This is not related to the id we use above for
* {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from
* the disruptor ring buffer, an internal detail.
*/
private final Map<byte[], Map<byte[], Long>> lowestFlushingStoreSequenceIds =
new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
/**
* Map of region encoded names to the latest region sequence id. Updated on each append of
* WALEdits to the WAL. We create one map for each WAL file at the time it is rolled.
* <p>When deciding whether to archive a WAL file, we compare the sequence IDs in this map to
* {@link #lowestFlushingRegionSequenceIds} and {@link #oldestUnflushedRegionSequenceIds}.
* See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info.
* <p>
* This map uses byte[] as the key, and uses reference equality. It works in our use case as we
* use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
* the same array.
*/
private Map<byte[], Long> highestRegionSequenceIds = new HashMap<byte[], Long>();
/**
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name.
@ -396,7 +355,7 @@ public class FSHLog implements WAL {
};
/**
* Map of wal log file to the latest sequence ids of all regions it has entries of.
* Map of WAL log file to the latest sequence ids of all regions it has entries of.
* The map is sorted by the log file creation timestamp (contained in the log file name).
*/
private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
@ -542,7 +501,7 @@ public class FSHLog implements WAL {
(long)(blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f));
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
this.minTolerableReplication = conf.getInt( "hbase.regionserver.hlog.tolerable.lowreplication",
this.minTolerableReplication = conf.getInt("hbase.regionserver.hlog.tolerable.lowreplication",
FSUtils.getDefaultReplication(fs, this.fullPathLogDir));
this.lowReplicationRollLimit =
conf.getInt("hbase.regionserver.hlog.lowreplication.rolllimit", 5);
@ -745,128 +704,37 @@ public class FSHLog implements WAL {
return DefaultWALProvider.createWriter(conf, fs, path, false);
}
/**
 * Scans the values of the passed map for the smallest sequence id.
 * @param seqIdMap map whose values are the sequence ids to compare
 * @return the lowest value seen, or {@link HConstants#NO_SEQNUM} when the map has no values
 */
private long getLowestSeqId(Map<byte[], Long> seqIdMap) {
  long lowest = HConstants.NO_SEQNUM;
  for (Long candidate : seqIdMap.values()) {
    long value = candidate.longValue();
    // Keep the current minimum unless nothing has been recorded yet or this one is smaller.
    if (lowest != HConstants.NO_SEQNUM && value >= lowest) {
      continue;
    }
    lowest = value;
  }
  return lowest;
}
/**
 * Flattens a map of maps into a new map from each outer key to the lowest sequence id found
 * in that key's inner map, as computed by {@link #getLowestSeqId(Map)}.
 * @param mapToCopy outer map whose values are maps of sequence ids; it is only read
 * @return a new map; outer keys whose lowest sequence id is {@link HConstants#NO_SEQNUM}
 *   are left out
 */
private <T extends Map<byte[], Long>> Map<byte[], Long> copyMapWithLowestSeqId(
    Map<byte[], T> mapToCopy) {
  Map<byte[], Long> lowestByKey = Maps.newHashMap();
  for (Map.Entry<byte[], T> outer : mapToCopy.entrySet()) {
    long lowest = getLowestSeqId(outer.getValue());
    if (lowest == HConstants.NO_SEQNUM) {
      // Nothing usable recorded for this key; skip it.
      continue;
    }
    lowestByKey.put(outer.getKey(), lowest);
  }
  return lowestByKey;
}
/**
* Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
* have been flushed to hfiles.
* <p>
* For each log file, it compares its region to sequenceId map
* ({@link FSHLog#highestRegionSequenceIds}) with corresponding region entries in
* {@link FSHLog#lowestFlushingRegionSequenceIds} and
* {@link FSHLog#oldestUnflushedRegionSequenceIds}. If all the regions in the map are flushed
* past their value, then the wal is eligible for archiving.
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
* @throws IOException
*/
private void cleanOldLogs() throws IOException {
Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
List<Path> logsToArchive = new ArrayList<Path>();
// make a local copy so as to avoid locking when we iterate over these maps.
synchronized (regionSequenceIdLock) {
lowestFlushingRegionSequenceIdsLocal =
copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds);
oldestUnflushedRegionSequenceIdsLocal =
copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds);
}
for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
// iterate over the log file.
List<Path> logsToArchive = null;
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
// are older than what is currently in memory, the WAL can be GC'd.
for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) {
Path log = e.getKey();
Map<byte[], Long> sequenceNums = e.getValue();
// iterate over the map for this log file, and tell whether it should be archive or not.
if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
oldestUnflushedRegionSequenceIdsLocal)) {
if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
if (logsToArchive == null) logsToArchive = new ArrayList<Path>();
logsToArchive.add(log);
LOG.debug("WAL file ready for archiving " + log);
if (LOG.isTraceEnabled()) LOG.trace("WAL file ready for archiving " + log);
}
}
if (logsToArchive != null) {
for (Path p : logsToArchive) {
this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
archiveLogFile(p);
this.byWalRegionSequenceIds.remove(p);
}
}
/**
 * Takes a region:sequenceId map for a WAL file and checks whether the file can be archived.
 * For every region in {@code sequenceNums}, the value is compared against the smaller of that
 * region's entries in {@code oldestFlushingMap} and {@code oldestUnflushedMap}. A region that
 * is absent from one of those maps imposes no bound from that map. The file is eligible only
 * if, for all regions, the WAL's value is strictly less than that minimum.
 * @param sequenceNums region to sequence id for a WAL, at the time when it was rolled.
 * @param oldestFlushingMap region to oldest sequence id currently being flushed
 * @param oldestUnflushedMap region to oldest sequence id not yet flushed
 * @return true if wal is eligible for archiving, false otherwise.
 */
static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
    Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
  for (Map.Entry<byte[], Long> walEntry : sequenceNums.entrySet()) {
    byte[] region = walEntry.getKey();
    // No entry in a map means the region has nothing outstanding there.
    long flushingBound = Long.MAX_VALUE;
    if (oldestFlushingMap.containsKey(region)) {
      flushingBound = oldestFlushingMap.get(region);
    }
    long unflushedBound = Long.MAX_VALUE;
    if (oldestUnflushedMap.containsKey(region)) {
      unflushedBound = oldestUnflushedMap.get(region);
    }
    // Take the minimum so we are sure to honor the oldest outstanding sequence id.
    long oldestOutstanding = Math.min(flushingBound, unflushedBound);
    if (oldestOutstanding <= walEntry.getValue()) {
      return false; // can't archive
    }
  }
  return true;
}
/**
* Iterates over the given map of regions, and compares their sequence numbers with corresponding
* entries in {@link #oldestUnflushedRegionSequenceIds}. If the sequence number is greater or
* equal, the region is eligible to flush, otherwise, there is no benefit to flush (from the
* perspective of passed regionsSequenceNums map), because the region has already flushed the
* entries present in the WAL file for which this method is called for (typically, the oldest
* wal file).
* @param regionsSequenceNums
* @return regions which should be flushed (whose sequence numbers are larger than their
* corresponding un-flushed entries.
*/
private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
List<byte[]> regionsToFlush = null;
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (regionSequenceIdLock) {
for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
if (regionsToFlush == null)
regionsToFlush = new ArrayList<byte[]>();
regionsToFlush.add(e.getKey());
}
}
}
return regionsToFlush == null ? null : regionsToFlush
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}
/**
* If the number of un-archived WAL files is greater than maximum allowed, it checks
* the first (oldest) WAL file, and returns the regions which should be flushed so that it could
* If the number of un-archived WAL files is greater than maximum allowed, check the first
* (oldest) WAL file, and returns those regions which should be flushed so that it can
* be archived.
* @return regions to flush in order to archive oldest wal file.
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
* @throws IOException
*/
byte[][] findRegionsToForceFlush() throws IOException {
@ -875,7 +743,7 @@ public class FSHLog implements WAL {
if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
this.byWalRegionSequenceIds.firstEntry();
regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
}
if (regions != null) {
StringBuilder sb = new StringBuilder();
@ -883,9 +751,8 @@ public class FSHLog implements WAL {
if (i > 0) sb.append(", ");
sb.append(Bytes.toStringBinary(regions[i]));
}
LOG.info("Too many wals: logs=" + logCount + ", maxlogs=" +
this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
sb.toString());
LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs +
"; forcing flush of " + regions.length + " regions(s): " + sb.toString());
}
return regions;
}
@ -963,8 +830,7 @@ public class FSHLog implements WAL {
this.numEntries.set(0);
final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
if (oldPath != null) {
this.byWalRegionSequenceIds.put(oldPath, this.highestRegionSequenceIds);
this.highestRegionSequenceIds = new HashMap<byte[], Long>();
this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
@ -1108,7 +974,7 @@ public class FSHLog implements WAL {
LOG.debug("Moved " + files.length + " WAL file(s) to " +
FSUtils.getPath(this.fullPathArchiveDir));
}
LOG.info("Closed WAL: " + toString() );
LOG.info("Closed WAL: " + toString());
}
@Override
@ -1631,108 +1497,24 @@ public class FSHLog implements WAL {
}
@Override
public boolean startCacheFlush(final byte[] encodedRegionName,
Set<byte[]> flushedFamilyNames) {
Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families) {
if (!closeBarrier.beginOp()) {
LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
" - because the server is closing.");
return false;
LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing.");
return null;
}
synchronized (regionSequenceIdLock) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
oldestUnflushedStoreSequenceIds.get(encodedRegionName);
if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
for (byte[] familyName: flushedFamilyNames) {
Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
if (seqId != null) {
oldStoreSeqNum.put(familyName, seqId);
}
}
if (!oldStoreSeqNum.isEmpty()) {
Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
encodedRegionName, oldStoreSeqNum);
assert oldValue == null: "Flushing map not cleaned up for "
+ Bytes.toString(encodedRegionName);
}
if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
// Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever
// even if the region is already moved to other server.
// Do not worry about data racing, we held write lock of region when calling
// startCacheFlush, so no one can add value to the map we removed.
oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
}
}
}
if (oldStoreSeqNum.isEmpty()) {
// TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
// the region is already flushing (which would make this call invalid), or there
// were no appends after last flush, so why are we starting flush? Maybe we should
// assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
// For now preserve old logic.
LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+ Bytes.toString(encodedRegionName) + "]");
}
return true;
return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families);
}
@Override
public void completeCacheFlush(final byte [] encodedRegionName) {
synchronized (regionSequenceIdLock) {
this.lowestFlushingStoreSequenceIds.remove(encodedRegionName);
}
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
closeBarrier.endOp();
}
private ConcurrentMap<byte[], Long> getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(
byte[] encodedRegionName) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
oldestUnflushedStoreSequenceIds.get(encodedRegionName);
if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
return oldestUnflushedStoreSequenceIdsOfRegion;
}
oldestUnflushedStoreSequenceIdsOfRegion =
new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
ConcurrentMap<byte[], Long> alreadyPut =
oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName,
oldestUnflushedStoreSequenceIdsOfRegion);
return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut;
}
@Override
public void abortCacheFlush(byte[] encodedRegionName) {
Map<byte[], Long> storeSeqNumsBeforeFlushStarts;
Map<byte[], Long> currentStoreSeqNums = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
synchronized (regionSequenceIdLock) {
storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove(
encodedRegionName);
if (storeSeqNumsBeforeFlushStarts != null) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
for (Map.Entry<byte[], Long> familyNameAndSeqId: storeSeqNumsBeforeFlushStarts
.entrySet()) {
currentStoreSeqNums.put(familyNameAndSeqId.getKey(),
oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(),
familyNameAndSeqId.getValue()));
}
}
}
this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
closeBarrier.endOp();
if (storeSeqNumsBeforeFlushStarts != null) {
for (Map.Entry<byte[], Long> familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) {
Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey());
if (currentSeqNum != null
&& currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) {
String errorStr =
"Region " + Bytes.toString(encodedRegionName) + " family "
+ Bytes.toString(familyNameAndSeqId.getKey())
+ " acquired edits out of order current memstore seq=" + currentSeqNum
+ ", previous oldest unflushed id=" + familyNameAndSeqId.getValue();
LOG.error(errorStr);
Runtime.getRuntime().halt(1);
}
}
}
}
@VisibleForTesting
@ -1762,23 +1544,21 @@ public class FSHLog implements WAL {
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
return oldestUnflushedStoreSequenceIdsOfRegion != null ?
getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM;
// Used by tests. Deprecated as too subtle for general usage.
return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
}
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName,
byte[] familyName) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
this.oldestUnflushedStoreSequenceIds.get(encodedRegionName);
if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName);
return result != null ? result.longValue() : HConstants.NO_SEQNUM;
} else {
return HConstants.NO_SEQNUM;
}
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
// This method is used by tests and for figuring if we should flush or not because our
// sequenceids are too old. It is also used reporting the master our oldest sequenceid for use
// figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId
// from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the
// currently flushing sequence ids, and if anything found there, it is returning these. This is
// the right thing to do for the reporting oldest sequenceids to master; we won't skip edits if
// we crash during the flush. For figuring what to flush, we might get requeued if our sequence
// id is old even though we are currently flushing. This may mean we do too much flushing.
return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName);
}
/**
@ -1820,10 +1600,10 @@ public class FSHLog implements WAL {
/**
* For Thread A to call when it is ready to wait on the 'safe point' to be attained.
* Thread A will be held in here until Thread B calls {@link #safePointAttained()}
* @throws InterruptedException
* @throws ExecutionException
* @param syncFuture We need this as barometer on outstanding syncs. If it comes home with
* an exception, then something is up w/ our syncing.
* @throws InterruptedException
* @throws ExecutionException
* @return The passed <code>syncFuture</code>
* @throws FailedSyncBeforeLogCloseException
*/
@ -2014,15 +1794,6 @@ public class FSHLog implements WAL {
}
}
private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName,
Set<byte[]> familyNameSet, Long lRegionSequenceId) {
ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName);
for (byte[] familyName : familyNameSet) {
oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId);
}
}
/**
* Append to the WAL. Does all CP and WAL listener calls.
* @param entry
@ -2067,13 +1838,8 @@ public class FSHLog implements WAL {
writer.append(entry);
assert highestUnsyncedSequence < entry.getSequence();
highestUnsyncedSequence = entry.getSequence();
Long lRegionSequenceId = Long.valueOf(regionSequenceId);
highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
if (entry.isInMemstore()) {
updateOldestUnflushedSequenceIds(encodedRegionName,
entry.getFamilyNames(), lRegionSequenceId);
}
sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId,
entry.isInMemstore());
coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
// Update metrics.
postAppend(entry, EnvironmentEdgeManager.currentTime() - start);

View File

@ -0,0 +1,363 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.collect.Maps;
/**
 * Accounting of sequence ids per region and then by column family. So we can keep our accounting
 * current, call {@link #startCacheFlush(byte[], Set)} and then
 * {@link #completeCacheFlush(byte[])} or {@link #abortCacheFlush(byte[])} so this instance
 * can keep abreast of the state of sequence id persistence. Also call
 * {@link #update(byte[], Set, long, boolean)} per append.
 */
class SequenceIdAccounting {
  private static final Log LOG = LogFactory.getLog(SequenceIdAccounting.class);

  /**
   * This lock ties all operations on {@link #flushingSequenceIds} and
   * {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
   * lowest outstanding sequence ids EXCEPT when flushing. When we flush, the current
   * lowest set for the region/column family are moved (atomically because of this lock) to
   * {@link #flushingSequenceIds}.
   *
   * <p>The two Maps are tied by this locking object EXCEPT when we go to update the lowest
   * entry; see {@link #update(byte[], Set, long, boolean)}. In there is a putIfAbsent call on
   * {@link #lowestUnflushedSequenceIds}. In this latter case, we will add this lowest
   * sequence id if we find that there is no entry for the current column family. There will be no
   * entry only if we just came up OR we have moved aside current set of lowest sequence ids
   * because the current set are being flushed (by putting them into {@link #flushingSequenceIds}).
   * This is how we pick up the next 'lowest' sequence id per region per column family to be used
   * figuring what is in the next flush.
   */
  private final Object tieLock = new Object();

  /**
   * Map of encoded region names and family names to their OLDEST -- i.e. their first,
   * the longest-lived, their 'earliest', the 'lowest' -- sequence id.
   *
   * <p>When we flush, the current lowest sequence ids get cleared and added to
   * {@link #flushingSequenceIds}. The next append that comes in, is then added
   * here to {@link #lowestUnflushedSequenceIds} as the next lowest sequenceid.
   *
   * <p>If flush fails, currently server is aborted so no need to restore previous sequence ids.
   * <p>Needs to be concurrent Maps because we use putIfAbsent updating oldest.
   */
  private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> lowestUnflushedSequenceIds =
      new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);

  /**
   * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id
   * currently being flushed out to hfiles. Entries are moved here from
   * {@link #lowestUnflushedSequenceIds} while the lock {@link #tieLock} is held
   * (so movement between the Maps is atomic).
   */
  private final Map<byte[], Map<byte[], Long>> flushingSequenceIds =
      new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);

  /**
   * Map of region encoded names to the latest/highest region sequence id. Updated on each
   * call to append.
   * <p>
   * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
   * use {@code HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
   * the same array.
   */
  private Map<byte[], Long> highestSequenceIds = new HashMap<byte[], Long>();

  /**
   * Returns the lowest outstanding sequence id for the region, considering both the ids that are
   * still unflushed and the ids that are currently being flushed.
   * @param encodedRegionName Encoded name of the region to look up.
   * @return Lowest outstanding sequenceid for <code>encodedRegionName</code>; this may be the id
   * of an edit currently being flushed. Will return {@link HConstants#NO_SEQNUM} when there is
   * neither an unflushed nor a flushing sequence id for the region.
   */
  long getLowestSequenceId(final byte[] encodedRegionName) {
    synchronized (this.tieLock) {
      Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
      long flushingLowest = m != null ? getLowestSequenceId(m) : HConstants.NO_SEQNUM;
      m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
      long unflushedLowest = m != null ? getLowestSequenceId(m) : HConstants.NO_SEQNUM;
      // NO_SEQNUM means 'nothing recorded'. It must not take part in the min below or it would
      // always win and hide a real outstanding sequence id.
      if (flushingLowest == HConstants.NO_SEQNUM) {
        return unflushedLowest;
      }
      if (unflushedLowest == HConstants.NO_SEQNUM) {
        return flushingLowest;
      }
      return Math.min(flushingLowest, unflushedLowest);
    }
  }

  /**
   * @param encodedRegionName Encoded name of the region to look up.
   * @param familyName Column family to look up.
   * @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code> and
   * <code>familyName</code>. Returned sequenceid may be for an edit currently being flushed.
   * Returns {@link HConstants#NO_SEQNUM} if nothing is recorded for the family.
   */
  long getLowestSequenceId(final byte[] encodedRegionName, final byte[] familyName) {
    synchronized (this.tieLock) {
      // Look at what is being flushed first; it is older than anything unflushed.
      Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
      if (m != null) {
        Long lowest = m.get(familyName);
        if (lowest != null) {
          return lowest;
        }
      }
      m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
      if (m != null) {
        Long lowest = m.get(familyName);
        if (lowest != null) {
          return lowest;
        }
      }
    }
    return HConstants.NO_SEQNUM;
  }

  /**
   * Reset the accounting of highest sequenceid by regionname.
   * @return Return the previous accounting Map of regions to the last sequence id written into
   * each.
   */
  Map<byte[], Long> resetHighest() {
    Map<byte[], Long> old = this.highestSequenceIds;
    this.highestSequenceIds = new HashMap<byte[], Long>();
    return old;
  }

  /**
   * We've been passed a new sequenceid for the region. Set it as highest seen for this region and
   * if we are to record oldest, or lowest sequenceids, save it as oldest seen if nothing
   * currently older.
   * @param encodedRegionName Encoded name of the region the edit belongs to.
   * @param families Column families touched by the edit.
   * @param sequenceid Sequence id of the edit.
   * @param lowest Whether to keep running account of oldest sequence id.
   */
  void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
      final boolean lowest) {
    Long l = Long.valueOf(sequenceid);
    this.highestSequenceIds.put(encodedRegionName, l);
    if (lowest) {
      ConcurrentMap<byte[], Long> m = getOrCreateLowestSequenceIds(encodedRegionName);
      for (byte[] familyName : families) {
        // Only the first edit after a flush sets the 'lowest'; later edits leave it alone.
        m.putIfAbsent(familyName, l);
      }
    }
  }

  /**
   * @param encodedRegionName Encoded name of the region.
   * @return The per-family map of lowest unflushed sequence ids for the region, created and
   * registered if there was none.
   */
  ConcurrentMap<byte[], Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {
    // Intentionally, this access is done outside of this.tieLock. Done per append.
    ConcurrentMap<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
    if (m != null) {
      return m;
    }
    m = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    // Another thread may have added it ahead of us.
    ConcurrentMap<byte[], Long> alreadyPut =
        this.lowestUnflushedSequenceIds.putIfAbsent(encodedRegionName, m);
    return alreadyPut == null ? m : alreadyPut;
  }

  /**
   * @param sequenceids Map to search for lowest value.
   * @return Lowest value found in <code>sequenceids</code> or {@link HConstants#NO_SEQNUM} if
   * the map is empty.
   */
  static long getLowestSequenceId(Map<byte[], Long> sequenceids) {
    long lowest = HConstants.NO_SEQNUM;
    for (Long sid : sequenceids.values()) {
      if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) {
        lowest = sid.longValue();
      }
    }
    return lowest;
  }

  /**
   * @param src Map of encoded region name to a Map of family name to sequence id.
   * @return New Map that has same keys as <code>src</code> but instead of a Map for a value, it
   * instead has found the smallest sequence id and it returns that as the value instead. Regions
   * with no sequence ids are left out. Returns null if <code>src</code> is null or empty.
   */
  private <T extends Map<byte[], Long>> Map<byte[], Long> flattenToLowestSequenceId(
      Map<byte[], T> src) {
    if (src == null || src.isEmpty()) {
      return null;
    }
    // Keyed by content (like the source Maps), not by array identity, so a lookup with an
    // equal-but-distinct byte [] still finds the region.
    Map<byte[], Long> tgt = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<byte[], T> entry : src.entrySet()) {
      long lowestSeqId = getLowestSequenceId(entry.getValue());
      if (lowestSeqId != HConstants.NO_SEQNUM) {
        tgt.put(entry.getKey(), lowestSeqId);
      }
    }
    return tgt;
  }

  /**
   * Move the lowest sequence ids of the families about to be flushed from
   * {@link #lowestUnflushedSequenceIds} to {@link #flushingSequenceIds}.
   * @param encodedRegionName Region to flush.
   * @param families Families to flush. May be a subset of all families in the region.
   * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if
   * we are flushing a subset of all families but there are no edits in those families not
   * being flushed; in other words, this is effectively same as a flush of all of the region
   * though we were passed a subset of families. Otherwise, it returns the sequence id of the
   * oldest/lowest outstanding edit.
   */
  Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
    Map<byte[], Long> oldSequenceIds = null;
    Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
    synchronized (tieLock) {
      Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
      if (m != null) {
        // NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled
        // circumstance because another concurrent thread now may add sequenceids for this family
        // (see above in getOrCreateLowestSequenceIds). Make sure you are ok with this. Usually it
        // is fine because updates are blocked when this method is called. Make sure!!!
        for (byte[] familyName : families) {
          Long seqId = m.remove(familyName);
          if (seqId != null) {
            if (oldSequenceIds == null) {
              oldSequenceIds = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
            }
            oldSequenceIds.put(familyName, seqId);
          }
        }
        // oldSequenceIds is created lazily so if it is non-null, it has at least one entry.
        if (oldSequenceIds != null) {
          if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) {
            LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) +
              ", sequenceid=" + oldSequenceIds);
          }
        }
        if (m.isEmpty()) {
          // Remove it otherwise it will be in lowestUnflushedSequenceIds for ever
          // even if the region is already moved to other server.
          // Do not worry about data racing, we held write lock of region when calling
          // startCacheFlush, so no one can add value to the map we removed.
          this.lowestUnflushedSequenceIds.remove(encodedRegionName);
        } else {
          // Flushing a subset of the region families. Return the sequence id of the oldest entry.
          lowestUnflushedInRegion = Collections.min(m.values());
        }
      }
    }
    // Do this check outside lock.
    if (oldSequenceIds == null) {
      // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
      // the region is already flushing (which would make this call invalid), or there
      // were no appends after last flush, so why are we starting flush? Maybe we should
      // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
      // For now preserve old logic.
      LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName));
    }
    return lowestUnflushedInRegion;
  }

  /**
   * Flush succeeded; forget the sequence ids that were being flushed for the region.
   * @param encodedRegionName Encoded name of the region whose flush completed.
   */
  void completeCacheFlush(final byte[] encodedRegionName) {
    synchronized (tieLock) {
      this.flushingSequenceIds.remove(encodedRegionName);
    }
  }

  /**
   * Flush did not happen; move the flushing sequence ids for the region back into
   * {@link #lowestUnflushedSequenceIds}. Halts the JVM if it finds an unflushed sequence id that
   * is not newer than the one that was being flushed.
   * @param encodedRegionName Encoded name of the region whose flush was aborted.
   */
  void abortCacheFlush(final byte[] encodedRegionName) {
    // Method is called when we are crashing down because failed write flush AND it is called
    // if we fail prepare. The below is for the fail prepare case; we restore the old sequence ids.
    Map<byte[], Long> flushing;
    // Family name to the sequence id (possibly null) displaced from unflushed by the restore.
    Map<byte[], Long> tmpMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
    // Here we are moving sequenceids from flushing back to unflushed; doing opposite of what
    // happened in startCacheFlush. During prepare phase, we have update lock on the region so
    // no edits should be coming in via append.
    synchronized (tieLock) {
      flushing = this.flushingSequenceIds.remove(encodedRegionName);
      if (flushing != null) {
        Map<byte[], Long> unflushed = getOrCreateLowestSequenceIds(encodedRegionName);
        for (Map.Entry<byte[], Long> e : flushing.entrySet()) {
          // Set into unflushed the 'old' oldest sequenceid and if any value in flushed with this
          // value, it will now be in tmpMap.
          tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue()));
        }
      }
    }
    if (flushing == null) {
      return;
    }
    // Here we are doing some 'test' to see if edits are going in out of order. What is it for?
    // Carried over from old code.
    for (Map.Entry<byte[], Long> e : flushing.entrySet()) {
      Long currentId = tmpMap.get(e.getKey());
      if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
        String errorStr = Bytes.toString(encodedRegionName) + " family " +
          Bytes.toString(e.getKey()) + " acquired edits out of order current memstore seq=" +
          currentId + ", previous oldest unflushed id=" + e.getValue();
        LOG.error(errorStr);
        Runtime.getRuntime().halt(1);
      }
    }
  }

  /**
   * See if passed <code>sequenceids</code> are lower -- i.e. earlier -- than any outstanding
   * sequenceids, sequenceids we are holding on to in this accounting instance.
   * @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make
   * sense for it to be null).
   * @return true if all sequenceids are lower, older than, the old sequenceids in this instance.
   */
  boolean areAllLower(Map<byte[], Long> sequenceids) {
    Map<byte[], Long> flushing;
    Map<byte[], Long> unflushed;
    synchronized (this.tieLock) {
      // Get a flattened -- only the oldest sequenceid -- copy of current flushing and unflushed
      // data structures to use in tests below.
      flushing = flattenToLowestSequenceId(this.flushingSequenceIds);
      unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);
    }
    for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
      // A region with nothing outstanding does not hold anything back: treat as MAX_VALUE.
      Long oldestFlushing = flushing == null ? null : flushing.get(e.getKey());
      Long oldestUnflushed = unflushed == null ? null : unflushed.get(e.getKey());
      long min = Math.min(
        oldestFlushing == null ? Long.MAX_VALUE : oldestFlushing.longValue(),
        oldestUnflushed == null ? Long.MAX_VALUE : oldestUnflushed.longValue());
      if (min <= e.getValue()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Iterates over the given Map and compares sequence ids with corresponding
   * entries in {@link #lowestUnflushedSequenceIds}. If a region in
   * {@link #lowestUnflushedSequenceIds} has a sequence id less than or equal to that passed
   * in <code>sequenceids</code> then return it.
   * @param sequenceids Sequenceids keyed by encoded region name.
   * @return regions found in this instance with sequence ids less than or equal to those passed
   * in, or null if there are none.
   */
  byte[][] findLower(Map<byte[], Long> sequenceids) {
    List<byte[]> toFlush = null;
    // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
    synchronized (tieLock) {
      for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
        Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(e.getKey());
        if (m == null) {
          continue;
        }
        // The lowest sequence id outstanding for this region.
        long lowest = getLowestSequenceId(m);
        if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
          if (toFlush == null) {
            toFlush = new ArrayList<byte[]>();
          }
          toFlush.add(e.getKey());
        }
      }
    }
    return toFlush == null ? null : toFlush.toArray(new byte[toFlush.size()][]);
  }
}

View File

@ -26,21 +26,19 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
// imports for things that haven't moved from regionserver.wal yet.
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* No-op implementation of {@link WALProvider} used when the WAL is disabled.
@ -187,8 +185,9 @@ class DisabledWALProvider implements WALProvider {
}
@Override
public boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
return !(closed.get());
public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
if (closed.get()) return null;
return HConstants.NO_SEQNUM;
}
@Override

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -39,6 +40,8 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import com.google.common.annotations.VisibleForTesting;
/**
* A Write Ahead Log (WAL) provides service for reading, writing waledits. This interface provides
* APIs for WAL users (such as RegionServer) to use the WAL (do append, sync, etc).
@ -140,31 +143,36 @@ public interface WAL {
void sync(long txid) throws IOException;
/**
* WAL keeps track of the sequence numbers that were not yet flushed from memstores
* in order to be able to do cleanup. This method tells WAL that some region is about
* to flush memstore.
* WAL keeps track of the sequence numbers that are as yet not flushed in memstores
* in order to be able to do accounting to figure which WALs can be let go. This method tells WAL
* that some region is about to flush. The flush can be the whole region or for a column family
* of the region only.
*
* <p>We stash the oldest seqNum for the region, and let the the next edit inserted in this
* region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit,
* AtomicLong, boolean, List)} as new oldest seqnum.
* In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
* the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
*
* @return true if the flush can proceed, false in case wal is closing (ususally, when server is
* closing) and flush couldn't be started.
* <p>Currently, it is expected that the update lock is held for the region; i.e. no
* concurrent appends while we set up cache flush.
* @param families Families to flush. May be a subset of all families in the region.
* @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if
* we are flushing a subset of all families but there are no edits in those families not
* being flushed; in other words, this is effectively same as a flush of all of the region
* though we were passed a subset of families. Otherwise, it returns the sequence id of the
* oldest/lowest outstanding edit.
* @see #completeCacheFlush(byte[])
* @see #abortCacheFlush(byte[])
*/
boolean startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames);
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
/**
* Complete the cache flush.
* @param encodedRegionName Encoded region name.
* @see #startCacheFlush(byte[], Set)
* @see #abortCacheFlush(byte[])
*/
void completeCacheFlush(final byte[] encodedRegionName);
/**
* Abort a cache flush. Call if the flush fails. Note that the only recovery
* for an aborted flush currently is a restart of the regionserver so the
* snapshot content dropped by the failure gets restored to the memstore.v
* snapshot content dropped by the failure gets restored to the memstore.
* @param encodedRegionName Encoded region name.
*/
void abortCacheFlush(byte[] encodedRegionName);
@ -174,19 +182,22 @@ public interface WAL {
*/
WALCoprocessorHost getCoprocessorHost();
/** Gets the earliest sequence number in the memstore for this particular region.
* This can serve as best-effort "recent" WAL number for this region.
/**
* Gets the earliest unflushed sequence id in the memstore for the region.
* @param encodedRegionName The region to get the number for.
* @return The number if present, HConstants.NO_SEQNUM if absent.
* @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
* @deprecated Since version 1.2.0. Removing because not used and exposes subtle internal
* workings. Use {@link #getEarliestMemstoreSeqNum(byte[], byte[])}
*/
@VisibleForTesting
@Deprecated
long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
/**
* Gets the earliest sequence number in the memstore for this particular region and store.
* Gets the earliest unflushed sequence id in the memstore for the store.
* @param encodedRegionName The region to get the number for.
* @param familyName The family to get the number for.
* @return The number if present, HConstants.NO_SEQNUM if absent.
* @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
*/
long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName);

View File

@ -124,6 +124,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
/**
* This class is responsible for splitting up a bunch of regionserver commit log
@ -324,15 +325,19 @@ public class WALSplitter {
failedServerName = (serverName == null) ? "" : serverName.getServerName();
while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
byte[] region = entry.getKey().getEncodedRegionName();
String key = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
String encodedRegionNameAsStr = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) {
if (this.distributedLogReplay) {
RegionStoreSequenceIds ids =
csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
key);
encodedRegionNameAsStr);
if (ids != null) {
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
TextFormat.shortDebugString(ids));
}
}
} else if (sequenceIdChecker != null) {
RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
@ -341,13 +346,17 @@ public class WALSplitter {
maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
storeSeqId.getSequenceId());
}
regionMaxSeqIdInStores.put(key, maxSeqIdInStores);
regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores);
lastFlushedSequenceId = ids.getLastFlushedSequenceId();
if (LOG.isDebugEnabled()) {
LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " +
TextFormat.shortDebugString(ids));
}
}
if (lastFlushedSequenceId == null) {
lastFlushedSequenceId = -1L;
}
lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId);
}
if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
editsSkipped++;
@ -1071,7 +1080,7 @@ public class WALSplitter {
}
private void doRun() throws IOException {
LOG.debug("Writer thread " + this + ": starting");
if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
while (true) {
RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
if (buffer == null) {
@ -1226,7 +1235,8 @@ public class WALSplitter {
}
}
controller.checkForErrors();
LOG.info("Split writers finished");
LOG.info((this.writerThreads == null? 0: this.writerThreads.size()) +
" split writers finished; closing...");
return (!progress_failed);
}
@ -1317,12 +1327,14 @@ public class WALSplitter {
CompletionService<Void> completionService =
new ExecutorCompletionService<Void>(closeThreadPool);
for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
if (LOG.isTraceEnabled()) {
LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
}
completionService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
LOG.debug("Closing " + wap.p);
if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
try {
wap.w.close();
} catch (IOException ioe) {
@ -1330,8 +1342,8 @@ public class WALSplitter {
thrown.add(ioe);
return null;
}
LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in "
+ (wap.nanosSpent / 1000 / 1000) + "ms)");
LOG.info("Closed " + wap.p + "; wrote " + wap.editsWritten + " edit(s) in "
+ (wap.nanosSpent / 1000 / 1000) + "ms");
if (wap.editsWritten == 0) {
// just remove the empty recovered.edits file
@ -1490,8 +1502,8 @@ public class WALSplitter {
}
}
Writer w = createWriter(regionedits);
LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
return (new WriterAndPath(regionedits, w));
LOG.debug("Creating writer path=" + regionedits);
return new WriterAndPath(regionedits, w);
}
private void filterCellByStore(Entry logEntry) {
@ -1505,6 +1517,7 @@ public class WALSplitter {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
byte[] family = CellUtil.cloneFamily(cell);
Long maxSeqId = maxSeqIdInStores.get(family);
LOG.info("CHANGE REMOVE " + Bytes.toString(family) + ", max=" + maxSeqId);
// Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
// or the master was crashed before and we can not get the information.
if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) {
@ -1544,10 +1557,10 @@ public class WALSplitter {
filterCellByStore(logEntry);
if (!logEntry.getEdit().isEmpty()) {
wap.w.append(logEntry);
}
this.updateRegionMaximumEditLogSeqNum(logEntry);
editsCount++;
}
}
// Pass along summary statistics
wap.incrementEdits(editsCount);
wap.incrementNanoTime(System.nanoTime() - startTime);

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.security.UserProvider;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

View File

@ -91,6 +91,7 @@ public class TestGetLastFlushedSequenceId {
testUtil.getHBaseCluster().getMaster()
.getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId());
// This will be the sequenceid just before that of the earliest edit in memstore.
long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId();
assertTrue(storeSequenceId > 0);
testUtil.getHBaseAdmin().flush(tableName);

View File

@ -230,6 +230,35 @@ public class TestHRegion {
return name.getMethodName();
}
/**
 * Checks that the region's max flushed sequence id can still be read after the region is closed.
 * A freshly initialized region is inspected before and after close, then the region is
 * initialized again, given one edit, flushed and closed; the value read before the close must
 * equal the value read after it.
 * @throws IOException
 */
@Test (timeout = 100000)
public void testSequenceId() throws IOException {
  final String method = name.getMethodName();

  // First pass: no edits at all, before and after close.
  HRegion untouched = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
  assertEquals(HConstants.NO_SEQNUM, untouched.getMaxFlushedSeqId());
  // Weird. This returns 0 if no store files or no edits. Afraid to change it.
  assertEquals(0, (long)untouched.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));
  untouched.close();
  assertEquals(HConstants.NO_SEQNUM, untouched.getMaxFlushedSeqId());
  assertEquals(0, (long)untouched.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));

  // Second pass: initialize the region again and put a single cell into our cf. The method
  // name doubles as both row key and cell value.
  HRegion reopened = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
  byte [] rowAndValue = Bytes.toBytes(method);
  reopened.put(new Put(rowAndValue).addColumn(COLUMN_FAMILY_BYTES, null, rowAndValue));

  // Nothing flushed yet, so the initial numbers are expected to be unchanged.
  assertEquals(HConstants.NO_SEQNUM, reopened.getMaxFlushedSeqId());
  assertEquals(0, (long)reopened.getMaxStoreSeqId().get(COLUMN_FAMILY_BYTES));

  // Flush, remember the flushed id, close, and make sure the id survives the close.
  reopened.flush(true);
  long flushedBeforeClose = reopened.getMaxFlushedSeqId();
  reopened.close();
  assertEquals(flushedBeforeClose, reopened.getMaxFlushedSeqId());
}
/**
* Test for Bug 2 of HBASE-10466.
* "Bug 2: Conditions for the first flush of region close (so-called pre-flush) If memstoreSize

View File

@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.Collection;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mortbay.log.Log;
/**
 * Testcase for https://issues.apache.org/jira/browse/HBASE-13811
 *
 * <p>Scenario driven by {@link #test()}: an edit is written, a flush of the region is started and
 * held up inside a stubbed {@code internalFlushCacheAndCommit}, a second edit is written and a
 * region server report is sent while the flush is still pending, and then the flush is made to
 * fail with a {@link DroppedSnapshotException}. Once the region is served by another region
 * server, the first edit must still be readable.
 */
@Category({ MediumTests.class })
public class TestSplitWalDataLoss {

  private final HBaseTestingUtility testUtil = new HBaseTestingUtility();

  private NamespaceDescriptor namespace = NamespaceDescriptor.create(getClass().getSimpleName())
      .build();

  private TableName tableName = TableName.valueOf(namespace.getName(), "dataloss");

  private byte[] family = Bytes.toBytes("f");

  private byte[] qualifier = Bytes.toBytes("q");

  /**
   * Starts a two-server mini cluster with distributed log replay switched off and creates the
   * single-family test table in its own namespace.
   */
  @Before
  public void setUp() throws Exception {
    testUtil.getConfiguration().setInt("hbase.regionserver.msginterval", 30000);
    testUtil.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
    testUtil.startMiniCluster(2);
    HBaseAdmin admin = testUtil.getHBaseAdmin();
    admin.createNamespace(namespace);
    admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
    testUtil.waitTableAvailable(tableName);
  }

  /** Shuts the mini cluster down. */
  @After
  public void tearDown() throws Exception {
    testUtil.shutdownMiniCluster();
  }

  /**
   * Runs the failed-flush scenario described on the class and asserts that {@code row0} can be
   * read back after the region has moved off the original region server.
   *
   * <p>The timeout bounds the final wait loop, which otherwise spins forever if the region never
   * leaves the original region server.
   */
  @Test(timeout = 300000)
  public void test() throws IOException, InterruptedException {
    final HRegionServer rs = testUtil.getRSForFirstRegionInTable(tableName);
    final HRegion region = (HRegion) rs.getOnlineRegions(tableName).get(0);
    HRegion spiedRegion = spy(region);
    // Two hand-rolled latches: 'flushed' is raised by the stubbed flush once it has been entered,
    // 'reported' is raised by the test thread once the region server report has been sent.
    final MutableBoolean flushed = new MutableBoolean(false);
    final MutableBoolean reported = new MutableBoolean(false);
    doAnswer(new Answer<FlushResult>() {
      @Override
      public FlushResult answer(InvocationOnMock invocation) throws Throwable {
        // Tell the test thread that the flush has reached the commit step.
        synchronized (flushed) {
          flushed.setValue(true);
          flushed.notifyAll();
        }
        // Hold the flush here until the test thread has sent the region server report.
        synchronized (reported) {
          while (!reported.booleanValue()) {
            reported.wait();
          }
        }
        // Fail the flush: abort it in the WAL and surface a dropped snapshot.
        rs.getWAL(region.getRegionInfo()).abortCacheFlush(
          region.getRegionInfo().getEncodedNameAsBytes());
        throw new DroppedSnapshotException("testcase");
      }
    }).when(spiedRegion).internalFlushCacheAndCommit(Matchers.<WAL> any(),
      Matchers.<MonitoredTask> any(), Matchers.<PrepareFlushResult> any(),
      Matchers.<Collection<Store>> any());
    // Install the spy under the key of the region it wraps. Taking whatever key the map's
    // iterator yields first could replace the entry of an unrelated region hosted by this server
    // and leave the table's region un-spied.
    // NOTE(review): relies on HRegionServer keying onlineRegions by encoded region name -- confirm.
    rs.onlineRegions.put(region.getRegionInfo().getEncodedName(), spiedRegion);
    Connection conn = testUtil.getConnection();

    // First edit; this is the one that must survive.
    try (Table table = conn.getTable(tableName)) {
      table.put(new Put(Bytes.toBytes("row0")).addColumn(family, qualifier, Bytes.toBytes("val0")));
    }
    long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
    // NOTE(review): looks like a debugging leftover routed through Jetty's logger; kept as is
    // because this file declares no other logger.
    Log.info("CHANGE OLDEST " + oldestSeqIdOfStore);
    assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);

    // Kick off the flush and wait until it is parked inside the stubbed commit step.
    rs.cacheFlusher.requestFlush(spiedRegion, false);
    synchronized (flushed) {
      while (!flushed.booleanValue()) {
        flushed.wait();
      }
    }

    // Second edit goes in while the flush is still pending.
    try (Table table = conn.getTable(tableName)) {
      table.put(new Put(Bytes.toBytes("row1")).addColumn(family, qualifier, Bytes.toBytes("val1")));
    }

    // Send a region server report while the flush is in flight, then let the flush fail.
    long now = EnvironmentEdgeManager.currentTime();
    rs.tryRegionServerReport(now - 500, now);
    synchronized (reported) {
      reported.setValue(true);
      reported.notifyAll();
    }

    // Wait until the table's first region is served by a different region server.
    while (testUtil.getRSForFirstRegionInTable(tableName) == rs) {
      Thread.sleep(100);
    }

    // The edit written before the failed flush must still be there.
    try (Table table = conn.getTable(tableName)) {
      Result result = table.get(new Get(Bytes.toBytes("row0")));
      assertArrayEquals(Bytes.toBytes("val0"), result.getValue(family, qualifier));
    }
  }
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -27,9 +26,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@ -50,7 +47,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@ -58,6 +54,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -320,53 +317,6 @@ public class TestFSHLog {
}
}
/**
 * Drives {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} with hand-built maps for a single
 * region: the sequence ids recorded for a WAL, the oldest sequence ids currently being flushed,
 * and the oldest sequence ids not yet flushed.
 * The WAL's entry for a region is checked against that region's entries in the flushing and
 * unflushed maps; the assertions below spell out which combinations are expected to count as
 * "flushed" (WAL may be archived) and which are not.
 */
@Test
public void testAllRegionsFlushed() {
  LOG.debug("testAllRegionsFlushed");
  final Map<byte[], Long> walSeqIds = new HashMap<byte[], Long>();
  final Map<byte[], Long> flushing = new HashMap<byte[], Long>();
  final Map<byte[], Long> unflushed = new HashMap<byte[], Long>();
  // One region of table "t1" spanning the whole key range.
  HRegionInfo hri1 = new HRegionInfo(TableName.valueOf("t1"), HConstants.EMPTY_START_ROW,
    HConstants.EMPTY_END_ROW);
  // Stand-in for the region's sequence id generator.
  final AtomicLong sequenceId1 = new AtomicLong(1);

  // Nothing recorded anywhere: trivially flushed.
  assertTrue(FSHLog.areAllRegionsFlushed(walSeqIds, flushing, unflushed));

  // Same sequence id in the WAL map and the unflushed map: not flushed.
  walSeqIds.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
  unflushed.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
  assertFalse(FSHLog.areAllRegionsFlushed(walSeqIds, flushing, unflushed));

  // Move that id from the unflushed map to the flushing map: still not flushed.
  unflushed.clear();
  flushing.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
  assertFalse(FSHLog.areAllRegionsFlushed(walSeqIds, flushing, unflushed));

  // Simulate a completed flush by emptying both bookkeeping maps.
  flushing.clear();
  unflushed.clear();
  assertTrue(FSHLog.areAllRegionsFlushed(walSeqIds, flushing, unflushed));

  // Oldest unflushed id (1000) is below the WAL's id (1500): not flushed.
  unflushed.put(hri1.getEncodedNameAsBytes(), 1000L);
  walSeqIds.put(hri1.getEncodedNameAsBytes(), 1500L);
  assertFalse(FSHLog.areAllRegionsFlushed(walSeqIds, flushing, unflushed));

  // Oldest flushing id (1200) is above the WAL's id (1199): the region counts as flushed.
  flushing.put(hri1.getEncodedNameAsBytes(), 1200L);
  unflushed.clear();
  walSeqIds.put(hri1.getEncodedNameAsBytes(), 1199L);
  assertTrue(FSHLog.areAllRegionsFlushed(walSeqIds, flushing, unflushed));
}
@Test(expected=IOException.class)
public void testFailedToCreateWALIfParentRenamed() throws IOException {
final String name = "testFailedToCreateWALIfParentRenamed";

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Unit tests for {@link SequenceIdAccounting}, exercised directly for a single region with a
 * single column family: cache flush start/complete, {@code areAllLower} and {@code findLower}.
 */
@Category(SmallTests.class)
public class TestSequenceIdAccounting {
  private static final byte [] ENCODED_REGION_NAME = Bytes.toBytes("r");
  private static final byte [] FAMILY_NAME = Bytes.toBytes("cf");
  /** The only family that ever receives edits in these tests. */
  private static final Set<byte[]> FAMILIES;
  static {
    FAMILIES = new HashSet<byte[]>();
    FAMILIES.add(FAMILY_NAME);
  }

  /**
   * Checks the value returned by {@code startCacheFlush}: {@link HConstants#NO_SEQNUM} when the
   * flushed families are the only ones with edits, and the oldest recorded sequence id when a
   * family without edits is flushed while another family still holds edits.
   */
  @Test
  public void testStartCacheFlush() {
    SequenceIdAccounting sida = new SequenceIdAccounting();
    sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
    // No edits recorded yet.
    assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
    sida.completeCacheFlush(ENCODED_REGION_NAME);
    long sequenceid = 1;
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
    // Only one family so should return NO_SEQNUM still.
    assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
    sida.completeCacheFlush(ENCODED_REGION_NAME);
    long currentSequenceId = sequenceid;
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
    // A family that never receives an edit; it is only used as the flush target below.
    final Set<byte[]> otherFamily = new HashSet<byte[]>(1);
    otherFamily.add(Bytes.toBytes("otherCf"));
    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
    // Should return oldest sequence id in the region.
    assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
    sida.completeCacheFlush(ENCODED_REGION_NAME);
  }

  /**
   * Walks {@code areAllLower} through edits, a flush in progress, a completed flush and new edits
   * arriving afterwards, probing it with different candidate sequence ids for the region.
   */
  @Test
  public void testAreAllLower() {
    SequenceIdAccounting sida = new SequenceIdAccounting();
    sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
    // Candidate sequence id per region that is handed to areAllLower.
    Map<byte[], Long> m = new HashMap<byte[], Long>();
    m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
    assertTrue(sida.areAllLower(m));
    long sequenceid = 1;
    // Records ids 1, 1 and 2; sequenceid is 3 afterwards.
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
    assertTrue(sida.areAllLower(m));
    m.put(ENCODED_REGION_NAME, sequenceid);
    assertFalse(sida.areAllLower(m));
    long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
    assertEquals("Lowest should be first sequence id inserted", 1, lowest);
    m.put(ENCODED_REGION_NAME, lowest);
    assertFalse(sida.areAllLower(m));
    // Now make sure above works when flushing.
    sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
    assertFalse(sida.areAllLower(m));
    m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
    assertTrue(sida.areAllLower(m));
    // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
    sida.completeCacheFlush(ENCODED_REGION_NAME);
    m.put(ENCODED_REGION_NAME, sequenceid);
    assertTrue(sida.areAllLower(m));
    // Flush again but add sequenceids while we are flushing.
    // Records ids 3, 4 and 5; sequenceid is 6 afterwards.
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
    lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
    m.put(ENCODED_REGION_NAME, lowest);
    assertFalse(sida.areAllLower(m));
    sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
    // The cache flush will clear out all sequenceid accounting by region.
    assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
    sida.completeCacheFlush(ENCODED_REGION_NAME);
    // No new edits have gone in so no sequenceid to work with.
    assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
    // Make an edit behind all we'll put now into sida.
    m.put(ENCODED_REGION_NAME, sequenceid);
    // Records ids 7, 8 and 9, all above the candidate (6) stored in m.
    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
    assertTrue(sida.areAllLower(m));
  }

  /**
   * Checks {@code findLower}: it is expected to return null when the candidate is
   * {@link HConstants#NO_SEQNUM} or one less than the region's lowest recorded id, and a
   * one-element result when the candidate equals the lowest recorded id.
   */
  @Test
  public void testFindLower() {
    SequenceIdAccounting sida = new SequenceIdAccounting();
    sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
    // Candidate sequence id per region that is handed to findLower.
    Map<byte[], Long> m = new HashMap<byte[], Long>();
    m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
    long sequenceid = 1;
    // Records ids 1, 1 and 2.
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
    sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
    assertTrue(sida.findLower(m) == null);
    m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME));
    assertTrue(sida.findLower(m).length == 1);
    m.put(ENCODED_REGION_NAME, sida.getLowestSequenceId(ENCODED_REGION_NAME) - 1);
    assertTrue(sida.findLower(m) == null);
  }
}