HBASE-8741 Scope sequenceid to the region rather than regionserver (WAS: Mutations on Regions in recovery mode might have same sequenceIDs)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1539743 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2013-11-07 18:10:38 +00:00
parent 698650982d
commit a01ff38061
17 changed files with 623 additions and 298 deletions
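The heart of the change, before the per-file hunks: each HRegion now owns an AtomicLong sequence id, and every WAL append receives that counter and increments it, instead of drawing ids from the single regionserver-wide logSeqNum. A minimal sketch of the new call pattern, condensed from the hunks below (region, htd, walEdit, clusterIds and now are assumed to be in scope):

// Per-region sequencing introduced by this patch (condensed sketch, not a hunk).
AtomicLong sequenceId = region.getSequenceId();   // one counter per region
long txid = log.appendNoSync(region.getRegionInfo(), htd.getTableName(),
    walEdit, clusterIds, now, htd, sequenceId);   // append calls incrementAndGet()
log.sync(txid);                                   // sync up to this transaction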

View File

@ -208,6 +208,13 @@ public class HRegion implements HeapSize { // , Writable{
protected long completeSequenceId = -1L;
/**
* Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1,
* as a marker that the region hasn't opened yet. Once it is opened, it is set to
* {@link #openSeqNum}.
*/
private final AtomicLong sequenceId = new AtomicLong(-1L);
/**
* Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
* startRegionOperation to possibly invoke different checks before any region operations. Not all
@ -1518,16 +1525,16 @@ public class HRegion implements HeapSize { // , Writable{
// Record the mvcc for all transactions in progress.
w = mvcc.beginMemstoreInsert();
mvcc.advanceMemstore(w);
// check if it is not closing.
if (wal != null) {
Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
if (startSeqId == null) {
status.setStatus("Flush will not be started for [" + this.getRegionInfo().getEncodedName()
+ "] - WAL is going away");
if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
status.setStatus("Flush will not be started for ["
+ this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.");
return false;
}
flushSeqId = startSeqId;
flushSeqId = this.sequenceId.incrementAndGet();
} else {
// use the provided sequence Id as WAL is not being used for this flush.
flushSeqId = myseqid;
}
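Because startCacheFlush no longer hands back a sequence id, the region now mints flushSeqId from its own counter; the returned boolean is purely a go/no-go signal. The new flow of this hunk, condensed into one piece (fields as named above):

// Condensed sketch of the new flush sequencing.
if (wal != null) {
  if (!wal.startCacheFlush(getRegionInfo().getEncodedNameAsBytes())) {
    return false;                                  // WAL is closing; don't flush
  }
  flushSeqId = this.sequenceId.incrementAndGet();  // region-scoped id
} else {
  flushSeqId = myseqid;                            // WAL not used for this flush
}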
@ -2221,7 +2228,7 @@ public class HRegion implements HeapSize { // , Writable{
Mutation mutation = batchOp.operations[firstIndex];
if (walEdit.size() > 0) {
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdit, mutation.getClusterIds(), now, this.htableDescriptor);
walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId);
}
// -------------------------------
@ -3312,7 +3319,7 @@ public class HRegion implements HeapSize { // , Writable{
if(bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
store.bulkLoadHFile(finalPath, assignSeqId ? this.sequenceId.incrementAndGet() : -1);
if(bulkLoadListener != null) {
bulkLoadListener.doneBulkLoad(familyName, path);
}
@ -3906,7 +3913,9 @@ public class HRegion implements HeapSize { // , Writable{
HRegion region = HRegion.newHRegion(tableDir,
effectiveHLog, fs, conf, info, hTableDescriptor, null);
if (initialize) {
region.initialize();
// If initializing, set the sequenceId. It is also required by HLogPerformanceEvaluation when
// verifying the WALEdits.
region.setSequenceId(region.initialize());
}
return region;
}
@ -4087,10 +4096,7 @@ public class HRegion implements HeapSize { // , Writable{
checkCompressionCodecs();
this.openSeqNum = initialize(reporter);
if (this.log != null) {
this.log.setSequenceNumber(this.openSeqNum);
}
this.setSequenceId(openSeqNum);
return this;
}
@ -4498,8 +4504,9 @@ public class HRegion implements HeapSize { // , Writable{
long txid = 0;
// 7. Append no sync
if (!walEdit.isEmpty()) {
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdit, processor.getClusterIds(), now, this.htableDescriptor);
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
this.htableDescriptor, this.sequenceId);
}
// 8. Release region lock
if (locked) {
@ -4740,9 +4747,9 @@ public class HRegion implements HeapSize { // , Writable{
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId);
} else {
recordMutationWithoutWal(append.getFamilyCellMap());
}
@ -4914,9 +4921,9 @@ public class HRegion implements HeapSize { // , Writable{
// Using default cluster id, as this can only happen in the originating
// cluster. A slave cluster receives the final value (not the delta)
// as a Put.
txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
walEdits, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(),
this.htableDescriptor);
txid = this.log.appendNoSync(this.getRegionInfo(),
this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId);
} else {
recordMutationWithoutWal(increment.getFamilyCellMap());
}
@ -4981,7 +4988,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT +
ClassSize.ARRAY +
40 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(11 * Bytes.SIZEOF_LONG) +
5 * Bytes.SIZEOF_BOOLEAN);
@ -5565,6 +5572,21 @@ public class HRegion implements HeapSize { // , Writable{
assert newValue >= 0;
}
/**
* @return sequenceId.
*/
public AtomicLong getSequenceId() {
return this.sequenceId;
}
/**
* sets this region's sequenceId.
* @param value new value
*/
private void setSequenceId(long value) {
this.sequenceId.set(value);
}
/**
* Listener class to enable callers of
* bulkLoadHFile() to perform any necessary

View File

@ -1074,7 +1074,7 @@ public class HStore implements Store {
CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
this.region.getRegionInfo(), compactionDescriptor);
this.region.getRegionInfo(), compactionDescriptor, this.region.getSequenceId());
}
private void replaceStoreFiles(final Collection<StoreFile> compactedFiles,

View File

@ -26,13 +26,13 @@ import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -142,13 +142,6 @@ class FSHLog implements HLog, Syncable {
*/
Writer writer;
/**
* Map of all log files but the current one.
*/
final SortedMap<Long, Path> outputfiles =
Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
/**
* This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
* with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
@ -177,8 +170,6 @@ class FSHLog implements HLog, Syncable {
private volatile boolean closed = false;
private final AtomicLong logSeqNum = new AtomicLong(0);
private boolean forMeta = false;
// The timestamp (in ms) when the log file was created.
@ -228,6 +219,39 @@ class FSHLog implements HLog, Syncable {
private final AtomicInteger closeErrorCount = new AtomicInteger();
private final MetricsWAL metrics;
/**
* Map of region encoded names to the latest sequence num obtained from them while appending
* WALEdits to the wal. We create one map for each WAL file at the time it is rolled.
* <p>
* When deciding whether to archive a WAL file, we compare the sequence IDs in this map to
* {@link #oldestFlushingSeqNums} and {@link #oldestUnflushedSeqNums}.
* See {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} for more info.
* <p>
* This map uses byte[] as the key, and uses reference equality. It works in our use case as we
* use {@link HRegionInfo#getEncodedNameAsBytes()} as keys. For a given region, it always returns
* the same array.
*/
private Map<byte[], Long> latestSequenceNums = new HashMap<byte[], Long>();
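The identity-keyed HashMap is safe only because HRegionInfo#getEncodedNameAsBytes() returns the same cached array on every call; byte[] inherits Object's identity-based equals/hashCode. A standalone illustration of the pitfall the javadoc warns about (values are hypothetical, not from the patch):

// byte[] keys compare by identity in a HashMap, so equal-content arrays are
// distinct keys. Hypothetical standalone illustration:
Map<byte[], Long> seqNums = new HashMap<byte[], Long>();
byte[] encodedName = Bytes.toBytes("1588230740");   // e.g. an encoded region name
seqNums.put(encodedName, 42L);
seqNums.get(Bytes.toBytes("1588230740"));           // null: different array instance
seqNums.get(encodedName);                           // 42: same instance, as with
                                                    // getEncodedNameAsBytes()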
/**
* WAL Comparator; it compares the timestamp (log filenum) present in the log file name.
*/
public final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() {
@Override
public int compare(Path o1, Path o2) {
long t1 = getFileNumFromFileName(o1);
long t2 = getFileNumFromFileName(o2);
if (t1 == t2) return 0;
return (t1 > t2) ? 1 : -1;
}
};
/**
* Map of log file to the latest sequence nums of all regions it has entries for.
* The map is sorted by the log file creation timestamp (contained in the log file name).
*/
private NavigableMap<Path, Map<byte[], Long>> hlogSequenceNums =
new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
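Together these two structures drive archiving and forced flushes: every roll files the latestSequenceNums snapshot under the just-closed Path, and because the map is ordered by the timestamp in the file name, firstEntry() always yields the oldest un-archived WAL. Condensed from the rollWriter and findRegionsToForceFlush hunks below:

// At roll time, snapshot the per-region seqids under the rolled file...
hlogSequenceNums.put(oldFile, latestSequenceNums);
latestSequenceNums = new HashMap<byte[], Long>();   // fresh map for the new WAL
// ...so the LOG_NAME_COMPARATOR ordering makes the oldest WAL trivial to find:
Map.Entry<Path, Map<byte[], Long>> oldest = hlogSequenceNums.firstEntry();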
/**
* Constructor.
@ -436,21 +460,6 @@ class FSHLog implements HLog, Syncable {
return this.filenum;
}
@Override
public void setSequenceNumber(final long newvalue) {
for (long id = this.logSeqNum.get(); id < newvalue &&
!this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
// This could spin on occasion but better the occasional spin than locking
// every increment of sequence number.
LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
}
}
@Override
public long getSequenceNumber() {
return logSeqNum.get();
}
/**
* Method used internal to this class and for tests only.
* @return The wrapped stream our writer is using; it's not the
@ -524,11 +533,17 @@ class FSHLog implements HLog, Syncable {
this.writer = nextWriter;
this.hdfs_out = nextHdfsOut;
this.numEntries.set(0);
if (oldFile != null) {
this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
this.latestSequenceNums = new HashMap<byte[], Long>();
}
}
if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
else LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries +
else {
LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries +
", filesize=" + StringUtils.humanReadableInt(this.fs.getFileStatus(oldFile).getLen()) +
"; new WAL " + FSUtils.getPath(newPath));
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
@ -540,7 +555,7 @@ class FSHLog implements HLog, Syncable {
// Can we delete any of the old log files?
if (getNumLogFiles() > 0) {
cleanOldLogs();
regionsToFlush = getRegionsToForceFlush();
regionsToFlush = findRegionsToForceFlush();
}
} finally {
this.logRollRunning = false;
@ -568,87 +583,126 @@ class FSHLog implements HLog, Syncable {
return HLogFactory.createWALWriter(fs, path, conf);
}
/*
* Clean up old commit logs.
* @return If lots of logs, flush the returned region so next time through
* we can clean logs. Returns null if nothing to flush. Returns array of
* encoded region names to flush.
/**
* Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits
* are already flushed by the corresponding regions.
* <p>
* For each log file, it compares the log's region-to-sequenceId map
* ({@link FSHLog#latestSequenceNums}) with the corresponding region entries in
* {@link FSHLog#oldestFlushingSeqNums} and {@link FSHLog#oldestUnflushedSeqNums}.
* If all the regions in the map have flushed past their recorded values, the wal is
* eligible for archiving.
* @throws IOException
*/
private void cleanOldLogs() throws IOException {
long oldestOutstandingSeqNum = Long.MAX_VALUE;
Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
List<Path> logsToArchive = new ArrayList<Path>();
// make a local copy so as to avoid locking when we iterate over these maps.
synchronized (oldestSeqNumsLock) {
Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.oldestFlushingSeqNums);
oldestUnflushedSeqNumsLocal = new HashMap<byte[], Long>(this.oldestUnflushedSeqNums);
}
// Get the set of all log files whose last sequence number is smaller than
// the oldest edit's sequence number.
TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
oldestOutstandingSeqNum).keySet());
// Now remove old log files (if any)
if (LOG.isDebugEnabled()) {
if (sequenceNumbers.size() > 0) {
LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
" out of total " + this.outputfiles.size() + ";" +
" oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
for (Map.Entry<Path, Map<byte[], Long>> e : hlogSequenceNums.entrySet()) {
// iterate over each log file.
Path log = e.getKey();
Map<byte[], Long> sequenceNums = e.getValue();
// iterate over the map for this log file, and decide whether it should be archived or not.
if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
oldestUnflushedSeqNumsLocal)) {
logsToArchive.add(log);
LOG.debug("log file is ready for archiving " + log);
}
}
for (Long seq : sequenceNumbers) {
archiveLogFile(this.outputfiles.remove(seq), seq);
for (Path p : logsToArchive) {
archiveLogFile(p);
this.hlogSequenceNums.remove(p);
}
}
/**
* Return regions that have edits that are equal or less than a certain sequence number.
* Static due to some old unit test.
* @param walSeqNum The sequence number to compare with.
* @param regionsToSeqNums Encoded region names to sequence ids
* @return All regions whose seqNum <= walSeqNum. Null if no regions found.
* Takes a region:sequenceId map for a WAL file, and checks whether the file can be archived.
* It compares the region entries present in the passed sequenceNums map with the local copy of
* {@link #oldestUnflushedSeqNums} and {@link #oldestFlushingSeqNums}. If, for all regions,
* the value is less than the minimum of values present in the oldestFlushing/UnflushedSeqNums,
* then the wal file is eligible for archiving.
* @param sequenceNums region to sequenceId map for an HLog, captured at the time it was rolled.
* @param oldestFlushingMap
* @param oldestUnflushedMap
* @return true if wal is eligible for archiving, false otherwise.
*/
static byte[][] findMemstoresWithEditsEqualOrOlderThan(
final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
List<byte[]> regions = null;
for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
if (e.getValue().longValue() <= walSeqNum) {
if (regions == null) regions = new ArrayList<byte[]>();
regions.add(e.getKey());
static boolean areAllRegionsFlushed(Map<byte[], Long> sequenceNums,
Map<byte[], Long> oldestFlushingMap, Map<byte[], Long> oldestUnflushedMap) {
for (Map.Entry<byte[], Long> regionSeqIdEntry : sequenceNums.entrySet()) {
// find region entries in the flushing/unflushed map. If there is no entry, it means
// the region doesn't have any unflushed entries.
long oldestFlushing = oldestFlushingMap.containsKey(regionSeqIdEntry.getKey()) ?
oldestFlushingMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
long oldestUnFlushed = oldestUnflushedMap.containsKey(regionSeqIdEntry.getKey()) ?
oldestUnflushedMap.get(regionSeqIdEntry.getKey()) : Long.MAX_VALUE;
// take the minimum to be sure we compare against the oldest outstanding sequence Id
long minSeqNum = Math.min(oldestFlushing, oldestUnFlushed);
if (minSeqNum <= regionSeqIdEntry.getValue()) return false; // can't archive
}
return true;
}
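A worked instance of the predicate: suppose a rolled WAL recorded region R at seqid 7. While R's oldest unflushed edit is 5 (5 <= 7) the file still covers live edits and must stay; once R flushes and its oldest unflushed edit moves past 7, the file can go. In code, with hypothetical values (the method is a package-private static, so this is callable from a same-package test, as the patch's own TestHLog does):

// Hypothetical values mirroring the archiving decision above.
byte[] r = Bytes.toBytes("region-R");   // reuse one instance: maps key by identity
Map<byte[], Long> walSeqNums = new HashMap<byte[], Long>();
walSeqNums.put(r, 7L);                                       // highest seqid of R in this WAL
Map<byte[], Long> flushing = new HashMap<byte[], Long>();    // nothing mid-flush
Map<byte[], Long> unflushed = new HashMap<byte[], Long>();
unflushed.put(r, 5L);
assert !FSHLog.areAllRegionsFlushed(walSeqNums, flushing, unflushed);  // 5 <= 7
unflushed.put(r, 9L);                                        // region flushed past seqid 7
assert FSHLog.areAllRegionsFlushed(walSeqNums, flushing, unflushed);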
/**
* Iterates over the given map of regions, and compares their sequence numbers with corresponding
* entries in {@link #oldestUnflushedSeqNums}. If the sequence number is greater than or equal,
* the region is eligible to flush; otherwise, there is no benefit in flushing (from the
* perspective of the passed regionsSequenceNums map), because the region has already flushed
* the entries present in the WAL file for which this method is called (typically, the oldest
* wal file).
* @param regionsSequenceNums
* @return regions which should be flushed (whose sequence numbers are larger than their
* corresponding un-flushed entries).
*/
private byte[][] findEligibleMemstoresToFlush(Map<byte[], Long> regionsSequenceNums) {
List<byte[]> regionsToFlush = null;
// Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock.
synchronized (oldestSeqNumsLock) {
for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
Long unFlushedVal = this.oldestUnflushedSeqNums.get(e.getKey());
if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
regionsToFlush.add(e.getKey());
}
}
}
return regions == null ? null : regions
return regionsToFlush == null ? null : regionsToFlush
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}
private byte[][] getRegionsToForceFlush() throws IOException {
// If too many log files, figure which regions we need to flush.
// Array is an array of encoded region names.
/**
* If the number of un-archived WAL files is greater than the maximum allowed, it checks
* the first (oldest) WAL file, and returns the regions which should be flushed so that it
* can be archived.
* @return regions to flush in order to archive the oldest wal file.
* @throws IOException
*/
byte[][] findRegionsToForceFlush() throws IOException {
byte [][] regions = null;
int logCount = getNumLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
// This is an array of encoded region names.
synchronized (oldestSeqNumsLock) {
regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
this.oldestUnflushedSeqNums);
}
if (regions != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
if (i > 0) sb.append(", ");
sb.append(Bytes.toStringBinary(regions[i]));
}
LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
sb.toString());
Map.Entry<Path, Map<byte[], Long>> firstWALEntry =
this.hlogSequenceNums.firstEntry();
regions = findEligibleMemstoresToFlush(firstWALEntry.getValue());
}
if (regions != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
if (i > 0) sb.append(", ");
sb.append(Bytes.toStringBinary(regions[i]));
}
LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
sb.toString());
}
return regions;
}
/*
* Cleans up current writer closing and adding to outputfiles.
* Cleans up the current writer, closing it.
* Presumes we're operating inside an updateLock scope.
* @return Path to current writer or null if none.
* @throws IOException
@ -690,18 +744,13 @@ class FSHLog implements HLog, Syncable {
}
if (currentfilenum >= 0) {
oldFile = computeFilename(currentfilenum);
this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
}
}
return oldFile;
}
private void archiveLogFile(final Path p, final Long seqno) throws IOException {
private void archiveLogFile(final Path p) throws IOException {
Path newPath = getHLogArchivePath(this.oldLogDir, p);
LOG.info("moving old hlog file " + FSUtils.getPath(p) +
" whose highest sequenceid is " + seqno + " to " +
FSUtils.getPath(newPath));
// Tell our listeners that a log is going to be archived.
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
@ -745,6 +794,26 @@ class FSHLog implements HLog, Syncable {
return new Path(dir, child);
}
/**
* A log file has a creation timestamp (in ms) in its file name ({@link #filenum}).
* This helper method returns the creation timestamp from a given log file.
* It extracts the timestamp assuming the filename is created with the
* {@link #computeFilename(long filenum)} method.
* @param fileName
* @return timestamp, as in the log file name.
*/
protected long getFileNumFromFileName(Path fileName) {
if (fileName == null) throw new IllegalArgumentException("file name can't be null");
// The path should start with dir/<prefix>.
String prefixPathStr = new Path(dir, prefix + ".").toString();
if (!fileName.toString().startsWith(prefixPathStr)) {
throw new IllegalArgumentException("The log doesn't belong to this regionserver");
}
String chompedPath = fileName.toString().substring(prefixPathStr.length());
if (forMeta) chompedPath = chompedPath.substring(0, chompedPath.indexOf(META_HLOG_FILE_EXTN));
return Long.parseLong(chompedPath);
}
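So for a WAL named <dir>/<prefix>.<filenum>, the method strips the dir/prefix and parses the trailing digits (after chopping the .meta extension for meta WALs). A standalone sketch of the same parse on plain strings (the path literals are invented; the real method works on Path objects and this class's dir and prefix fields):

// Hypothetical names; mirrors the substring-and-parse in the method above.
String prefixPathStr = "/hbase/.logs/rs1,60020,1383000000000/rs1%2C60020%2C1383000000000.";
String fileName = prefixPathStr + "1383837383838";
long filenum = Long.parseLong(fileName.substring(prefixPathStr.length()));
// filenum == 1383837383838L, the WAL file's creation timestamp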
@Override
public void closeAndDelete() throws IOException {
close();
@ -835,15 +904,15 @@ class FSHLog implements HLog, Syncable {
@Override
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd)
final long now, HTableDescriptor htd, AtomicLong sequenceId)
throws IOException {
append(info, tableName, edits, now, htd, true);
append(info, tableName, edits, now, htd, true, sequenceId);
}
@Override
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore);
public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now,
HTableDescriptor htd, boolean isInMemstore, AtomicLong sequenceId) throws IOException {
append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, isInMemstore, sequenceId);
}
/**
@ -869,12 +938,13 @@ class FSHLog implements HLog, Syncable {
* @param clusterIds that have consumed the change (for replication)
* @param now
* @param doSync shall we sync?
* @param sequenceId of the region.
* @return txid of this transaction
* @throws IOException
*/
@SuppressWarnings("deprecation")
private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, AtomicLong sequenceId)
throws IOException {
if (edits.isEmpty()) return this.unflushedEntries.get();
if (this.closed) {
@ -884,7 +954,9 @@ class FSHLog implements HLog, Syncable {
try {
long txid = 0;
synchronized (this.updateLock) {
long seqNum = obtainSeqNum();
// Get the sequence number from the passed AtomicLong. In the normal flow, it comes
// from the region.
long seqNum = sequenceId.incrementAndGet();
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region (i.e. the first edit added to the particular
// memstore). When the cache is flushed, the entry for the
@ -901,6 +973,7 @@ class FSHLog implements HLog, Syncable {
if (htd.isDeferredLogFlush()) {
lastDeferredTxid = txid;
}
this.latestSequenceNums.put(encodedRegionName, seqNum);
}
// Sync if catalog region, and if not then check if that table supports
// deferred log flushing
@ -918,9 +991,9 @@ class FSHLog implements HLog, Syncable {
@Override
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd)
List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId)
throws IOException {
return append(info, tableName, edits, clusterIds, now, htd, false, true);
return append(info, tableName, edits, clusterIds, now, htd, false, true, sequenceId);
}
/**
@ -1246,21 +1319,18 @@ class FSHLog implements HLog, Syncable {
return numEntries.get();
}
@Override
public long obtainSeqNum() {
return this.logSeqNum.incrementAndGet();
}
/** @return the number of log files in use */
int getNumLogFiles() {
return outputfiles.size();
return hlogSequenceNums.size();
}
@Override
public Long startCacheFlush(final byte[] encodedRegionName) {
public boolean startCacheFlush(final byte[] encodedRegionName) {
Long oldRegionSeqNum = null;
if (!closeBarrier.beginOp()) {
return null;
LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
" - because the server is closing.");
return false;
}
synchronized (oldestSeqNumsLock) {
oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
@ -1279,7 +1349,7 @@ class FSHLog implements HLog, Syncable {
LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+ Bytes.toString(encodedRegionName) + "]");
}
return obtainSeqNum();
return true;
}
@Override

View File

@ -24,6 +24,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@ -205,19 +206,6 @@ public interface HLog {
// TODO: Remove. Implementation detail.
long getFilenum();
/**
* Called to ensure that log sequence numbers are always greater
*
* @param newvalue We'll set log edit/sequence number to this value if it is greater
* than the current value.
*/
void setSequenceNumber(final long newvalue);
/**
* @return log sequence number
*/
long getSequenceNumber();
// TODO: Log rolling should not be in this interface.
/**
* Roll the log writer. That is, start writing log messages to a new file.
@ -270,9 +258,10 @@ public interface HLog {
/**
* Same as appendNoSync(HRegionInfo, TableName, WALEdit, List, long, HTableDescriptor),
* except it causes a sync on the log
* @param sequenceId of the region.
*/
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd) throws IOException;
final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
/**
* Append a set of edits to the log. Log edits are keyed by (encoded)
@ -284,9 +273,10 @@ public interface HLog {
* @param now
* @param htd
* @param isInMemstore Whether the record is in memstore. False for system records.
* @param sequenceId of the region.
*/
public void append(HRegionInfo info, TableName tableName, WALEdit edits,
final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException;
public void append(HRegionInfo info, TableName tableName, WALEdit edits, final long now,
HTableDescriptor htd, boolean isInMemstore, AtomicLong sequenceId) throws IOException;
/**
* Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
@ -297,11 +287,12 @@ public interface HLog {
* @param clusterIds The clusters that have consumed the change (for replication)
* @param now
* @param htd
* @param sequenceId of the region
* @return txid of this transaction
* @throws IOException
*/
public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
List<UUID> clusterIds, final long now, HTableDescriptor htd) throws IOException;
List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId) throws IOException;
// TODO: Do we need all these versions of sync?
void hsync() throws IOException;
@ -312,28 +303,21 @@ public interface HLog {
void sync(long txid) throws IOException;
/**
* Obtain a log sequence number.
*/
// TODO: Name better to differentiate from getSequenceNumber.
long obtainSeqNum();
/**
* WAL keeps track of the sequence numbers that were not yet flushed from memstores
* in order to be able to do cleanup. This method tells WAL that some region is about
* to flush memstore.
*
* We stash the oldest seqNum for the region, and let the next edit inserted in this
* region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor)}
* as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
* in case of flush succeeding, the seqNum of that first edit after start becomes the
* valid oldest seqNum for this region.
* region be recorded in {@link #append(HRegionInfo, TableName, WALEdit, long, HTableDescriptor,
* AtomicLong)} as new oldest seqnum.
* In case of flush being aborted, we put the stashed value back; in case of flush succeeding,
* the seqNum of that first edit after start becomes the valid oldest seqNum for this region.
*
* @return current seqNum, to pass on to flushers (who will put it into the metadata of
* the resulting file as an upper-bound seqNum for that file), or NULL if flush
* should not be started.
* @return true if the flush can proceed, false in case the wal is closing (usually, when the
* server is closing) and the flush couldn't be started.
*/
Long startCacheFlush(final byte[] encodedRegionName);
boolean startCacheFlush(final byte[] encodedRegionName);
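The caller-side contract, then: treat the boolean as go/no-go and source the flush sequence id from the region itself. A hedged sketch against this interface (completeCacheFlush and abortCacheFlush are the existing single-argument methods on this interface; error handling condensed):

// Hedged caller sketch; wal and region are assumed to be in scope.
byte[] encodedName = region.getRegionInfo().getEncodedNameAsBytes();
if (wal.startCacheFlush(encodedName)) {   // false => WAL closing, skip the flush
  try {
    long flushSeqId = region.getSequenceId().incrementAndGet();
    // ... snapshot and flush memstores, tagging files with flushSeqId ...
    wal.completeCacheFlush(encodedName);
  } catch (IOException e) {
    wal.abortCacheFlush(encodedName);     // puts the stashed oldest seqNum back
  }
}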
/**
* Complete the cache flush.

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -256,12 +257,13 @@ public class HLogUtil {
* This provides info to the HMaster to allow it to recover the compaction if
* this regionserver dies in the middle (This part is not yet implemented). It also prevents
* the compaction from finishing if this regionserver has already lost its lease on the log.
* @param sequenceId Used by HLog to get the sequence Id for the WALEdit.
*/
public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
final CompactionDescriptor c) throws IOException {
final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
WALEdit e = WALEdit.createCompaction(c);
log.append(info, TableName.valueOf(c.getTableName().toByteArray()), e,
EnvironmentEdgeManager.currentTimeMillis(), htd, false);
EnvironmentEdgeManager.currentTimeMillis(), htd, false, sequenceId);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}

View File

@ -49,6 +49,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.*;
@ -138,6 +139,7 @@ public class TestWALObserver {
Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
deleteDir(basedir);
fs.mkdirs(new Path(basedir, hri.getEncodedName()));
final AtomicLong sequenceId = new AtomicLong(0);
HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir,
TestWALObserver.class.getName(), this.conf);
@ -186,7 +188,7 @@ public class TestWALObserver {
// it's where WAL write cp should occur.
long now = EnvironmentEdgeManager.currentTimeMillis();
log.append(hri, hri.getTable(), edit, now, htd);
log.append(hri, hri.getTable(), edit, now, htd, sequenceId);
// the edit shall have been change now by the coprocessor.
foundFamily0 = false;
@ -222,6 +224,7 @@ public class TestWALObserver {
// ultimately called by HRegion::initialize()
TableName tableName = TableName.valueOf("testWALCoprocessorReplay");
final HTableDescriptor htd = getBasic3FamilyHTableDescriptor(tableName);
final AtomicLong sequenceId = new AtomicLong(0);
// final HRegionInfo hri =
// createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
// final HRegionInfo hri1 =
@ -247,9 +250,9 @@ public class TestWALObserver {
// addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
// EnvironmentEdgeManager.getDelegate(), wal);
addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
EnvironmentEdgeManager.getDelegate(), wal, htd);
EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId);
}
wal.append(hri, tableName, edit, now, htd);
wal.append(hri, tableName, edit, now, htd, sequenceId);
// sync to fs.
wal.sync();
@ -369,7 +372,7 @@ public class TestWALObserver {
private void addWALEdits(final TableName tableName, final HRegionInfo hri,
final byte[] rowName, final byte[] family, final int count,
EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd)
EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd, final AtomicLong sequenceId)
throws IOException {
String familyStr = Bytes.toString(family);
for (int j = 0; j < count; j++) {
@ -378,7 +381,7 @@ public class TestWALObserver {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes, ee
.currentTimeMillis(), columnBytes));
wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd);
wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId);
}
}

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -114,11 +115,12 @@ public class TestHLogRecordReader {
HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
long ts = System.currentTimeMillis();
WALEdit edit = new WALEdit();
final AtomicLong sequenceId = new AtomicLong(0);
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
log.append(info, tableName, edit, ts, htd);
log.append(info, tableName, edit, ts, htd, sequenceId);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
log.append(info, tableName, edit, ts+1, htd);
log.append(info, tableName, edit, ts+1, htd, sequenceId);
log.rollWriter();
Thread.sleep(1);
@ -126,10 +128,10 @@ public class TestHLogRecordReader {
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
log.append(info, tableName, edit, ts1+1, htd);
log.append(info, tableName, edit, ts1+1, htd, sequenceId);
edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
log.append(info, tableName, edit, ts1+2, htd);
log.append(info, tableName, edit, ts1+2, htd, sequenceId);
log.close();
HLogInputFormat input = new HLogInputFormat();
@ -161,11 +163,12 @@ public class TestHLogRecordReader {
public void testHLogRecordReader() throws Exception {
HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
byte [] value = Bytes.toBytes("value");
final AtomicLong sequenceId = new AtomicLong(0);
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
System.currentTimeMillis(), value));
log.append(info, tableName, edit,
System.currentTimeMillis(), htd);
System.currentTimeMillis(), htd, sequenceId);
Thread.sleep(1); // make sure 2nd log gets a later timestamp
long secondTs = System.currentTimeMillis();
@ -175,7 +178,7 @@ public class TestHLogRecordReader {
edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
System.currentTimeMillis(), value));
log.append(info, tableName, edit,
System.currentTimeMillis(), htd);
System.currentTimeMillis(), htd, sequenceId);
log.close();
long thirdTs = System.currentTimeMillis();

View File

@ -252,7 +252,6 @@ public class TestDistributedLogSplitting {
@Test(timeout = 300000)
public void testLogReplayWithNonMetaRSDown() throws Exception {
LOG.info("testLogReplayWithNonMetaRSDown");
conf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
startCluster(NUM_RS);
final int NUM_REGIONS_TO_CREATE = 40;
@ -1144,6 +1143,9 @@ public class TestDistributedLogSplitting {
TableName fullTName = TableName.valueOf(tname);
// remove root and meta region
regions.remove(HRegionInfo.FIRST_META_REGIONINFO);
// using one sequenceId for edits across all regions is ok.
final AtomicLong sequenceId = new AtomicLong(10);
for(Iterator<HRegionInfo> iter = regions.iterator(); iter.hasNext(); ) {
HRegionInfo regionInfo = iter.next();
@ -1183,7 +1185,7 @@ public class TestDistributedLogSplitting {
// key
byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd);
log.append(curRegionInfo, fullTName, e, System.currentTimeMillis(), htd, sequenceId);
counts[i % n] += 1;
}
}

View File

@ -53,6 +53,7 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
@ -462,7 +463,7 @@ public class TestHRegion {
.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
this.region.getRegionInfo(), compactionDescriptor);
this.region.getRegionInfo(), compactionDescriptor, new AtomicLong(1));
Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
@ -3799,10 +3800,10 @@ public class TestHRegion {
put.add(family, Bytes.toBytes("q1"), Bytes.toBytes("v1"));
put.setDurability(mutationDurability);
region.put(put);
// verify append called or not
verify(log, expectAppend ? times(1) : never()).appendNoSync((HRegionInfo) any(), eq(tableName),
(WALEdit) any(), (List<UUID>) any(), anyLong(), (HTableDescriptor) any());
// verify append called or not
verify(log, expectAppend ? times(1) : never())
.appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List<UUID>)any(),
anyLong(), (HTableDescriptor)any(), (AtomicLong)any());
// verify sync called or not
if (expectSync || expectSyncFromLogSyncer) {

View File

@ -20,10 +20,12 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -106,9 +108,10 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit);
HRegionInfo hri = region.getRegionInfo();
if (this.noSync) {
hlog.appendNoSync(hri, hri.getTable(), walEdit, new ArrayList<UUID>(), now, htd);
hlog.appendNoSync(hri, hri.getTable(), walEdit, new ArrayList<UUID>(), now, htd,
region.getSequenceId());
} else {
hlog.append(hri, hri.getTable(), walEdit, now, htd);
hlog.append(hri, hri.getTable(), walEdit, now, htd, region.getSequenceId());
}
}
long totalTime = (System.currentTimeMillis() - startTime);
@ -251,16 +254,15 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
/**
* Verify the content of the WAL file.
* Verify that sequenceids are ascending and that the file has expected number
* of edits.
* Verify that the file has the expected number of edits.
* @param wal
* @return Count of edits.
* @throws IOException
*/
private long verify(final Path wal, final boolean verbose) throws IOException {
HLog.Reader reader = HLogFactory.createReader(wal.getFileSystem(getConf()), wal, getConf());
long previousSeqid = -1;
long count = 0;
Map<String, Long> sequenceIds = new HashMap<String, Long>();
try {
while (true) {
Entry e = reader.next();
@ -270,12 +272,17 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
}
count++;
long seqid = e.getKey().getLogSeqNum();
if (verbose) LOG.info("seqid=" + seqid);
if (previousSeqid >= seqid) {
throw new IllegalStateException("wal=" + wal.getName() +
", previousSeqid=" + previousSeqid + ", seqid=" + seqid);
if (sequenceIds.containsKey(Bytes.toString(e.getKey().getEncodedRegionName()))) {
// sequenceIds should be increasing for every region
if (sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName())) >= seqid) {
throw new IllegalStateException("wal = " + wal.getName() + ", " + "previous seqid = "
+ sequenceIds.get(Bytes.toString(e.getKey().getEncodedRegionName()))
+ ", current seqid = " + seqid);
}
} else {
sequenceIds.put(Bytes.toString(e.getKey().getEncodedRegionName()), seqid);
}
previousSeqid = seqid;
if (verbose) LOG.info("seqid=" + seqid);
}
} finally {
reader.close();

View File

@ -23,9 +23,14 @@ import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.TreeMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -58,6 +63,9 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** JUnit test case for HLog */
@Category(LargeTests.class)
@ -136,15 +144,13 @@ public class TestHLog {
}
/**
* Test that with three concurrent threads we still write edits in sequence
* edit id order.
* Write to a log file with three concurrent threads and verify all data is written.
* @throws Exception
*/
@Test
public void testMaintainOrderWithConcurrentWrites() throws Exception {
public void testConcurrentWrites() throws Exception {
// Run the HPE tool with three threads writing 3000 edits each concurrently.
// When done, verify that all edits were written and that the order in the
// WALs is of ascending edit sequence ids.
// When done, verify that all edits were written.
int errCode = HLogPerformanceEvaluation.
innerMain(new Configuration(TEST_UTIL.getConfiguration()),
new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
@ -179,6 +185,7 @@ public class TestHLog {
htd.addFamily(new HColumnDescriptor("column"));
// Add edits for three regions.
final AtomicLong sequenceId = new AtomicLong(1);
try {
for (int ii = 0; ii < howmany; ii++) {
for (int i = 0; i < howmany; i++) {
@ -192,7 +199,7 @@ public class TestHLog {
System.currentTimeMillis(), column));
LOG.info("Region " + i + ": " + edit);
log.append(infos[i], tableName, edit,
System.currentTimeMillis(), htd);
System.currentTimeMillis(), htd, sequenceId);
}
}
log.rollWriter();
@ -242,7 +249,7 @@ public class TestHLog {
in.close();
HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf);
final AtomicLong sequenceId = new AtomicLong(1);
final int total = 20;
HLog.Reader reader = null;
@ -255,7 +262,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(info, tableName, kvs, System.currentTimeMillis(), htd);
wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
}
// Now call sync and try reading. Opening a Reader before you sync just
// gives you EOFE.
@ -273,7 +280,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(info, tableName, kvs, System.currentTimeMillis(), htd);
wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
}
reader = HLogFactory.createReader(fs, walPath, conf);
count = 0;
@ -292,7 +299,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), value));
wal.append(info, tableName, kvs, System.currentTimeMillis(), htd);
wal.append(info, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
@ -314,34 +321,6 @@ public class TestHLog {
}
}
/**
* Test the findMemstoresWithEditsEqualOrOlderThan method.
* @throws IOException
*/
@Test
public void testFindMemstoresWithEditsEqualOrOlderThan() throws IOException {
Map<byte [], Long> regionsToSeqids = new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
for (int i = 0; i < 10; i++) {
Long l = Long.valueOf(i);
regionsToSeqids.put(l.toString().getBytes(), l);
}
byte [][] regions =
FSHLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
assertEquals(2, regions.length);
assertTrue(Bytes.equals(regions[0], "0".getBytes()) ||
Bytes.equals(regions[0], "1".getBytes()));
regions = FSHLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
int count = 4;
assertEquals(count, regions.length);
// Regions returned are not ordered.
for (int i = 0; i < count; i++) {
assertTrue(Bytes.equals(regions[i], "0".getBytes()) ||
Bytes.equals(regions[i], "1".getBytes()) ||
Bytes.equals(regions[i], "2".getBytes()) ||
Bytes.equals(regions[i], "3".getBytes()));
}
}
private void verifySplits(List<Path> splits, final int howmany)
throws IOException {
assertEquals(howmany * howmany, splits.size());
@ -391,6 +370,7 @@ public class TestHLog {
HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
"hlogdir_archive", conf);
final AtomicLong sequenceId = new AtomicLong(1);
final int total = 20;
HTableDescriptor htd = new HTableDescriptor();
@ -399,7 +379,7 @@ public class TestHLog {
for (int i = 0; i < total; i++) {
WALEdit kvs = new WALEdit();
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd);
wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
}
// Now call sync to send the data to HDFS datanodes
wal.sync();
@ -513,6 +493,7 @@ public class TestHLog {
HLog log = null;
try {
log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
final AtomicLong sequenceId = new AtomicLong(1);
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
@ -528,7 +509,7 @@ public class TestHLog {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor("column"));
log.append(info, tableName, cols, System.currentTimeMillis(), htd);
log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
log.startCacheFlush(info.getEncodedNameAsBytes());
log.completeCacheFlush(info.getEncodedNameAsBytes());
log.close();
@ -571,6 +552,7 @@ public class TestHLog {
final byte [] row = Bytes.toBytes("row");
Reader reader = null;
HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
final AtomicLong sequenceId = new AtomicLong(1);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
// 1, 2, 3...
@ -585,7 +567,7 @@ public class TestHLog {
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor("column"));
log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.close();
@ -626,6 +608,7 @@ public class TestHLog {
TableName.valueOf("tablename");
final byte [] row = Bytes.toBytes("row");
HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf);
final AtomicLong sequenceId = new AtomicLong(1);
try {
DumbWALActionsListener visitor = new DumbWALActionsListener();
log.registerWALActionsListener(visitor);
@ -640,7 +623,7 @@ public class TestHLog {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(i)),
timestamp, new byte[]{(byte) (i + '0')}));
log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
}
assertEquals(COL_COUNT, visitor.increments);
log.unregisterWALActionsListener(visitor);
@ -648,7 +631,7 @@ public class TestHLog {
cols.add(new KeyValue(row, Bytes.toBytes("column"),
Bytes.toBytes(Integer.toString(11)),
timestamp, new byte[]{(byte) (11 + '0')}));
log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId);
assertEquals(COL_COUNT, visitor.increments);
} finally {
if (log != null) log.closeAndDelete();
@ -665,6 +648,7 @@ public class TestHLog {
HLog log = HLogFactory.createHLog(fs, hbaseDir,
getName(), conf);
final AtomicLong sequenceId = new AtomicLong(1);
try {
HRegionInfo hri = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
@ -673,26 +657,26 @@ public class TestHLog {
// Add a single edit and make sure that rolling won't remove the file
// Before HBASE-3198 it used to delete it
addEdits(log, hri, tableName, 1);
addEdits(log, hri, tableName, 1, sequenceId);
log.rollWriter();
assertEquals(1, ((FSHLog) log).getNumLogFiles());
// See if there's anything wrong with more than 1 edit
addEdits(log, hri, tableName, 2);
addEdits(log, hri, tableName, 2, sequenceId);
log.rollWriter();
assertEquals(2, ((FSHLog) log).getNumLogFiles());
// Now mix edits from 2 regions, still no flushing
addEdits(log, hri, tableName, 1);
addEdits(log, hri2, tableName2, 1);
addEdits(log, hri, tableName, 1);
addEdits(log, hri2, tableName2, 1);
addEdits(log, hri, tableName, 1, sequenceId);
addEdits(log, hri2, tableName2, 1, sequenceId);
addEdits(log, hri, tableName, 1, sequenceId);
addEdits(log, hri2, tableName2, 1, sequenceId);
log.rollWriter();
assertEquals(3, ((FSHLog) log).getNumLogFiles());
// Flush the first region, we expect to see the first two files getting
// archived. We need to append something or writer won't be rolled.
addEdits(log, hri2, tableName2, 1);
addEdits(log, hri2, tableName2, 1, sequenceId);
log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.rollWriter();
@ -701,7 +685,7 @@ public class TestHLog {
// Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain
// flush information
addEdits(log, hri2, tableName2, 1);
addEdits(log, hri2, tableName2, 1, sequenceId);
log.startCacheFlush(hri2.getEncodedNameAsBytes());
log.completeCacheFlush(hri2.getEncodedNameAsBytes());
log.rollWriter();
@ -778,7 +762,7 @@ public class TestHLog {
}
private void addEdits(HLog log, HRegionInfo hri, TableName tableName,
int times) throws IOException {
int times, AtomicLong sequenceId) throws IOException {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor("row"));
@ -787,7 +771,7 @@ public class TestHLog {
long timestamp = System.currentTimeMillis();
WALEdit cols = new WALEdit();
cols.add(new KeyValue(row, row, row, timestamp, row));
log.append(hri, tableName, cols, timestamp, htd);
log.append(hri, tableName, cols, timestamp, htd, sequenceId);
}
}
@ -949,6 +933,267 @@ public class TestHLog {
}
}
/**
* Tests the log comparator. Ensures that we are not mixing meta logs with non-meta logs (throws
* an exception if we do). Comparison is based on the timestamp present in the wal name.
* @throws Exception
*/
@Test
public void testHLogComparator() throws Exception {
HLog hlog1 = null;
HLog hlogMeta = null;
try {
hlog1 = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
LOG.debug("Log obtained is: " + hlog1);
Comparator<Path> comp = ((FSHLog) hlog1).LOG_NAME_COMPARATOR;
Path p1 = ((FSHLog) hlog1).computeFilename(11);
Path p2 = ((FSHLog) hlog1).computeFilename(12);
// comparing with itself returns 0
assertTrue(comp.compare(p1, p1) == 0);
// comparing with different filenum.
assertTrue(comp.compare(p1, p2) < 0);
hlogMeta = HLogFactory.createMetaHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf,
null, null);
Comparator<Path> compMeta = ((FSHLog) hlogMeta).LOG_NAME_COMPARATOR;
Path p1WithMeta = ((FSHLog) hlogMeta).computeFilename(11);
Path p2WithMeta = ((FSHLog) hlogMeta).computeFilename(12);
assertTrue(compMeta.compare(p1WithMeta, p1WithMeta) == 0);
assertTrue(compMeta.compare(p1WithMeta, p2WithMeta) < 0);
// mixing meta and non-meta logs gives error
boolean ex = false;
try {
comp.compare(p1WithMeta, p2);
} catch (Exception e) {
ex = true;
}
assertTrue("Comparator doesn't complain while checking meta log files", ex);
boolean exMeta = false;
try {
compMeta.compare(p1WithMeta, p2);
} catch (Exception e) {
exMeta = true;
}
assertTrue("Meta comparator doesn't complain while checking log files", exMeta);
} finally {
if (hlog1 != null) hlog1.close();
if (hlogMeta != null) hlogMeta.close();
}
}
/**
* Tests wal archiving by adding data, doing flushing/rolling and checking we archive old logs
* and also don't archive "live logs" (that is, a log with un-flushed entries).
* <p>
* This is what it does:
* It creates two regions, and does a series of inserts along with log rolling.
* Whenever a WAL is rolled, FSHLog checks previous wals for archiving. A wal is eligible for
* archiving if all the regions which have entries in that wal file have flushed past
* their maximum sequence id in that wal file.
* <p>
* @throws IOException
*/
@Test
public void testWALArchiving() throws IOException {
LOG.debug("testWALArchiving");
TableName table1 = TableName.valueOf("t1");
TableName table2 = TableName.valueOf("t2");
HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), dir.toString(), conf);
try {
assertEquals(0, ((FSHLog) hlog).getNumLogFiles());
HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW);
HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
HConstants.EMPTY_END_ROW);
// ensure that we don't split the regions.
hri1.setSplit(false);
hri2.setSplit(false);
// variables to mock region sequenceIds.
final AtomicLong sequenceId1 = new AtomicLong(1);
final AtomicLong sequenceId2 = new AtomicLong(1);
// start with the testing logic: insert a waledit, and roll writer
addEdits(hlog, hri1, table1, 1, sequenceId1);
hlog.rollWriter();
// assert that the wal is rolled
assertEquals(1, ((FSHLog) hlog).getNumLogFiles());
// add edits in the second wal file, and roll writer.
addEdits(hlog, hri1, table1, 1, sequenceId1);
hlog.rollWriter();
// assert that the wal is rolled
assertEquals(2, ((FSHLog) hlog).getNumLogFiles());
// add a waledit to table1, and flush the region.
addEdits(hlog, hri1, table1, 3, sequenceId1);
flushRegion(hlog, hri1.getEncodedNameAsBytes());
// roll log; all old logs should be archived.
hlog.rollWriter();
assertEquals(0, ((FSHLog) hlog).getNumLogFiles());
// add an edit to table2, and roll writer
addEdits(hlog, hri2, table2, 1, sequenceId2);
hlog.rollWriter();
assertEquals(1, ((FSHLog) hlog).getNumLogFiles());
// add edits for table1, and roll writer
addEdits(hlog, hri1, table1, 2, sequenceId1);
hlog.rollWriter();
assertEquals(2, ((FSHLog) hlog).getNumLogFiles());
// add edits for table2, and flush hri1.
addEdits(hlog, hri2, table2, 2, sequenceId2);
flushRegion(hlog, hri1.getEncodedNameAsBytes());
// the log : region-sequenceId map is
// log1: region2 (unflushed)
// log2: region1 (flushed)
// log3: region2 (unflushed)
// roll the writer; log2 should be archived.
hlog.rollWriter();
assertEquals(2, ((FSHLog) hlog).getNumLogFiles());
// flush region2, and all logs should be archived.
addEdits(hlog, hri2, table2, 2, sequenceId2);
flushRegion(hlog, hri2.getEncodedNameAsBytes());
hlog.rollWriter();
assertEquals(0, ((FSHLog) hlog).getNumLogFiles());
} finally {
if (hlog != null) hlog.close();
}
}
/**
* On rolling a wal after reaching the threshold, {@link HLog#rollWriter()} returns the list of
* regions which should be flushed in order to archive the oldest wal file.
* <p>
* This method tests this behavior by inserting edits and rolling the wal enough times to reach
* the max number of logs threshold. It checks whether we get the "right regions" for flush on
* rolling the wal.
* @throws Exception
*/
@Test
public void testFindMemStoresEligibleForFlush() throws Exception {
LOG.debug("testFindMemStoresEligibleForFlush");
Configuration conf1 = HBaseConfiguration.create(conf);
conf1.setInt("hbase.regionserver.maxlogs", 1);
HLog hlog = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), conf1);
TableName t1 = TableName.valueOf("t1");
TableName t2 = TableName.valueOf("t2");
HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
// variables to mock region sequenceIds
final AtomicLong sequenceId1 = new AtomicLong(1);
final AtomicLong sequenceId2 = new AtomicLong(1);
// add edits and roll the wal
try {
addEdits(hlog, hri1, t1, 2, sequenceId1);
hlog.rollWriter();
// add some more edits and roll the wal. This would reach the log number threshold
addEdits(hlog, hri1, t1, 2, sequenceId1);
hlog.rollWriter();
// with above rollWriter call, the max logs limit is reached.
assertTrue(((FSHLog) hlog).getNumLogFiles() == 2);
// get the regions to flush; since there is only one region in the oldest wal, it should
// return only one region.
byte[][] regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
assertEquals(1, regionsToFlush.length);
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
// insert edits in second region
addEdits(hlog, hri2, t2, 2, sequenceId2);
// get the regions to flush, it should still read region1.
regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
assertEquals(1, regionsToFlush.length);
assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]);
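// region2's edits were appended after the last roll, so they live only in the current
// (active) wal and do not force a flush.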
// flush region 1, and roll the wal file. Only last wal which has entries for region1 should
// remain.
flushRegion(hlog, hri1.getEncodedNameAsBytes());
hlog.rollWriter();
// only one wal should remain now (that is for the second region).
assertEquals(1, ((FSHLog) hlog).getNumLogFiles());
// flush the second region
flushRegion(hlog, hri2.getEncodedNameAsBytes());
hlog.rollWriter(true);
// no wal should remain now.
assertEquals(0, ((FSHLog) hlog).getNumLogFiles());
// add edits both to region 1 and region 2, and roll.
addEdits(hlog, hri1, t1, 2, sequenceId1);
addEdits(hlog, hri2, t2, 2, sequenceId2);
hlog.rollWriter();
assertEquals(1, ((FSHLog) hlog).getNumLogFiles());
// add more edits and roll the writer to reach the max logs limit again.
addEdits(hlog, hri1, t1, 2, sequenceId1);
hlog.rollWriter();
// it should return two regions to flush, as the oldest wal file has entries
// for both regions.
regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush();
assertEquals(2, regionsToFlush.length);
// flush both regions
flushRegion(hlog, hri1.getEncodedNameAsBytes());
flushRegion(hlog, hri2.getEncodedNameAsBytes());
hlog.rollWriter(true);
assertEquals(0, ((FSHLog) hlog).getNumLogFiles());
// Add an edit to region1, and roll the wal.
addEdits(hlog, hri1, t1, 2, sequenceId1);
// test a partial flush: roll the wal while a cache flush is in progress, and ensure it is not archived.
hlog.startCacheFlush(hri1.getEncodedNameAsBytes());
hlog.rollWriter();
hlog.completeCacheFlush(hri1.getEncodedNameAsBytes());
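// the roll happened while region1's flush was still in progress, so the rolled wal is retained.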
assertEquals(1, ((FSHLog) hlog).getNumLogFiles());
} finally {
if (hlog != null) hlog.close();
}
}
/**
 * Simulates HLog append ops for a region and tests the
 * {@link FSHLog#areAllRegionsFlushed(Map, Map, Map)} API.
 * It compares each region's latest sequenceId with its oldestFlushing and oldestUnFlushed
 * entries. If a region's latest sequenceId is at or beyond the minimum of the two, the
 * region still has edits in the memstore and must be flushed before this WAL is archived.
 */
@Test
public void testAllRegionsFlushed() {
Map<byte[], Long> oldestFlushingSeqNo = new HashMap<byte[], Long>();
Map<byte[], Long> oldestUnFlushedSeqNo = new HashMap<byte[], Long>();
Map<byte[], Long> seqNo = new HashMap<byte[], Long>();
// create a table
TableName t1 = TableName.valueOf("t1");
// create a region
HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
// variable to mock the region's sequenceId
final AtomicLong sequenceId1 = new AtomicLong(1);
// test empty map
assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// add entries in the region
seqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.incrementAndGet());
oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
// should say region1 is not flushed.
assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// test with entries in oldestFlushing map.
oldestUnFlushedSeqNo.clear();
oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), sequenceId1.get());
assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// simulate region flush, i.e., clear oldestFlushing and oldestUnflushed maps
oldestFlushingSeqNo.clear();
oldestUnFlushedSeqNo.clear();
assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// insert some large values for region1
oldestUnFlushedSeqNo.put(hri1.getEncodedNameAsBytes(), 1000L);
seqNo.put(hri1.getEncodedNameAsBytes(), 1500L);
assertFalse(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
// test when oldestFlushing contains a larger value than the region's latest sequenceId:
// everything the region appended has been flushed (or is flushing), so it counts as flushed.
oldestFlushingSeqNo.put(hri1.getEncodedNameAsBytes(), 1200L);
oldestUnFlushedSeqNo.clear();
seqNo.put(hri1.getEncodedNameAsBytes(), 1199L);
assertTrue(FSHLog.areAllRegionsFlushed(seqNo, oldestFlushingSeqNo, oldestUnFlushedSeqNo));
}
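// For reference, a minimal sketch of the contract the assertions above exercise. This is an
// illustration only (name and shape assumed here), not the actual FSHLog implementation; the
// real maps are keyed with a bytes comparator, while plain maps work in this test only
// because HRegionInfo#getEncodedNameAsBytes() returns a cached array.
private static boolean sketchAllRegionsFlushed(Map<byte[], Long> sequenceNums,
    Map<byte[], Long> oldestFlushing, Map<byte[], Long> oldestUnFlushed) {
  for (Map.Entry<byte[], Long> e : sequenceNums.entrySet()) {
    Long flushing = oldestFlushing.get(e.getKey());
    Long unFlushed = oldestUnFlushed.get(e.getKey());
    long earliestMemstoreEdit = Math.min(flushing == null ? Long.MAX_VALUE : flushing,
        unFlushed == null ? Long.MAX_VALUE : unFlushed);
    // a region blocks archiving if its latest edit is at or beyond the earliest edit
    // still sitting in (or flushing from) the memstore.
    if (e.getValue() >= earliestMemstoreEdit) {
      return false;
    }
  }
  return true;
}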
/**
 * Helper method to simulate a completed region flush against a WAL.
 * @param hlog the WAL the flush is reported to
 * @param regionEncodedName encoded name of the region being flushed
 */
private void flushRegion(HLog hlog, byte[] regionEncodedName) {
hlog.startCacheFlush(regionEncodedName);
hlog.completeCacheFlush(regionEncodedName);
}
static class DumbWALActionsListener implements WALActionsListener {
int increments = 0;

View File

@ -1089,6 +1089,7 @@ public class TestHLogSplit {
HRegionInfo regioninfo = new HRegionInfo(tableName,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
log = HLogFactory.createHLog(fs, HBASEDIR, logName, conf);
final AtomicLong sequenceId = new AtomicLong(1);
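// region-scoped sequenceId; log.append below increments it for every edit.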
final int total = 20;
for (int i = 0; i < total; i++) {
@ -1096,7 +1097,7 @@ public class TestHLogSplit {
kvs.add(new KeyValue(Bytes.toBytes(i), tableName.getName(), tableName.getName()));
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor("column"));
log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd);
log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId);
}
// Send the data to HDFS datanodes and close the HDFS writer
log.sync();

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -110,6 +111,7 @@ public class TestLogRollingNoCluster {
@Override
public void run() {
this.log.info(getName() +" started");
final AtomicLong sequenceId = new AtomicLong(1);
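// each appender thread mocks its own region sequenceId for the appends below.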
try {
for (int i = 0; i < this.count; i++) {
long now = System.currentTimeMillis();
@ -123,7 +125,7 @@ public class TestLogRollingNoCluster {
this.wal.append(HRegionInfo.FIRST_META_REGIONINFO,
HTableDescriptor.META_TABLEDESC.getTableName(),
edit, now, HTableDescriptor.META_TABLEDESC);
edit, now, HTableDescriptor.META_TABLEDESC, sequenceId);
}
String msg = getName() + " finished";
if (isException())

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -89,6 +90,7 @@ public class TestWALActionsListener {
DummyWALActionsListener laterobserver = new DummyWALActionsListener();
HLog hlog = HLogFactory.createHLog(fs, TEST_UTIL.getDataTestDir(), logName,
conf, list, null);
final AtomicLong sequenceId = new AtomicLong(1);
HRegionInfo hri = new HRegionInfo(TableName.valueOf(SOME_BYTES),
SOME_BYTES, SOME_BYTES, false);
@ -100,7 +102,7 @@ public class TestWALActionsListener {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(b));
hlog.append(hri, TableName.valueOf(b), edit, 0, htd);
hlog.append(hri, TableName.valueOf(b), edit, 0, htd, sequenceId);
if (i == 10) {
hlog.registerWALActionsListener(laterobserver);
}

View File

@ -270,31 +270,33 @@ public class TestWALReplay {
HLog wal1 = createWAL(this.conf);
// Add 1k to each family.
final int countPerFamily = 1000;
final AtomicLong sequenceId = new AtomicLong(1);
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
wal1, htd);
wal1, htd, sequenceId);
}
wal1.close();
runWALSplit(this.conf);
HLog wal2 = createWAL(this.conf);
// Up the sequenceid so that these edits are after the ones added above.
wal2.setSequenceNumber(wal1.getSequenceNumber());
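// no explicit bump is needed anymore: the shared region-scoped sequenceId already sits
// past the edits written to wal1.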
// Add 1k to each family.
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
ee, wal2, htd);
ee, wal2, htd, sequenceId);
}
wal2.close();
runWALSplit(this.conf);
HLog wal3 = createWAL(this.conf);
wal3.setSequenceNumber(wal2.getSequenceNumber());
try {
long wal3SeqId = wal3.getSequenceNumber();
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
long seqid = region.getOpenSeqNum();
assertTrue(seqid > wal3SeqId);
// The region opens with sequenceId 1; after the 6k appends above, the mocked sequenceId
// stands at 6k + 1. On open, the region replays those edits and advances one past the
// last of them, so openSeqNum is exactly sequenceId + 1.
assertTrue(seqid > sequenceId.get());
assertEquals(seqid - 1, sequenceId.get());
LOG.debug("region.getOpenSeqNum(): " + region.getOpenSeqNum() + ", wal3.id: "
+ sequenceId.get());
// TODO: Scan all.
region.close();
@ -395,8 +397,6 @@ public class TestWALReplay {
HLog wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
long seqid = region.getOpenSeqNum();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal.setSequenceNumber(seqid);
boolean first = true;
for (HColumnDescriptor hcd: htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
@ -420,8 +420,6 @@ public class TestWALReplay {
HLog wal2 = createWAL(this.conf);
HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
long seqid2 = region2.getOpenSeqNum();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal2.setSequenceNumber(seqid2);
assertTrue(seqid + result.size() < seqid2);
final Result result1b = region2.get(g);
assertEquals(result.size(), result1b.size());
@ -458,8 +456,6 @@ public class TestWALReplay {
}
};
long seqid3 = region3.initialize();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal3.setSequenceNumber(seqid3);
Result result3 = region3.get(g);
// Assert that count of cells is same as before crash.
assertEquals(result2.size(), result3.size());
@ -513,8 +509,6 @@ public class TestWALReplay {
HLog wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
long seqid = region.getOpenSeqNum();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal.setSequenceNumber(seqid);
for (HColumnDescriptor hcd: htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
}
@ -548,8 +542,6 @@ public class TestWALReplay {
HLog wal2 = createWAL(this.conf);
HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
long seqid2 = region2.getOpenSeqNum();
// HRegionServer usually does this. It knows the largest seqid across all regions.
wal2.setSequenceNumber(seqid2);
assertTrue(seqid + result.size() < seqid2);
final Result result1b = region2.get(g);
@ -605,12 +597,8 @@ public class TestWALReplay {
Configuration customConf = new Configuration(this.conf);
customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY,
CustomStoreFlusher.class.getName());
HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd, rsServices);
long seqid = region.initialize();
// HRegionServer usually does this. It knows the largest seqid across all
// regions.
wal.setSequenceNumber(seqid);
HRegion region =
HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal, customConf, rsServices, null);
int writtenRowCount = 10;
List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
htd.getFamilies());
@ -661,13 +649,8 @@ public class TestWALReplay {
runWALSplit(this.conf);
HLog wal2 = createWAL(this.conf);
Mockito.doReturn(false).when(rsServices).isAborted();
HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd,
rsServices);
long seqid2 = region2.initialize();
// HRegionServer usually does this. It knows the largest seqid across all
// regions.
wal2.setSequenceNumber(seqid2);
HRegion region2 =
HRegion.openHRegion(this.hbaseRootDir, hri, htd, wal2, this.conf, rsServices, null);
scanner = region2.getScanner(new Scan());
assertEquals(writtenRowCount, getScannedCount(scanner));
}
@ -706,12 +689,13 @@ public class TestWALReplay {
final HLog wal = createWAL(this.conf);
final byte[] rowName = tableName.getName();
final byte[] regionName = hri.getEncodedNameAsBytes();
final AtomicLong sequenceId = new AtomicLong(1);
// Add 1k to each family.
final int countPerFamily = 1000;
for (HColumnDescriptor hcd: htd.getFamilies()) {
addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
ee, wal, htd);
ee, wal, htd, sequenceId);
}
// Add a cache flush, shouldn't have any effect
@ -723,14 +707,14 @@ public class TestWALReplay {
long now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
now, rowName));
wal.append(hri, tableName, edit, now, htd);
wal.append(hri, tableName, edit, now, htd, sequenceId);
// Delete the c family to verify deletes make it over.
edit = new WALEdit();
now = ee.currentTimeMillis();
edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
KeyValue.Type.DeleteFamily));
wal.append(hri, tableName, edit, now, htd);
wal.append(hri, tableName, edit, now, htd, sequenceId);
// Sync.
wal.sync();
@ -767,7 +751,7 @@ public class TestWALReplay {
long seqid = region.initialize();
// We flushed during init.
assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
assertTrue(seqid > wal.getSequenceNumber());
assertTrue(seqid - 1 == sequenceId.get());
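// openSeqNum lands one past the last appended edit because the region advances its
// sequenceId once more on open.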
Get get = new Get(rowName);
Result result = region.get(get);
@ -800,15 +784,9 @@ public class TestWALReplay {
MockHLog wal = createMockWAL(this.conf);
HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
long seqid = region.getOpenSeqNum();
// HRegionServer usually does this. It knows the largest seqid across all
// regions.
wal.setSequenceNumber(seqid);
for (HColumnDescriptor hcd : htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
}
// get the seq no after first set of entries.
long sequenceNumber = wal.getSequenceNumber();
// Let us flush the region,
// but this time completeCacheFlush is deferred until later
@ -816,7 +794,7 @@ public class TestWALReplay {
for (HColumnDescriptor hcd : htd.getFamilies()) {
addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
}
long lastestSeqNumber = wal.getSequenceNumber();
long lastestSeqNumber = region.getSequenceId().get();
// get the current seq no
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
@ -891,9 +869,9 @@ public class TestWALReplay {
}
}
private void addWALEdits (final TableName tableName, final HRegionInfo hri,
final byte [] rowName, final byte [] family,
final int count, EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd)
private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName,
final byte[] family, final int count, EnvironmentEdge ee, final HLog wal,
final HTableDescriptor htd, final AtomicLong sequenceId)
throws IOException {
String familyStr = Bytes.toString(family);
for (int j = 0; j < count; j++) {
@ -902,7 +880,7 @@ public class TestWALReplay {
WALEdit edit = new WALEdit();
edit.add(new KeyValue(rowName, family, qualifierBytes,
ee.currentTimeMillis(), columnBytes));
wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd);
wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, sequenceId);
}
}

View File

@ -52,6 +52,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@Category(LargeTests.class)
@RunWith(Parameterized.class)
@ -74,6 +75,7 @@ public class TestReplicationHLogReaderManager {
private PathWatcher pathWatcher;
private int nbRows;
private int walEditKVs;
private final AtomicLong sequenceId = new AtomicLong(1);
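// region-scoped sequenceId shared by all appends in this test (this wal sees one region).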
@Parameters
public static Collection<Object[]> parameters() {
@ -206,7 +208,7 @@ public class TestReplicationHLogReaderManager {
}
private void appendToLogPlus(int count) throws IOException {
log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd);
log.append(info, tableName, getWALEdits(count), System.currentTimeMillis(), htd, sequenceId);
}
private WALEdit getWALEdits(int count) {

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -188,7 +189,7 @@ public class TestReplicationSourceManager {
listeners.add(replication);
HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), logName,
conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
final AtomicLong sequenceId = new AtomicLong(1);
manager.init();
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor(f1));
@ -200,7 +201,7 @@ public class TestReplicationSourceManager {
LOG.info(i);
HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
}
// Simulate a rapid insert that's followed
@ -211,7 +212,7 @@ public class TestReplicationSourceManager {
LOG.info(baseline + " and " + time);
for (int i = 0; i < 3; i++) {
hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
}
assertEquals(6, manager.getHLogs().get(slaveId).size());
@ -221,7 +222,7 @@ public class TestReplicationSourceManager {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
hlog.append(hri, test, edit, System.currentTimeMillis(), htd, sequenceId);
assertEquals(1, manager.getHLogs().size());