HBASE-11135 Change region sequenceid generation so happens earlier in the append cycle rather than just before added to file
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1594345 13f79535-47bb-0310-9956-ffa450edef68
parent 16025a0a2f
commit 3fde3c238f
@@ -24,6 +24,7 @@ import java.security.Key;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -235,7 +236,7 @@ public final class Encryption {
     Bytes.random(salt);
     StringBuilder sb = new StringBuilder();
     for (byte[] b: args) {
-      sb.append(b);
+      sb.append(Arrays.toString(b));
     }
     PBEKeySpec spec = new PBEKeySpec(sb.toString().toCharArray(), salt, 10000, 128);
     try {
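The second change in this hunk fixes a subtle Java pitfall: StringBuilder has no byte[] overload, so sb.append(b) resolves to append(Object) and stringifies the array reference (its identity hash) rather than its contents, making the derived PBE password vary from run to run. A minimal standalone demonstration (illustrative only, not part of this commit):

import java.util.Arrays;

public class AppendPitfall {
  public static void main(String[] args) {
    byte[] b = {1, 2, 3};
    StringBuilder sb = new StringBuilder();
    sb.append(b);                            // resolves to append(Object): "[B@1b6d3586"
    System.out.println(sb);                  // identity hash; differs from run to run
    System.out.println(Arrays.toString(b));  // "[1, 2, 3]"; stable, content-based
  }
}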
@@ -36,7 +36,6 @@ import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -209,12 +208,23 @@ public class HRegion implements HeapSize { // , Writable{
    */
   final AtomicBoolean closing = new AtomicBoolean(false);
 
-  protected volatile long completeSequenceId = -1L;
+  /**
+   * The sequence id of the last flush on this region. Used doing some rough calculations on
+   * whether time to flush or not.
+   */
+  protected volatile long lastFlushSeqId = -1L;
 
   /**
-   * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1,
-   * as a marker that the region hasn't opened yet. Once it is opened, it is set to
-   * {@link #openSeqNum}.
+   * Region scoped edit sequence Id. Edits to this region are GUARANTEED to appear in the WAL/HLog
+   * file in this sequence id's order; i.e. edit #2 will be in the WAL after edit #1.
+   * Its default value is {@link HLog.NO_SEQUENCE_ID}. This default is used as a marker to indicate
+   * that the region hasn't opened yet. Once it is opened, it is set to the derived
+   * {@link #openSeqNum}, the largest sequence id of all hfiles opened under this Region.
+   *
+   * <p>Control of this sequence is handed off to the WAL/HLog implementation. It is responsible
+   * for tagging edits with the correct sequence id since it is responsible for getting the
+   * edits into the WAL files. It controls updating the sequence id value. DO NOT UPDATE IT
+   * OUTSIDE OF THE WAL. The value you get will not be what you think it is.
    */
   private final AtomicLong sequenceId = new AtomicLong(-1L);
 
@@ -391,7 +401,7 @@ public class HRegion implements HeapSize { // , Writable{
   /**
    * Objects from this class are created when flushing to describe all the different states that
    * that method ends up in. The Result enum describes those states. The sequence id should only
-   * be specified if the flush was successful, and the failure message should only be speficied
+   * be specified if the flush was successful, and the failure message should only be specified
    * if it didn't flush.
    */
   public static class FlushResult {
@@ -742,7 +752,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.closing.set(false);
     this.closed.set(false);
 
-    this.completeSequenceId = nextSeqid;
+    this.lastFlushSeqId = nextSeqid;
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
@@ -1603,7 +1613,8 @@ public class HRegion implements HeapSize { // , Writable{
    * Should the memstore be flushed now
    */
   boolean shouldFlush() {
-    if(this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
+    // This is a rough measure.
+    if (this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get()) {
       return true;
     }
     if (flushCheckInterval <= 0) { //disabled
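After the rename the check reads: flush once at least flushPerChanges edits have been appended since the last flush. A minimal standalone sketch of that trigger (hypothetical FlushTrigger class and threshold value; not HBase code):

import java.util.concurrent.atomic.AtomicLong;

class FlushTrigger {
  private final AtomicLong sequenceId = new AtomicLong(0); // region edit sequence id
  private volatile long lastFlushSeqId = 0;                // sequence id of the last flush
  private final long flushPerChanges = 30_000_000L;        // assumed threshold

  boolean shouldFlush() {
    // Rough measure: enough edits have been appended since the last flush.
    return this.lastFlushSeqId + this.flushPerChanges < this.sequenceId.get();
  }
}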
@@ -1626,34 +1637,16 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * Flush the memstore.
-   *
-   * Flushing the memstore is a little tricky. We have a lot of updates in the
-   * memstore, all of which have also been written to the log. We need to
-   * write those updates in the memstore out to disk, while being able to
-   * process reads/writes as much as possible during the flush operation. Also,
-   * the log has to state clearly the point in time at which the memstore was
-   * flushed. (That way, during recovery, we know when we can rely on the
-   * on-disk flushed structures and when we have to recover the memstore from
-   * the log.)
-   *
-   * <p>So, we have a three-step process:
-   *
-   * <ul><li>A. Flush the memstore to the on-disk stores, noting the current
-   * sequence ID for the log.<li>
-   *
-   * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence
-   * ID that was current at the time of memstore-flush.</li>
-   *
-   * <li>C. Get rid of the memstore structures that are now redundant, as
-   * they've been flushed to the on-disk HStores.</li>
-   * </ul>
-   * <p>This method is protected, but can be accessed via several public
-   * routes.
-   *
-   * <p> This method may block for some time.
+   * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of updates in the
+   * memstore, all of which have also been written to the log. We need to write those updates in the
+   * memstore out to disk, while being able to process reads/writes as much as possible during the
+   * flush operation.
+   * <p>This method may block for some time. Every time you call it, we up the regions
+   * sequence id even if we don't flush; i.e. the returned region id will be at least one larger
+   * than the last edit applied to this region. The returned id does not refer to an actual edit.
+   * The returned id can be used for say installing a bulk loaded file just ahead of the last hfile
+   * that was the result of this flush, etc.
    * @param status
    *
    * @return object describing the flush's state
    *
    * @throws IOException general io exceptions
@@ -1667,10 +1660,9 @@ public class HRegion implements HeapSize { // , Writable{
 
   /**
    * @param wal Null if we're NOT to go via hlog/wal.
-   * @param myseqid The seqid to use if <code>wal</code> is null writing out
-   * flush file.
+   * @param myseqid The seqid to use if <code>wal</code> is null writing out flush file.
    * @param status
-   * @return true if the region needs compacting
+   * @return object describing the flush's state
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
    */
@@ -1682,50 +1674,67 @@ public class HRegion implements HeapSize { // , Writable{
       throw new IOException("Aborting flush because server is abortted...");
     }
     final long startTime = EnvironmentEdgeManager.currentTimeMillis();
-    // Clear flush flag.
-    // If nothing to flush, return and avoid logging start/stop flush.
+    // If nothing to flush, return, but we need to safely update the region sequence id
     if (this.memstoreSize.get() <= 0) {
-      return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
+      // Take an update lock because am about to change the sequence id and we want the sequence id
+      // to be at the border of the empty memstore.
+      this.updatesLock.writeLock().lock();
+      try {
+        if (this.memstoreSize.get() <= 0) {
+          // Presume that if there are still no edits in the memstore, then there are no edits for
+          // this region out in the WAL/HLog subsystem so no need to do any trickery clearing out
+          // edits in the WAL system. Up the sequence number so the resulting flush id is for
+          // sure just beyond the last appended region edit (useful as a marker when bulk loading,
+          // etc.)
+          // wal can be null replaying edits.
+          return wal != null?
+            new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
+              getNextSequenceId(wal, startTime), "Nothing to flush"):
+            new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
+        }
+      } finally {
+        this.updatesLock.writeLock().unlock();
+      }
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Started memstore flush for " + this +
         ", current region memstore size " +
-        StringUtils.humanReadableInt(this.memstoreSize.get()) +
+        StringUtils.byteDesc(this.memstoreSize.get()) +
         ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
     }
 
-    // Stop updates while we snapshot the memstore of all stores. We only have
-    // to do this for a moment. Its quick. The subsequent sequence id that
-    // goes into the HLog after we've flushed all these snapshots also goes
-    // into the info file that sits beside the flushed files.
-    // We also set the memstore size to zero here before we allow updates
-    // again so its value will represent the size of the updates received
-    // during the flush
+    // Stop updates while we snapshot the memstore of all of these regions' stores. We only have
+    // to do this for a moment. It is quick. We also set the memstore size to zero here before we
+    // allow updates again so its value will represent the size of the updates received
+    // during flush
     MultiVersionConsistencyControl.WriteEntry w = null;
 
-    // We have to take a write lock during snapshot, or else a write could
-    // end up in both snapshot and memstore (makes it difficult to do atomic
-    // rows then)
+    // We have to take an update lock during snapshot, or else a write could end up in both snapshot
+    // and memstore (makes it difficult to do atomic rows then)
     status.setStatus("Obtaining lock to block concurrent updates");
     // block waiting for the lock for internal flush
    this.updatesLock.writeLock().lock();
    long totalFlushableSize = 0;
-    status.setStatus("Preparing to flush by snapshotting stores");
+    status.setStatus("Preparing to flush by snapshotting stores in " +
+      getRegionInfo().getEncodedName());
    List<StoreFlushContext> storeFlushCtxs = new ArrayList<StoreFlushContext>(stores.size());
    long flushSeqId = -1L;
    try {
      // Record the mvcc for all transactions in progress.
      w = mvcc.beginMemstoreInsert();
      mvcc.advanceMemstore(w);
      // check if it is not closing.
      if (wal != null) {
        if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
          // This should never happen.
          String msg = "Flush will not be started for ["
            + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
          status.setStatus(msg);
          return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
        }
-        flushSeqId = this.sequenceId.incrementAndGet();
+        // Get a sequence id that we can use to denote the flush. It will be one beyond the last
+        // edit that made it into the hfile (the below does not add an edit, it just asks the
+        // WAL system to return next sequence edit).
+        flushSeqId = getNextSequenceId(wal, startTime);
      } else {
        // use the provided sequence Id as WAL is not being used for this flush.
        flushSeqId = myseqid;
@@ -1736,7 +1745,7 @@ public class HRegion implements HeapSize { // , Writable{
       storeFlushCtxs.add(s.createFlushContext(flushSeqId));
     }
 
-    // prepare flush (take a snapshot)
+    // Prepare flush (take a snapshot)
     for (StoreFlushContext flush : storeFlushCtxs) {
       flush.prepare();
     }
@@ -1750,9 +1759,7 @@ public class HRegion implements HeapSize { // , Writable{
 
     // sync unflushed WAL changes when deferred log sync is enabled
     // see HBASE-8208 for details
-    if (wal != null && !shouldSyncLog()) {
-      wal.sync();
-    }
+    if (wal != null && !shouldSyncLog()) wal.sync();
 
     // wait for all in-progress transactions to commit to HLog before
     // we can start the flush. This prevents
@@ -1817,8 +1824,8 @@ public class HRegion implements HeapSize { // , Writable{
     // Record latest flush time
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
 
-    // Update the last flushed sequence id for region
-    completeSequenceId = flushSeqId;
+    // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
+    this.lastFlushSeqId = flushSeqId;
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -1829,9 +1836,9 @@ public class HRegion implements HeapSize { // , Writable{
     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     long memstoresize = this.memstoreSize.get();
     String msg = "Finished memstore flush of ~" +
-      StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
+      StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
       ", currentsize=" +
-      StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
+      StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
       " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
       ", compaction requested=" + compactionRequested +
       ((wal == null)? "; wal=null": "");
@@ -1843,6 +1850,22 @@ public class HRegion implements HeapSize { // , Writable{
       FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
   }
 
+  /**
+   * Method to safely get the next sequence number.
+   * @param wal
+   * @param now
+   * @return Next sequence number unassociated with any actual edit.
+   * @throws IOException
+   */
+  private long getNextSequenceId(final HLog wal, final long now) throws IOException {
+    HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), getRegionInfo().getTable());
+    // Call append but with an empty WALEdit. The returned seqeunce id will not be associated
+    // with any edit and we can be sure it went in after all outstanding appends.
+    wal.appendNoSync(getTableDesc(), getRegionInfo(), key,
+      WALEdit.EMPTY_WALEDIT, this.sequenceId, false);
+    return key.getLogSeqNum();
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // get() methods for client use.
   //////////////////////////////////////////////////////////////////////////////
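What makes the empty append in getNextSequenceId safe is ordering, not the edit itself: every append is stamped by the same ordered appender, so a marker appended after N edits necessarily receives an id greater than all of them. A plain-Java model of that guarantee (hypothetical OrderedStamper, not HBase code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class OrderedStamper {
  private final ExecutorService appender = Executors.newSingleThreadExecutor();
  private final AtomicLong sequenceId = new AtomicLong(0);

  Future<Long> append(String edit) {       // a real edit
    return appender.submit(() -> sequenceId.incrementAndGet());
  }

  Future<Long> nextSequenceId() {          // empty marker, like WALEdit.EMPTY_WALEDIT
    return appender.submit(() -> sequenceId.incrementAndGet());
  }

  public static void main(String[] args) throws Exception {
    OrderedStamper s = new OrderedStamper();
    s.append("edit1");
    s.append("edit2");
    System.out.println(s.nextSequenceId().get()); // 3: strictly beyond both edits
    s.appender.shutdown();
  }
}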
@@ -2516,9 +2539,11 @@ public class HRegion implements HeapSize { // , Writable{
           throw new IOException("Multiple nonces per batch and not in replay");
         }
         // txid should always increase, so having the one from the last call is ok.
-        txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
-          walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
-          currentNonceGroup, currentNonce);
+        HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+          this.htableDescriptor.getTableName(), now, m.getClusterIds(), currentNonceGroup,
+          currentNonce);
+        txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key,
+          walEdit, getSequenceId(), true);
         hasWalAppends = true;
         walEdit = new WALEdit(isInReplay);
       }
@@ -2541,9 +2566,11 @@ public class HRegion implements HeapSize { // , Writable{
       // -------------------------
       Mutation mutation = batchOp.getMutation(firstIndex);
       if (walEdit.size() > 0) {
-        txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
-          walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
-          true, currentNonceGroup, currentNonce);
+        HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+          this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
+          currentNonceGroup, currentNonce);
+        txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), key, walEdit,
+          getSequenceId(), true);
         hasWalAppends = true;
       }
 
@@ -3597,13 +3624,15 @@ public class HRegion implements HeapSize { // , Writable{
     long seqId = -1;
     // We need to assign a sequential ID that's in between two memstores in order to preserve
     // the guarantee that all the edits lower than the highest sequential ID from all the
-    // HFiles are flushed on disk. See HBASE-10958.
+    // HFiles are flushed on disk. See HBASE-10958. The sequence id returned when we flush is
+    // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
+    // a sequence id that we can be sure is beyond the last hfile written).
     if (assignSeqId) {
       FlushResult fs = this.flushcache();
       if (fs.isFlushSucceeded()) {
         seqId = fs.flushSequenceId;
       } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
-        seqId = this.sequenceId.incrementAndGet();
+        seqId = fs.flushSequenceId;
       } else {
         throw new IOException("Could not bulk load with an assigned sequential ID because the " +
           "flush didn't run. Reason for not flushing: " + fs.failureReason);
@@ -4890,9 +4919,11 @@ public class HRegion implements HeapSize { // , Writable{
       long txid = 0;
       // 7. Append no sync
       if (!walEdit.isEmpty()) {
-        txid = this.log.appendNoSync(this.getRegionInfo(),
-          this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
-          this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
+        HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+          this.htableDescriptor.getTableName(), now, processor.getClusterIds(), nonceGroup,
+          nonce);
+        txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+          key, walEdit, getSequenceId(), true);
       }
       // 8. Release region lock
       if (locked) {
@@ -5133,10 +5164,10 @@ public class HRegion implements HeapSize { // , Writable{
           // Using default cluster id, as this can only happen in the orginating
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
-          txid = this.log.appendNoSync(this.getRegionInfo(),
-            this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
-            EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
-            true, nonceGroup, nonce);
+          HLogKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
+            this.htableDescriptor.getTableName(), nonceGroup, nonce);
+          txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), key, walEdits,
+            this.sequenceId, true);
         } else {
           recordMutationWithoutWal(append.getFamilyCellMap());
         }
@@ -5326,10 +5357,10 @@ public class HRegion implements HeapSize { // , Writable{
           // Using default cluster id, as this can only happen in the orginating
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
-          txid = this.log.appendNoSync(this.getRegionInfo(),
-            this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
-            EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor, this.sequenceId,
-            true, nonceGroup, nonce);
+          HLogKey key = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
+            this.htableDescriptor.getTableName(), nonceGroup, nonce);
+          txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(),
+            key, walEdits, getSequenceId(), true);
         } else {
           recordMutationWithoutWal(increment.getFamilyCellMap());
         }
@@ -6002,8 +6033,10 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   * @return sequenceId.
+   * Do not change this sequence id. See {@link #sequenceId} comment.
+   * @return sequenceId
    */
+  @VisibleForTesting
   public AtomicLong getSequenceId() {
     return this.sequenceId;
   }
@@ -1241,7 +1241,7 @@ public class HRegionServer extends HasThread implements
       .setWriteRequestsCount((int) r.writeRequestsCount.get())
       .setTotalCompactingKVs(totalCompactingKVs)
       .setCurrentCompactedKVs(currentCompactedKVs)
-      .setCompleteSequenceId(r.completeSequenceId);
+      .setCompleteSequenceId(r.lastFlushSeqId);
 
     return regionLoad.build();
   }
@@ -87,19 +87,16 @@ import com.lmax.disruptor.dsl.ProducerType;
 /**
  * Implementation of {@link HLog} to go against {@link FileSystem}; i.e. keep WALs in HDFS.
  * Only one HLog/WAL is ever being written at a time. When a WAL hits a configured maximum size,
- * it is rolled. This is done internal to the implementation, so external
- * callers do not have to be concerned with log rolling.
+ * it is rolled. This is done internal to the implementation.
  *
- * <p>As data is flushed from the MemStore to other (better) on-disk structures (files sorted by
+ * <p>As data is flushed from the MemStore to other on-disk structures (files sorted by
  * key, hfiles), a WAL becomes obsolete. We can let go of all the log edits/entries for a given
- * HRegion-id up to the most-recent CACHEFLUSH message from that HRegion. A bunch of work in the
- * below is done keeping account of these region sequence ids -- what is flushed out to hfiles,
- * and what is yet in WAL and in memory only.
+ * HRegion-sequence id. A bunch of work in the below is done keeping account of these region
+ * sequence ids -- what is flushed out to hfiles, and what is yet in WAL and in memory only.
 *
 * <p>It is only practical to delete entire files. Thus, we delete an entire on-disk file
 * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older
- * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has an edit in
- * <code>F</code>.
+ * (smaller) than the most-recent flush.
 *
 * <p>To read an HLog, call {@link HLogFactory#createReader(org.apache.hadoop.fs.FileSystem,
 * org.apache.hadoop.fs.Path, org.apache.hadoop.conf.Configuration)}.
@@ -113,24 +110,29 @@ class FSHLog implements HLog, Syncable {
 // here appending and syncing on a single WAL. The Disruptor is configured to handle multiple
 // producers but it has one consumer only (the producers in HBase are IPC Handlers calling append
 // and then sync). The single consumer/writer pulls the appends and syncs off the ring buffer.
-// The appends are added to the WAL immediately without pause or batching (there may be a slight
-// benefit batching appends but it complicates the implementation -- the gain is not worth
-// the added complication). When a producer calls sync, it is given back a future. The producer
-// 'blocks' on the future so it does not return until the sync completes. The future is passed
-// over the ring buffer from the producer to the consumer thread where it does its best to batch
-// up the producer syncs so one WAL sync actually spans multiple producer sync invocations. How
-// well the batching works depends on the write rate; i.e. we tend to batch more in times of
+// When a handler calls sync, it is given back a future. The producer 'blocks' on the future so
+// it does not return until the sync completes. The future is passed over the ring buffer from
+// the producer/handler to the consumer thread where it does its best to batch up the producer
+// syncs so one WAL sync actually spans multiple producer sync invocations. How well the
+// batching works depends on the write rate; i.e. we tend to batch more in times of
 // high writes/syncs.
 //
-// <p>The consumer thread pass the syncs off to muliple syncing threads in a round robin fashion
+// Calls to append now also wait until the append has been done on the consumer side of the
+// disruptor. We used to not wait but it makes the implemenation easier to grok if we have
+// the region edit/sequence id after the append returns.
+//
+// TODO: Handlers need to coordinate appending AND syncing. Can we have the threads contend
+// once only? Probably hard given syncs take way longer than an append.
+//
+// The consumer threads pass the syncs off to multiple syncing threads in a round robin fashion
 // to ensure we keep up back-to-back FS sync calls (FS sync calls are the long poll writing the
 // WAL). The consumer thread passes the futures to the sync threads for it to complete
 // the futures when done.
 //
-// <p>The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It
+// The 'sequence' in the below is the sequence of the append/sync on the ringbuffer. It
 // acts as a sort-of transaction id. It is always incrementing.
 //
-// <p>The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that
+// The RingBufferEventHandler class hosts the ring buffer consuming code. The threads that
 // do the actual FS sync are implementations of SyncRunner. SafePointZigZagLatch is a
 // synchronization class used to halt the consumer at a safe point -- just after all outstanding
 // syncs and appends have completed -- so the log roller can swap the WAL out under it.
@@ -138,14 +140,17 @@ class FSHLog implements HLog, Syncable {
   static final Log LOG = LogFactory.getLog(FSHLog.class);
 
   /**
-   * Disruptor is a fancy ring buffer. This disruptor/ring buffer is used to take edits and sync
-   * calls from the Handlers and passes them to the append and sync executors with minimal
-   * contention.
+   * The nexus at which all incoming handlers meet. Does appends and sync with an ordering.
+   * Appends and syncs are each put on the ring which means handlers need to
+   * smash up against the ring twice (can we make it once only? ... maybe not since time to append
+   * is so different from time to sync and sometimes we don't want to sync or we want to async
+   * the sync). The ring is where we make sure of our ordering and it is also where we do
+   * batching up of handler sync calls.
    */
   private final Disruptor<RingBufferTruck> disruptor;
 
   /**
-   * An executorservice that runs the AppendEventHandler append executor.
+   * An executorservice that runs the disrutpor AppendEventHandler append executor.
    */
   private final ExecutorService appendExecutor;
 
@@ -159,6 +164,9 @@ class FSHLog implements HLog, Syncable {
 
   /**
    * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
+   * TODO: Reus FSWALEntry's rather than create them anew each time as we do SyncFutures here.
+   * TODO: Add a FSWalEntry and SyncFuture as thread locals on handlers rather than have them
+   * get them from this Map?
    */
   private final Map<Thread, SyncFuture> syncFuturesByHandler;
 
@@ -170,14 +178,14 @@ class FSHLog implements HLog, Syncable {
 
   /**
    * The highest known outstanding unsync'd WALEdit sequence number where sequence number is the
-   * ring buffer sequence.
+   * ring buffer sequence. Maintained by the ring buffer consumer.
    */
   private volatile long highestUnsyncedSequence = -1;
 
   /**
    * Updated to the ring buffer sequence of the last successful sync call. This can be less than
    * {@link #highestUnsyncedSequence} for case where we have an append where a sync has not yet
-   * come in for it.
+   * come in for it. Maintained by the syncing threads.
    */
   private final AtomicLong highestSyncedSequence = new AtomicLong(0);
 
@@ -192,15 +200,20 @@ class FSHLog implements HLog, Syncable {
 
   // Minimum tolerable replicas, if the actual value is lower than it, rollWriter will be triggered
   private final int minTolerableReplication;
 
+  // DFSOutputStream.getNumCurrentReplicas method instance gotten via reflection.
+  private final Method getNumCurrentReplicas;
+
+  private final static Object [] NO_ARGS = new Object []{};
+
   // If live datanode count is lower than the default replicas value,
   // RollWriter will be triggered in each sync(So the RollWriter will be
   // triggered one by one in a short time). Using it as a workaround to slow
   // down the roll frequency triggered by checkLowReplication().
   private final AtomicInteger consecutiveLogRolls = new AtomicInteger(0);
 
   private final int lowReplicationRollLimit;
 
   // If consecutiveLogRolls is larger than lowReplicationRollLimit,
   // then disable the rolling in checkLowReplication().
   // Enable it if the replications recover.
@@ -273,18 +286,17 @@ class FSHLog implements HLog, Syncable {
   /**
    * This lock ties all operations on oldestFlushingRegionSequenceIds and
    * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into
-   * oldestUnflushedSeqNums. We use these Maps to find out the low bound seqNum, or to find regions
-   * with old seqNums to force flush; we are interested in old stuff not the new additions
-   * (TODO: IS THIS SAFE? CHECK!).
+   * oldestUnflushedSeqNums. We use these Maps to find out the low bound regions sequence id, or
+   * to find regions with old sequence ids to force flush; we are interested in old stuff not the
+   * new additions (TODO: IS THIS SAFE? CHECK!).
    */
   private final Object regionSequenceIdLock = new Object();
 
   /**
    * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived --
-   * sequence id in memstore. Note that this sequenceid is the region sequence id. This is not
+   * sequence id in memstore. Note that this sequence id is the region sequence id. This is not
    * related to the id we use above for {@link #highestSyncedSequence} and
-   * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer, an
-   * internal detail.
+   * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer.
    */
   private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedRegionSequenceIds =
     new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
 
@@ -327,14 +339,14 @@ class FSHLog implements HLog, Syncable {
   };
 
   /**
-   * Map of wal log file to the latest sequence nums of all regions it has entries of.
+   * Map of wal log file to the latest sequence ids of all regions it has entries of.
    * The map is sorted by the log file creation timestamp (contained in the log file name).
    */
   private NavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
     new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
 
   /**
-   * Exception handler to pass the disruptor ringbuffer. Same as native implemenation only it
+   * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it
    * logs using our logger instead of java native logger.
    */
   static class RingBufferExceptionHandler implements ExceptionHandler {
@@ -372,48 +384,6 @@ class FSHLog implements HLog, Syncable {
     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, false);
   }
 
-  /**
-   * Constructor.
-   *
-   * @param fs filesystem handle
-   * @param root path for stored and archived hlogs
-   * @param logDir dir where hlogs are stored
-   * @param oldLogDir dir where hlogs are archived
-   * @param conf configuration to use
-   * @throws IOException
-   */
-  public FSHLog(final FileSystem fs, final Path root, final String logDir, final String oldLogDir,
-      final Configuration conf)
-      throws IOException {
-    this(fs, root, logDir, oldLogDir, conf, null, true, null, false);
-  }
-
-  /**
-   * Create an edit log at the given <code>dir</code> location.
-   *
-   * You should never have to load an existing log. If there is a log at
-   * startup, it should have already been processed and deleted by the time the
-   * HLog object is started up.
-   *
-   * @param fs filesystem handle
-   * @param root path for stored and archived hlogs
-   * @param logDir dir where hlogs are stored
-   * @param conf configuration to use
-   * @param listeners Listeners on WAL events. Listeners passed here will
-   * be registered before we do anything else; e.g. the
-   * Constructor {@link #rollWriter()}.
-   * @param prefix should always be hostname and port in distributed env and
-   * it will be URL encoded before being used.
-   * If prefix is null, "hlog" will be used
-   * @throws IOException
-   */
-  public FSHLog(final FileSystem fs, final Path root, final String logDir,
-      final Configuration conf, final List<WALActionsListener> listeners,
-      final String prefix) throws IOException {
-    this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, true, prefix,
-      false);
-  }
-
   /**
    * Create an edit log at the given <code>dir</code> location.
    *
@@ -448,7 +418,7 @@ class FSHLog implements HLog, Syncable {
     this.forMeta = forMeta;
     this.conf = conf;
 
-    // Register listeners.
+    // Register listeners. TODO: Should this exist anymore? We have CPs?
     if (listeners != null) {
       for (WALActionsListener i: listeners) {
         registerWALActionsListener(i);
@@ -456,7 +426,7 @@ class FSHLog implements HLog, Syncable {
     }
 
     // Get size to roll log at. Roll at 95% of HDFS block size so we avoid crossing HDFS blocks
-    // (it costs x'ing bocks)
+    // (it costs a little x'ing bocks)
     long blocksize = this.conf.getLong("hbase.regionserver.hlog.blocksize",
       FSUtils.getDefaultBlockSize(this.fs, this.fullPathLogDir));
     this.logrollsize =
@@ -496,7 +466,8 @@ class FSHLog implements HLog, Syncable {
     // rollWriter sets this.hdfs_out if it can.
     rollWriter();
 
-    // handle the reflection necessary to call getNumCurrentReplicas()
+    // handle the reflection necessary to call getNumCurrentReplicas(). TODO: Replace with
+    // HdfsDataOutputStream#getCurrentBlockReplication() and go without reflection.
     this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
 
     this.coprocessorHost = new WALCoprocessorHost(this, conf);
@@ -537,6 +508,8 @@ class FSHLog implements HLog, Syncable {
    * @return Method or null.
    */
   private static Method getGetNumCurrentReplicas(final FSDataOutputStream os) {
+    // TODO: Remove all this and use the now publically available
+    // HdfsDataOutputStream#getCurrentBlockReplication()
     Method m = null;
     if (os != null) {
       Class<? extends OutputStream> wrappedStreamClass = os.getWrappedStream().getClass();
@@ -909,7 +882,7 @@ class FSHLog implements HLog, Syncable {
         long oldFileLen = this.fs.getFileStatus(oldPath).getLen();
         this.totalLogSize.addAndGet(oldFileLen);
         LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries +
-          ", filesize=" + StringUtils.humanReadableInt(oldFileLen) + "; new WAL " +
+          ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " +
           FSUtils.getPath(newPath));
       } else {
         LOG.info("New WAL " + FSUtils.getPath(newPath));
@@ -1091,96 +1064,83 @@ class FSHLog implements HLog, Syncable {
     }
   }
 
-  /**
-   * @param now
-   * @param encodedRegionName Encoded name of the region as returned by
-   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
-   * @param tableName
-   * @param clusterIds that have consumed the change
-   * @return New log key.
-   */
-  protected HLogKey makeKey(byte[] encodedRegionName, TableName tableName, long seqnum,
-      long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
-    return new HLogKey(encodedRegionName, tableName, seqnum, now, clusterIds, nonceGroup, nonce);
-  }
-
   @Override
+  @VisibleForTesting
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
     final long now, HTableDescriptor htd, AtomicLong sequenceId)
   throws IOException {
-    append(info, tableName, edits, new ArrayList<UUID>(), now, htd, true, true, sequenceId,
-      HConstants.NO_NONCE, HConstants.NO_NONCE);
+    HLogKey logKey = new HLogKey(info.getEncodedNameAsBytes(), tableName, now);
+    append(htd, info, logKey, edits, sequenceId, true, true);
   }
 
   @Override
-  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
+  public long appendNoSync(final HRegionInfo info, TableName tableName, WALEdit edits,
     List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
-    boolean isInMemstore, long nonceGroup, long nonce) throws IOException {
-    return append(info, tableName, edits, clusterIds, now, htd, false, isInMemstore, sequenceId,
-      nonceGroup, nonce);
+    boolean inMemstore, long nonceGroup, long nonce) throws IOException {
+    HLogKey logKey =
+      new HLogKey(info.getEncodedNameAsBytes(), tableName, now, clusterIds, nonceGroup, nonce);
+    return append(htd, info, logKey, edits, sequenceId, false, inMemstore);
   }
 
+  @Override
+  public long appendNoSync(final HTableDescriptor htd, final HRegionInfo info, final HLogKey key,
+      final WALEdit edits, final AtomicLong sequenceId, final boolean inMemstore)
+  throws IOException {
+    return append(htd, info, key, edits, sequenceId, false, inMemstore);
+  }
+
   /**
    * Append a set of edits to the log. Log edits are keyed by (encoded) regionName, rowname, and
    * log-sequence-id.
-   *
-   * Later, if we sort by these keys, we obtain all the relevant edits for a given key-range of the
-   * HRegion (TODO). Any edits that do not have a matching COMPLETE_CACHEFLUSH message can be
-   * discarded.
-   *
-   * <p>Logs cannot be restarted once closed, or once the HLog process dies. Each time the HLog
-   * starts, it must create a new log. This means that other systems should process the log
-   * appropriately upon each startup (and prior to initializing HLog).
-   *
-   * Synchronized prevents appends during the completion of a cache flush or for the duration of a
-   * log roll.
-   *
    * @param info
-   * @param tableName
+   * @param key
    * @param edits
-   * @param clusterIds that have consumed the change (for replication)
-   * @param now
-   * @param htd
-   * @param doSync shall we sync after we call the append?
+   * @param htd This comes in here just so it is available on a pre append for replications. Get
+   * rid of it. It is kinda crazy this comes in here when we have tablename and regioninfo.
+   * Replication gets its scope from the HTD.
+   * @param hri region info
+   * @param sync shall we sync after we call the append?
    * @param inMemstore
-   * @param sequenceId of the region.
-   * @param nonceGroup
-   * @param nonce
+   * @param sequenceId The region sequence id reference.
    * @return txid of this transaction or if nothing to do, the last txid
   * @throws IOException
   */
-  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
-      final long now, HTableDescriptor htd, boolean doSync, boolean inMemstore,
-      AtomicLong sequenceId, long nonceGroup, long nonce)
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH_EXCEPTION",
+      justification="Will never be null")
+  private long append(HTableDescriptor htd, final HRegionInfo hri, final HLogKey key,
+      WALEdit edits, AtomicLong sequenceId, boolean sync, boolean inMemstore)
  throws IOException {
-    if (!this.enabled || edits.isEmpty()) return this.highestUnsyncedSequence;
+    if (!this.enabled) return this.highestUnsyncedSequence;
    if (this.closed) throw new IOException("Cannot append; log is closed");
    // Make a trace scope for the append. It is closed on other side of the ring buffer by the
    // single consuming thread. Don't have to worry about it.
    TraceScope scope = Trace.startSpan("FSHLog.append");
-    // Make a key but do not set the WALEdit by region sequence id now -- set it to -1 for now --
-    // and then later just before we write it out to the DFS stream, then set the sequence id;
-    // late-binding.
-    HLogKey logKey =
-      makeKey(info.getEncodedNameAsBytes(), tableName, -1, now, clusterIds, nonceGroup, nonce);
    // This is crazy how much it takes to make an edit. Do we need all this stuff!!!!???? We need
-    // all the stuff to make a key and then below to append the edit, we need to carry htd, info,
+    // all this to make a key and then below to append the edit, we need to carry htd, info,
    // etc. all over the ring buffer.
+    FSWALEntry entry = null;
    long sequence = this.disruptor.getRingBuffer().next();
    try {
      RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);
-      FSWALEntry entry =
-        new FSWALEntry(sequence, logKey, edits, sequenceId, inMemstore, htd, info);
+      // Construction of FSWALEntry sets a latch. The latch is thrown just after we stamp the
+      // edit with its edit/sequence id. The below entry.getRegionSequenceId will wait on the
+      // latch to be thrown. TODO: reuse FSWALEntry as we do SyncFuture rather create per append.
+      entry = new FSWALEntry(sequence, key, edits, sequenceId, inMemstore, htd, hri);
      truck.loadPayload(entry, scope.detach());
    } finally {
      this.disruptor.getRingBuffer().publish(sequence);
+      // Now wait until the region edit/sequence id is available. The 'entry' has an internal
+      // latch that is thrown when the region edit/sequence id is set. Calling
+      // entry.getRegionSequenceId will cause us block until the latch is thrown. The return is
+      // the region edit/sequence id, not the ring buffer txid.
+      try {
+        entry.getRegionSequenceId();
+      } catch (InterruptedException e) {
+        throw convertInterruptedExceptionToIOException(e);
+      }
    }
-    // doSync is set in tests. Usually we arrive in here via appendNoSync w/ the sync called after
-    // all edits on a handler have been added.
-    //
-    // When we sync, we will sync to the current point, the txid of the last edit added.
-    // Since we are single writer, the next txid should be the just next one in sequence;
-    // do not explicitly specify it. Sequence id/txid is an implementation internal detail.
-    if (doSync) sync();
+    if (sync) sync(sequence);
    return sequence;
  }
 
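The crux of the rewritten append() is the handshake with the consumer side of the ring buffer: the producer publishes the entry, then blocks until the single consumer thread has stamped the region sequence id, so the id is final by the time appendNoSync returns. A condensed model of that handshake (hypothetical StampedEntry, not the HBase class):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

class StampedEntry {
  private final CountDownLatch stamped = new CountDownLatch(1);
  private volatile long seqId = -1;          // -1 == no sequence id assigned yet

  void stamp(AtomicLong regionSequenceId) {  // called by the single consumer thread
    this.seqId = regionSequenceId.incrementAndGet();
    this.stamped.countDown();
  }

  long awaitSeqId() throws InterruptedException {
    this.stamped.await();                    // producer blocks here until stamped
    return this.seqId;
  }
}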
@@ -1436,15 +1396,20 @@ class FSHLog implements HLog, Syncable {
       syncFuture.get();
       return syncFuture.getSpan();
     } catch (InterruptedException ie) {
-      Thread.currentThread().interrupt();
-      IOException ioe = new InterruptedIOException();
-      ioe.initCause(ie);
-      throw ioe;
+      LOG.warn("Interrupted", ie);
+      throw convertInterruptedExceptionToIOException(ie);
     } catch (ExecutionException e) {
       throw ensureIOException(e.getCause());
     }
   }
 
+  private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) {
+    Thread.currentThread().interrupt();
+    IOException ioe = new InterruptedIOException();
+    ioe.initCause(ie);
+    return ioe;
+  }
+
   private SyncFuture getSyncFuture(final long sequence, Span span) {
     SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread());
     if (syncFuture == null) {
@@ -1879,7 +1844,7 @@ class FSHLog implements HLog, Syncable {
         attainSafePoint(sequence);
         this.syncFuturesCount = 0;
       } catch (Throwable t) {
-        LOG.error("UNEXPECTED!!!", t);
+        LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
       }
     }
 
@@ -1918,18 +1883,17 @@ class FSHLog implements HLog, Syncable {
      * @throws Exception
      */
     void append(final FSWALEntry entry) throws Exception {
-      // TODO: WORK ON MAKING THIS APPEND FASTER. OING WAY TOO MUCH WORK WITH CPs, PBing, etc.
+      // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc.
       atHeadOfRingBufferEventHandlerAppend();
 
       long start = EnvironmentEdgeManager.currentTimeMillis();
       byte [] encodedRegionName = entry.getKey().getEncodedRegionName();
+      long regionSequenceId = HLog.NO_SEQUENCE_ID;
       try {
         // We are about to append this edit; update the region-scoped sequence number. Do it
         // here inside this single appending/writing thread. Events are ordered on the ringbuffer
         // so region sequenceids will also be in order.
-        long regionSequenceId = entry.getRegionSequenceIdReference().incrementAndGet();
-        // Set the region-scoped sequence number back up into the key ("late-binding" --
-        // setting before append).
-        entry.getKey().setLogSeqNum(regionSequenceId);
+        regionSequenceId = entry.stampRegionSequenceId();
         // Coprocessor hook.
         if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(),
           entry.getEdit())) {
@@ -1945,13 +1909,18 @@ class FSHLog implements HLog, Syncable {
             entry.getEdit());
         }
       }
-      writer.append(entry);
-      assert highestUnsyncedSequence < entry.getSequence();
-      highestUnsyncedSequence = entry.getSequence();
-      Long lRegionSequenceId = Long.valueOf(regionSequenceId);
-      highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
-      if (entry.isInMemstore()) {
-        oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
+      // If empty, there is nothing to append. Maybe empty when we are looking for a region
+      // sequence id only, a region edit/sequence id that is not associated with an actual edit.
+      // It has to go through all the rigmarole to be sure we have the right ordering.
+      if (!entry.getEdit().isEmpty()) {
+        writer.append(entry);
+        assert highestUnsyncedSequence < entry.getSequence();
+        highestUnsyncedSequence = entry.getSequence();
+        Long lRegionSequenceId = Long.valueOf(regionSequenceId);
+        highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId);
+        if (entry.isInMemstore()) {
+          oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId);
+        }
       }
       coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit());
       // Update metrics.
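This hunk is the consumer-side half of the empty-marker trick used by getNextSequenceId: the marker entry is still stamped (so the ordering guarantee holds) but is never handed to the writer. A toy model of that split (hypothetical class, not HBase code):

import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class MarkerAwareConsumer {
  private final AtomicLong regionSequenceId = new AtomicLong(0);
  private final List<String> file; // stands in for the WAL writer

  MarkerAwareConsumer(List<String> file) { this.file = file; }

  long consume(String edit) {
    long id = regionSequenceId.incrementAndGet(); // always stamp, even for markers
    if (!edit.isEmpty()) {
      file.add(id + ":" + edit);                  // only real edits reach the file
    }
    return id;                                    // markers just return the id
  }
}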
@@ -1975,6 +1944,14 @@ class FSHLog implements HLog, Syncable {
       }
     }
 
+    /**
+     * Exposed for testing only. Use to tricks like halt the ring buffer appending.
+     */
+    @VisibleForTesting
+    void atHeadOfRingBufferEventHandlerAppend() {
+      // Noop
+    }
+
     private static IOException ensureIOException(final Throwable t) {
       return (t instanceof IOException)? (IOException)t: new IOException(t);
     }
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -25,10 +26,12 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 
 /**
  * A WAL Entry for {@link FSHLog} implementation. Immutable.
- * It is a subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as
+ * A subclass of {@link HLog.Entry} that carries extra info across the ring buffer such as
  * region sequence id (we want to use this later, just before we write the WAL to ensure region
 * edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
- * hence marked 'transient' to underline this fact.
+ * hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
+ * the assign of the region sequence id. See {@link #setRegionSequenceId(long)} and
+ * {@link #getRegionSequenceId()}.
 */
 @InterfaceAudience.Private
 class FSWALEntry extends HLog.Entry {
@@ -39,6 +42,9 @@ class FSWALEntry extends HLog.Entry {
   private final transient boolean inMemstore;
   private final transient HTableDescriptor htd;
   private final transient HRegionInfo hri;
+  // Latch that is set on creation and then is undone on the other side of the ring buffer by the
+  // consumer thread just after it sets the region edit/sequence id in here.
+  private final transient CountDownLatch latch = new CountDownLatch(1);
 
   FSWALEntry(final long sequence, final HLogKey key, final WALEdit edit,
     final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
@@ -55,10 +61,6 @@ class FSWALEntry extends HLog.Entry {
     return "sequence=" + this.sequence + ", " + super.toString();
   };
 
-  AtomicLong getRegionSequenceIdReference() {
-    return this.regionSequenceIdReference;
-  }
-
   boolean isInMemstore() {
     return this.inMemstore;
   }
@@ -77,4 +79,27 @@ class FSWALEntry extends HLog.Entry {
   long getSequence() {
     return this.sequence;
   }
+
+  /**
+   * Stamp this edit with a region edit/sequence id.
+   * Call when safe to do so: i.e. the context is such that the increment on the passed in
+   * {@link #regionSequenceIdReference} is guaranteed aligned w/ how appends are going into the
+   * WAL. This method works with {@link #getRegionSequenceId()}. It will block waiting on this
+   * method if on initialization our edit/sequence id is {@link HLogKey#NO_SEQ_NO}.
+   * @return The region edit/sequence id we set for this edit.
+   * @see #getRegionSequenceId()
+   */
+  long stampRegionSequenceId() {
+    long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
+    getKey().setLogSeqNum(regionSequenceId);
+    // On creation, a latch was set. Count it down when sequence id is set. This will free
+    // up anyone blocked on {@link #getRegionSequenceId()}
+    this.latch.countDown();
+    return regionSequenceId;
+  }
+
+  long getRegionSequenceId() throws InterruptedException {
+    this.latch.await();
+    return getKey().getLogSeqNum();
+  }
 }
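A tiny runnable distillation of the stampRegionSequenceId()/getRegionSequenceId() pair above (illustrative only): one thread plays the ring buffer consumer that stamps and releases the latch; the caller blocks until the id is final.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class StampDemo {
  public static void main(String[] args) throws InterruptedException {
    AtomicLong regionSequenceId = new AtomicLong(41);
    CountDownLatch latch = new CountDownLatch(1);
    long[] stamped = new long[1];

    Thread consumer = new Thread(() -> {   // plays the ring buffer consumer
      stamped[0] = regionSequenceId.incrementAndGet();
      latch.countDown();                   // wakes anyone blocked in await()
    });
    consumer.start();

    latch.await();                         // plays getRegionSequenceId()
    System.out.println("stamped id = " + stamped[0]); // 42
  }
}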
@@ -49,6 +49,7 @@ import com.google.common.annotations.VisibleForTesting;
 // TODO: Rename interface to WAL
 public interface HLog {
   Log LOG = LogFactory.getLog(HLog.class);
+  public static final long NO_SEQUENCE_ID = -1;
 
   /** File Extension used while splitting an HLog into regions (HBASE-2312) */
   // TODO: this seems like an implementation detail that does not belong here.
@@ -288,6 +289,9 @@ public interface HLog {
    * @param htd
    * @param sequenceId
    * @throws IOException
+   * @deprecated For tests only and even then, should use
+   * {@link #appendNoSync(HTableDescriptor, HRegionInfo, HLogKey, WALEdit, AtomicLong, boolean)}
+   * and {@link #sync()} instead.
    */
   @VisibleForTesting
   public void append(HRegionInfo info, TableName tableName, WALEdit edits,
@@ -311,8 +315,9 @@ public interface HLog {
 
   /**
    * Append a set of edits to the WAL. WAL edits are keyed by (encoded) regionName, rowname, and
-   * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes.
-   * Call {@link #sync()} to flush/sync all outstanding edits/appends.
+   * log-sequence-id. The WAL is not flushed/sync'd after this transaction completes BUT on return
+   * this edit must have its region edit/sequence id assigned else it messes up our unification
+   * of mvcc and sequenceid.
    * @param info
    * @param tableName
   * @param edits
@@ -332,11 +337,39 @@ public interface HLog {
    * able to sync an explicit edit only (the current default implementation syncs up to the time
    * of the sync call syncing whatever is behind the sync).
    * @throws IOException
+   * @deprecated Use
+   * {@link #appendNoSync(HRegionInfo, HLogKey, WALEdit, HTableDescriptor, AtomicLong, boolean)}
+   * instead because you can get back the region edit/sequenceid; it is set into the passed in
+   * <code>key</code>.
    */
   long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
     List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
     boolean isInMemstore, long nonceGroup, long nonce) throws IOException;
 
+  /**
+   * Append a set of edits to the WAL. The WAL is not flushed/sync'd after this transaction
+   * completes BUT on return this edit must have its region edit/sequence id assigned
+   * else it messes up our unification of mvcc and sequenceid. On return <code>key</code> will
+   * have the region edit/sequence id filled in.
+   * @param info
+   * @param key Modified by this call; we add to it this edits region edit/sequence id.
+   * @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
+   * sequence id that is after all currently appended edits.
+   * @param htd
+   * @param sequenceId A reference to the atomic long the <code>info</code> region is using as
+   * source of its incrementing edits sequence id. Inside in this call we will increment it and
+   * attach the sequence to the edit we apply the WAL.
+   * @param inMemstore Always true except for case where we are writing a compaction completion
+   * record into the WAL; in this case the entry is just so we can finish an unfinished compaction
+   * -- it is not an edit for memstore.
+   * @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
+   * in it.
+   * @throws IOException
+   */
+  long appendNoSync(HTableDescriptor htd, HRegionInfo info, HLogKey key, WALEdit edits,
+    AtomicLong sequenceId, boolean inMemstore)
+  throws IOException;
+
   // TODO: Do we need all these versions of sync?
   void hsync() throws IOException;
 
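A hedged usage sketch of the new overload, built from the names visible in this diff (the HLogKey(byte[], TableName) constructor, appendNoSync, getLogSeqNum) plus HLog.sync(long), which is assumed here; this is not code from the commit:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

final class AppendNoSyncExample {
  static long appendAndSync(HLog log, HTableDescriptor htd, HRegionInfo info,
      AtomicLong regionSequenceId, WALEdit edits) throws IOException {
    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), htd.getTableName());
    long txid = log.appendNoSync(htd, info, key, edits, regionSequenceId, true);
    log.sync(txid);                  // flush/sync the WAL up to this transaction
    return key.getLogSeqNum();       // region edit/sequence id stamped during append
  }
}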
@@ -47,13 +47,14 @@ public class HLogFactory {

  public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
      final String oldLogName, final Configuration conf) throws IOException {
    return new FSHLog(fs, root, logName, oldLogName, conf);
    return new FSHLog(fs, root, logName, oldLogName, conf, null, true, null, false);
  }

  public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
      final Configuration conf, final List<WALActionsListener> listeners,
      final String prefix) throws IOException {
    return new FSHLog(fs, root, logName, conf, listeners, prefix);
    return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners,
        true, prefix, false);
  }

  public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
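As a usage sketch (the "logs" directory name and the null listeners/prefix arguments are illustrative, not from this patch), the factory remains the entry point so callers stay insulated from the widened FSHLog constructor:

  // Hypothetical caller: obtain a WAL through the factory, then close it.
  Configuration conf = HBaseConfiguration.create();
  FileSystem fs = FileSystem.get(conf);
  HLog log = HLogFactory.createHLog(fs, FSUtils.getRootDir(conf), "logs", conf, null, null);
  try {
    // append and sync edits against 'log' as sketched above
  } finally {
    log.close();
  }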
@@ -23,6 +23,7 @@ import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -31,7 +32,6 @@ import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;

import com.google.protobuf.HBaseZeroCopyByteString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -43,11 +43,13 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;

/**
 * A Key for an entry in the change log.
@@ -122,6 +124,7 @@ public class HLogKey implements WritableComparable<HLogKey> {

  private long nonceGroup = HConstants.NO_NONCE;
  private long nonce = HConstants.NO_NONCE;
  static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());

  private CompressionContext compressionContext;

@@ -139,10 +142,20 @@ public class HLogKey implements WritableComparable<HLogKey> {
        HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  public HLogKey(final byte[] encodedRegionName, final TableName tablename) {
    this(encodedRegionName, tablename, System.currentTimeMillis());
  }

  public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now,
        EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  /**
   * Create the log key for writing to somewhere.
   * We maintain the tablename mainly for debugging purposes.
   * A regionName is always a sub-table object.
   * <p>Used by log splitting and snapshots.
   *
   * @param encodedRegionName Encoded name of the region as returned by
   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
@@ -156,6 +169,41 @@ public class HLogKey implements WritableComparable<HLogKey> {
    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
  }

  /**
   * Create the log key for writing to somewhere.
   * We maintain the tablename mainly for debugging purposes.
   * A regionName is always a sub-table object.
   *
   * @param encodedRegionName Encoded name of the region as returned by
   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
   * @param tablename
   * @param now Time at which this edit was written.
   * @param clusterIds the clusters that have consumed the change (used in Replication)
   * @param nonceGroup
   * @param nonce
   */
  public HLogKey(final byte [] encodedRegionName, final TableName tablename,
      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce);
  }

  /**
   * Create the log key for writing to somewhere.
   * We maintain the tablename mainly for debugging purposes.
   * A regionName is always a sub-table object.
   *
   * @param encodedRegionName Encoded name of the region as returned by
   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
   * @param tablename
   * @param nonceGroup
   * @param nonce
   */
  public HLogKey(final byte [] encodedRegionName, final TableName tablename, long nonceGroup,
      long nonce) {
    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID,
        EnvironmentEdgeManager.currentTimeMillis(), EMPTY_UUIDS, nonceGroup, nonce);
  }
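The practical consequence of the new constructors defaulting to HLog.NO_SEQUENCE_ID is that callers no longer pick a sequence id up front; the WAL assigns one during append. A two-line sketch, with info and tn assumed in scope:

  // The key starts without a sequence id; appendNoSync() fills it in.
  HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
  // key.getLogSeqNum() == HLog.NO_SEQUENCE_ID until the WAL appends it.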

  protected void init(final byte [] encodedRegionName, final TableName tablename,
      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
    this.logSeqNum = logSeqNum;
@@ -20,10 +20,8 @@
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -35,13 +33,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;

import com.google.protobuf.TextFormat;
@@ -263,13 +260,10 @@ public class HLogUtil {
   */
  public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
      final CompactionDescriptor c, AtomicLong sequenceId) throws IOException {
    WALEdit e = WALEdit.createCompaction(c);
    long now = EnvironmentEdgeManager.currentTimeMillis();
    TableName tn = TableName.valueOf(c.getTableName().toByteArray());
    log.appendNoSync(info, tn, e, new ArrayList<UUID>(), now, htd, sequenceId, false,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
    HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), tn);
    log.appendNoSync(htd, info, key, WALEdit.createCompaction(c), sequenceId, false);
    log.sync();

    if (LOG.isTraceEnabled()) {
      LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
    }
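A caller-side sketch of the rewritten helper (the wal, htd and region objects and the CompactionDescriptor are assumed from surrounding code, not defined in this patch):

  // Sketch: persist a compaction marker so an interrupted compaction can be
  // finished on replay. The helper appends with inMemstore=false -- the marker
  // is bookkeeping, not a memstore edit -- and then syncs the WAL.
  void markCompaction(HLog wal, HTableDescriptor htd, HRegion region,
      CompactionDescriptor desc) throws IOException {
    HLogUtil.writeCompactionMarker(wal, htd, region.getRegionInfo(), desc,
        region.getSequenceId());
  }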
@@ -78,14 +78,6 @@ class RingBufferTruck {
    return this.syncFuture != null;
  }

  /**
   * return {@code true} when this truck is carrying a {@link Span},
   * {@code false} otherwise.
   */
  boolean hasSpanPayload() {
    return this.span != null;
  }

  /**
   * Unload the truck of its {@link FSWALEntry} payload. The internal refernce is released.
   */
@@ -105,7 +97,7 @@ class RingBufferTruck {
  }

  /**
   * Unload the truck of its {@link Span} payload. The internal refernce is released.
   * Unload the truck of its {@link Span} payload. The internal reference is released.
   */
  Span unloadSpanPayload() {
    Span ret = this.span;
@@ -48,6 +48,9 @@ import org.htrace.Span;
 */
@InterfaceAudience.Private
class SyncFuture {
  // Implementation notes: I tried using a CyclicBarrier in here for handler and sync threads
  // to coordinate on but it did not give any obvious advantage and caused some issues with
  // the order in which events happen.
  private static final long NOT_DONE = 0;

  /**
@@ -187,4 +190,4 @@ class SyncFuture {
  synchronized Throwable getThrowable() {
    return this.throwable;
  }
}
}
@@ -87,9 +87,11 @@ public interface WALActionsListener {
   * @param htd
   * @param logKey
   * @param logEdit
   * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, HLogKey, WALEdit)}
   * It only exists to get scope when replicating. Scope should be in the HLogKey and should not
   * need us to pass in an <code>htd</code>.
   */
  void visitLogEntryBeforeWrite(
    HTableDescriptor htd, HLogKey logKey, WALEdit logEdit
  );

}
}
@@ -80,13 +80,14 @@ public class WALEdit implements Writable, HeapSize {
  // TODO: Get rid of this; see HBASE-8457
  public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
  static final byte [] METAROW = Bytes.toBytes("METAROW");
  static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");
  static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
  private final int VERSION_2 = -1;
  private final boolean isReplay;

  private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(1);

  public static final WALEdit EMPTY_WALEDIT = new WALEdit();

  // Only here for legacy writable deserialization
  @Deprecated
  private NavigableMap<byte[], Integer> scopes;
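The new EMPTY_WALEDIT constant pairs with the appendNoSync javadoc earlier in this patch: an append may carry no edits at all when the caller only wants a sequence id ordered after everything appended so far. A hedged sketch of that use (wal, htd, info and the region's sequenceId assumed in scope):

  // Append nothing; we only want a region edit/sequence id that trails all
  // currently appended edits. inMemstore=false since there is nothing for memstore.
  HLogKey key = new HLogKey(info.getEncodedNameAsBytes(), htd.getTableName());
  long txid = wal.appendNoSync(htd, info, key, WALEdit.EMPTY_WALEDIT, sequenceId, false);
  wal.sync(txid);
  long seqIdBeyondAllEdits = key.getLogSeqNum();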
@@ -34,9 +34,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -4184,8 +4182,8 @@ public class TestHRegion {

    //verify append called or not
    verify(log, expectAppend ? times(1) : never())
      .appendNoSync((HRegionInfo)any(), eq(tableName), (WALEdit)any(), (List<UUID>)any(),
        anyLong(), (HTableDescriptor)any(), (AtomicLong)any(), anyBoolean(), anyLong(), anyLong());
      .appendNoSync((HTableDescriptor)any(), (HRegionInfo)any(), (HLogKey)any(),
        (WALEdit)any(), (AtomicLong)any(), Mockito.anyBoolean());

    // verify sync called or not
    if (expectSync || expectSyncFromLogSyncer) {
@@ -18,17 +18,26 @@
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -38,14 +47,31 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -137,6 +163,86 @@ public class TestHLog {
    return "TestHLog";
  }

  /**
   * Test that a flush is guaranteed to have a sequence id beyond the last edit appended. We do
   * this by slowing appends in the background ring buffer thread while in foreground we call
   * flush. The addition of the sync over HRegion in flush should fix an issue where flush was
   * returning before all of its appends had made it out to the WAL (HBASE-11109).
   * @throws IOException
   * @see HBASE-11109
   */
  @Test
  public void testFlushSequenceIdIsGreaterThanAllEditsInHFile() throws IOException {
    String testName = "testFlushSequenceIdIsGreaterThanAllEditsInHFile";
    final TableName tableName = TableName.valueOf(testName);
    final HRegionInfo hri = new HRegionInfo(tableName);
    final byte[] rowName = tableName.getName();
    final HTableDescriptor htd = new HTableDescriptor(tableName);
    htd.addFamily(new HColumnDescriptor("f"));
    HRegion r = HRegion.createHRegion(hri, TEST_UTIL.getDefaultRootDirPath(),
      TEST_UTIL.getConfiguration(), htd);
    HRegion.closeHRegion(r);
    final int countPerFamily = 10;
    final MutableBoolean goslow = new MutableBoolean(false);
    // Bypass factory so I can subclass and doctor a method.
    FSHLog wal = new FSHLog(FileSystem.get(conf), TEST_UTIL.getDefaultRootDirPath(),
        testName, conf) {
      @Override
      void atHeadOfRingBufferEventHandlerAppend() {
        if (goslow.isTrue()) {
          Threads.sleep(100);
          LOG.debug("Sleeping before appending 100ms");
        }
        super.atHeadOfRingBufferEventHandlerAppend();
      }
    };
    HRegion region = HRegion.openHRegion(TEST_UTIL.getConfiguration(),
      TEST_UTIL.getTestFileSystem(), TEST_UTIL.getDefaultRootDirPath(), hri, htd, wal);
    EnvironmentEdge ee = EnvironmentEdgeManager.getDelegate();
    try {
      List<Put> puts = null;
      for (HColumnDescriptor hcd: htd.getFamilies()) {
        puts =
          TestWALReplay.addRegionEdits(rowName, hcd.getName(), countPerFamily, ee, region, "x");
      }

      // Now assert edits made it in.
      final Get g = new Get(rowName);
      Result result = region.get(g);
      assertEquals(countPerFamily * htd.getFamilies().size(), result.size());

      // Construct a WALEdit and add it a few times to the WAL.
      WALEdit edits = new WALEdit();
      for (Put p: puts) {
        CellScanner cs = p.cellScanner();
        while (cs.advance()) {
          edits.add(KeyValueUtil.ensureKeyValue(cs.current()));
        }
      }
      // Add any old cluster id.
      List<UUID> clusterIds = new ArrayList<UUID>();
      clusterIds.add(UUID.randomUUID());
      // Now make appends run slow.
      goslow.setValue(true);
      for (int i = 0; i < countPerFamily; i++) {
        wal.appendNoSync(region.getRegionInfo(), tableName, edits,
          clusterIds, System.currentTimeMillis(), htd, region.getSequenceId(), true, -1, -1);
      }
      region.flushcache();
      // FlushResult.flushSequenceId is not visible here so go get the current sequence id.
      long currentSequenceId = region.getSequenceId().get();
      // Now release the appends
      goslow.setValue(false);
      synchronized (goslow) {
        goslow.notifyAll();
      }
      assertTrue(currentSequenceId >= region.getSequenceId().get());
    } finally {
      region.close(true);
      wal.close();
    }
  }

  /**
   * Write to a log file with three concurrent threads and verify all data is written.
   * @throws Exception
@@ -433,153 +433,156 @@ public class TestLogRolling {
    LOG.info("Replication=" +
      fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()));
    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);

    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    final List<Path> paths = new ArrayList<Path>();
    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
    paths.add(((FSHLog) log).computeFilename());
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void preLogRoll(Path oldFile, Path newFile) {
        LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);
        preLogRolledCalled.add(new Integer(1));
      }
      @Override
      public void postLogRoll(Path oldFile, Path newFile) {
        paths.add(newFile);
      }
      @Override
      public void preLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void postLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void logRollRequested() {}
      @Override
      public void logCloseRequested() {}
      @Override
      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
          WALEdit logEdit) {}
      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
          WALEdit logEdit) {}
    });

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
      .isAppendSupported(TEST_UTIL.getConfiguration()));

    writeData(table, 1002);

    table.setAutoFlush(true, true);

    long curTime = System.currentTimeMillis();
    long oldFilenum = log.getFilenum();
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());

    // roll all datanodes in the pipeline
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1002);

    // this write should succeed, but trigger a log roll
    writeData(table, 1003);
    long newFilenum = log.getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);
    validateData(table, 1003);

    writeData(table, 1004);

    // roll all datanode again
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1004);

    // this write should succeed, but trigger a log roll
    writeData(table, 1005);

    // force a log roll to read back and verify previously written logs
    log.rollWriter(true);
    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
      preLogRolledCalled.size() >= 1);

    // read back the data written
    Set<String> loggedRows = new HashSet<String>();
    FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
    for (Path p : paths) {
      LOG.debug("recovering lease for " + p);
      fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);

      LOG.debug("Reading HLog "+FSUtils.getPath(p));
      HLog.Reader reader = null;
      try {
        reader = HLogFactory.createReader(fs, p,
          TEST_UTIL.getConfiguration());
        HLog.Entry entry;
        while ((entry = reader.next()) != null) {
          LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
          for (KeyValue kv : entry.getEdit().getKeyValues()) {
            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
          }
        }
      } catch (EOFException e) {
        LOG.debug("EOF reading file "+FSUtils.getPath(p));
      } finally {
        if (reader != null) reader.close();
      }
    }

    // verify the written rows are there
    assertTrue(loggedRows.contains("row1002"));
    assertTrue(loggedRows.contains("row1003"));
    assertTrue(loggedRows.contains("row1004"));
    assertTrue(loggedRows.contains("row1005"));

    // flush all regions
    List<HRegion> regions =
      new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r: regions) {
      r.flushcache();
    }

    ResultScanner scanner = table.getScanner(new Scan());
    HTable t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    try {
      for (int i=2; i<=5; i++) {
        Result r = scanner.next();
        assertNotNull(r);
        assertFalse(r.isEmpty());
        assertEquals("row100"+i, Bytes.toString(r.getRow()));
    this.server = cluster.getRegionServer(0);
    this.log = server.getWAL();

    // Create the test table and open it
    String tableName = getName();
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));

    admin.createTable(desc);
    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);

    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    final List<Path> paths = new ArrayList<Path>();
    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
    paths.add(((FSHLog) log).computeFilename());
    log.registerWALActionsListener(new WALActionsListener() {
      @Override
      public void preLogRoll(Path oldFile, Path newFile) {
        LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile);
        preLogRolledCalled.add(new Integer(1));
      }
      @Override
      public void postLogRoll(Path oldFile, Path newFile) {
        paths.add(newFile);
      }
      @Override
      public void preLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void postLogArchive(Path oldFile, Path newFile) {}
      @Override
      public void logRollRequested() {}
      @Override
      public void logCloseRequested() {}
      @Override
      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
          WALEdit logEdit) {}
      @Override
      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
          WALEdit logEdit) {}
    });

    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
    // don't run this test without append support (HDFS-200 & HDFS-142)
    assertTrue("Need append support for this test", FSUtils
      .isAppendSupported(TEST_UTIL.getConfiguration()));

    writeData(table, 1002);

    table.setAutoFlush(true, true);

    long curTime = System.currentTimeMillis();
    long oldFilenum = log.getFilenum();
    assertTrue("Log should have a timestamp older than now",
      curTime > oldFilenum && oldFilenum != -1);

    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());

    // roll all datanodes in the pipeline
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1002);

    // this write should succeed, but trigger a log roll
    writeData(table, 1003);
    long newFilenum = log.getFilenum();

    assertTrue("Missing datanode should've triggered a log roll",
      newFilenum > oldFilenum && newFilenum > curTime);
    validateData(table, 1003);

    writeData(table, 1004);

    // roll all datanode again
    dfsCluster.restartDataNodes();
    Thread.sleep(1000);
    dfsCluster.waitActive();
    LOG.info("Data Nodes restarted");
    validateData(table, 1004);

    // this write should succeed, but trigger a log roll
    writeData(table, 1005);

    // force a log roll to read back and verify previously written logs
    log.rollWriter(true);
    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
      preLogRolledCalled.size() >= 1);

    // read back the data written
    Set<String> loggedRows = new HashSet<String>();
    FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
    for (Path p : paths) {
      LOG.debug("recovering lease for " + p);
      fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p,
        TEST_UTIL.getConfiguration(), null);

      LOG.debug("Reading HLog "+FSUtils.getPath(p));
      HLog.Reader reader = null;
      try {
        reader = HLogFactory.createReader(fs, p,
          TEST_UTIL.getConfiguration());
        HLog.Entry entry;
        while ((entry = reader.next()) != null) {
          LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getKeyValues());
          for (KeyValue kv : entry.getEdit().getKeyValues()) {
            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
          }
        }
      } catch (EOFException e) {
        LOG.debug("EOF reading file "+FSUtils.getPath(p));
      } finally {
        if (reader != null) reader.close();
      }
    }

    // verify the written rows are there
    assertTrue(loggedRows.contains("row1002"));
    assertTrue(loggedRows.contains("row1003"));
    assertTrue(loggedRows.contains("row1004"));
    assertTrue(loggedRows.contains("row1005"));

    // flush all regions
    List<HRegion> regions = new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
    for (HRegion r: regions) {
      r.flushcache();
    }

    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (int i=2; i<=5; i++) {
        Result r = scanner.next();
        assertNotNull(r);
        assertFalse(r.isEmpty());
        assertEquals("row100"+i, Bytes.toString(r.getRow()));
      }
    } finally {
      scanner.close();
    }

    // verify that no region servers aborted
    for (JVMClusterUtil.RegionServerThread rsThread:
        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
      assertFalse(rsThread.getRegionServer().isAborted());
    }
    } finally {
      scanner.close();
    }

    // verify that no region servers aborted
    for (JVMClusterUtil.RegionServerThread rsThread:
        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
      assertFalse(rsThread.getRegionServer().isAborted());
      if (t != null) t.close();
    }
  }

@@ -589,57 +592,62 @@ public class TestLogRolling {
   */
  @Test
  public void testCompactionRecordDoesntBlockRolling() throws Exception {
    HTable table = null;
    HTable table2 = null;

    // When the hbase:meta table can be opened, the region servers are running
    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    HTable t = new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
    try {
      String tableName = getName();
      table = createTestTable(tableName);
      String tableName2 = tableName + "1";
      table2 = createTestTable(tableName2);

    String tableName = getName();
    HTable table = createTestTable(tableName);
    String tableName2 = tableName + "1";
    HTable table2 = createTestTable(tableName2);
    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
    this.log = server.getWAL();
    FSHLog fshLog = (FSHLog)log;
    HRegion region = server.getOnlineRegions(table2.getName()).get(0);
    Store s = region.getStore(HConstants.CATALOG_FAMILY);

      server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
      this.log = server.getWAL();
      FSHLog fshLog = (FSHLog)log;
      HRegion region = server.getOnlineRegions(table2.getName()).get(0);
      Store s = region.getStore(HConstants.CATALOG_FAMILY);
      //have to flush namespace to ensure it doesn't affect WAL tests
      admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());

    //have to flush namespace to ensure it doesn't affect WAL tests
    admin.flush(TableName.NAMESPACE_TABLE_NAME.getName());
    // Put some stuff into table2, to make sure we have some files to compact.
    for (int i = 1; i <= 2; ++i) {
      doPut(table2, i);
      admin.flush(table2.getTableName());
    }
    doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumRolledLogFiles());
    assertEquals(2, s.getStorefilesCount());

      // Put some stuff into table2, to make sure we have some files to compact.
      for (int i = 1; i <= 2; ++i) {
        doPut(table2, i);
      // Roll the log and compact table2, to have compaction record in the 2nd WAL.
      fshLog.rollWriter();
      assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
      admin.flush(table2.getTableName());
      region.compactStores();
      // Wait for compaction in case flush triggered it before us.
      Assert.assertNotNull(s);
      for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
        Threads.sleepWithoutInterrupt(200);
      }
      assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());

      // Write some value to the table so the WAL cannot be deleted until table is flushed.
      doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
      fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
      assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());

      // Flush table to make latest WAL obsolete; write another record, and roll again.
      admin.flush(table.getTableName());
      doPut(table, 1);
      fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
      assertEquals("Should have 1 WALs at the end", 1, fshLog.getNumRolledLogFiles());
    } finally {
      if (t != null) t.close();
      if (table != null) table.close();
      if (table2 != null) table2.close();
    }
    doPut(table2, 3); // don't flush yet, or compaction might trigger before we roll WAL
    assertEquals("Should have no WAL after initial writes", 0, fshLog.getNumRolledLogFiles());
    assertEquals(2, s.getStorefilesCount());

    // Roll the log and compact table2, to have compaction record in the 2nd WAL.
    fshLog.rollWriter();
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());
    admin.flush(table2.getTableName());
    region.compactStores();
    // Wait for compaction in case flush triggered it before us.
    Assert.assertNotNull(s);
    for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) {
      Threads.sleepWithoutInterrupt(200);
    }
    assertEquals("Compaction didn't happen", 1, s.getStorefilesCount());

    // Write some value to the table so the WAL cannot be deleted until table is flushed.
    doPut(table, 0); // Now 2nd WAL will have compaction record for table2 and put for table.
    fshLog.rollWriter(); // 1st WAL deleted, 2nd not deleted yet.
    assertEquals("Should have WAL; one table is not flushed", 1, fshLog.getNumRolledLogFiles());

    // Flush table to make latest WAL obsolete; write another record, and roll again.
    admin.flush(table.getTableName());
    doPut(table, 1);
    fshLog.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
    assertEquals("Should have 1 WALs at the end", 1, fshLog.getNumRolledLogFiles());

    table.close();
    table2.close();
  }

  private void doPut(HTable table, int i) throws IOException {
@@ -969,16 +969,19 @@ public class TestWALReplay {
    }
  }

  private void addRegionEdits (final byte [] rowName, final byte [] family,
  static List<Put> addRegionEdits (final byte [] rowName, final byte [] family,
      final int count, EnvironmentEdge ee, final HRegion r,
      final String qualifierPrefix)
  throws IOException {
    List<Put> puts = new ArrayList<Put>();
    for (int j = 0; j < count; j++) {
      byte[] qualifier = Bytes.toBytes(qualifierPrefix + Integer.toString(j));
      Put p = new Put(rowName);
      p.add(family, qualifier, ee.currentTimeMillis(), rowName);
      r.put(p);
      puts.add(p);
    }
    return puts;
  }

  /*
@@ -1031,6 +1034,4 @@ public class TestWALReplay {
    htd.addFamily(c);
    return htd;
  }

}

}