HBASE-7329 remove flush-related records from WAL and make locking more granular (Sergey)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1436632 13f79535-47bb-0310-9956-ffa450edef68
parent e5ef0c2a04
commit 644cf0f636
@@ -0,0 +1,131 @@
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.util;

import java.util.concurrent.atomic.AtomicLong;

/**
 * A simple barrier that can be used by classes that need to wait for some operations to
 * finish before stopping/closing/etc. forever.
 */
public class DrainBarrier {
/**
 * Contains the number of outstanding operations, as well as flags.
 * Initially, the number of operations is 1. Each beginOp increments, and endOp decrements it.
 * beginOp does not proceed when it sees the draining flag. When stop is called, it atomically
 * decrements the number of operations (the initial 1) and sets the draining flag. If stop did
 * the decrement to zero, that means there are no more operations outstanding, so stop is done.
 * Otherwise, stop blocks, and the endOp that decrements the count to 0 unblocks it.
 */
private final AtomicLong valueAndFlags = new AtomicLong(inc(0));
private final static long DRAINING_FLAG = 0x1;
private final static int FLAG_BIT_COUNT = 1;

/**
 * Tries to start an operation.
 * @return false iff the stop is in progress, and the operation cannot be started.
 */
public boolean beginOp() {
long oldValAndFlags;
do {
oldValAndFlags = valueAndFlags.get();
if (isDraining(oldValAndFlags)) return false;
} while (!valueAndFlags.compareAndSet(oldValAndFlags, inc(oldValAndFlags)));
return true;
}

/**
 * Ends the operation. Unblocks the blocked caller of stop, if necessary.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="First, we do change the state before notify, 2nd, it doesn't even matter")
public void endOp() {
long oldValAndFlags;
do {
oldValAndFlags = valueAndFlags.get();
long unacceptableCount = isDraining(oldValAndFlags) ? 0 : 1;
if (getValue(oldValAndFlags) == unacceptableCount) {
throw new AssertionError("endOp called without corresponding beginOp call ("
+ "the current count is " + unacceptableCount + ")");
}
} while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags)));
if (getValue(oldValAndFlags) == 1) {
synchronized (this) { this.notifyAll(); }
}
}

/**
 * Blocks new operations from starting, waits for the current ones to drain.
 * If someone already called it, returns immediately, which is currently unavoidable as
 * most of the users stop and close things right and left, and hope for the best.
 * stopAndWaitForOpsOnce asserts instead.
 * @throws InterruptedException the wait for operations has been interrupted.
 */
public void stopAndDrainOps() throws InterruptedException {
stopAndDrainOps(true);
}

/**
 * Blocks new operations from starting, waits for the current ones to drain.
 * Can only be called once.
 * @throws InterruptedException the wait for operations has been interrupted.
 */
public void stopAndDrainOpsOnce() throws InterruptedException {
stopAndDrainOps(false);
}

/**
 * @param ignoreRepeatedCalls If this is true and somebody already called stop, this method
 *                            will return immediately if true; if this is false and somebody
 *                            already called stop, it will assert.
 */
// Justification for warnings - wait is not unconditional, and contrary to what WA_NOT_IN_LOOP
// description says we are not waiting on multiple conditions.
@edu.umd.cs.findbugs.annotations.SuppressWarnings({"UW_UNCOND_WAIT", "WA_NOT_IN_LOOP"})
private void stopAndDrainOps(boolean ignoreRepeatedCalls) throws InterruptedException {
long oldValAndFlags;
do {
oldValAndFlags = valueAndFlags.get();
if (isDraining(oldValAndFlags)) {
if (ignoreRepeatedCalls) return;
throw new AssertionError("stopAndWaitForOpsOnce called more than once");
}
} while (!valueAndFlags.compareAndSet(oldValAndFlags, dec(oldValAndFlags) | DRAINING_FLAG));
if (getValue(oldValAndFlags) == 1) return; // There were no operations outstanding.
synchronized (this) { this.wait(); }
}

// Helper methods.
private static final boolean isDraining(long valueAndFlags) {
return (valueAndFlags & DRAINING_FLAG) == DRAINING_FLAG;
}

private static final long getValue(long valueAndFlags) {
return valueAndFlags >> FLAG_BIT_COUNT;
}

private static final long inc(long valueAndFlags) {
return valueAndFlags + (1 << FLAG_BIT_COUNT); // Not checking for overflow.
}

private static final long dec(long valueAndFlags) {
return valueAndFlags - (1 << FLAG_BIT_COUNT); // Negative overflow checked outside.
}
}
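Usage sketch, not part of this patch: how a caller such as FSHLog is expected to use the barrier above, following its javadoc. The wrapper class and method names are illustrative only; the DrainBarrier calls are the ones defined in the new file.

// Illustrative usage of DrainBarrier; not part of this commit.
// The wrapper class and method names are hypothetical.
package org.apache.hadoop.hbase.util;

import java.io.IOException;

public class DrainBarrierUsageSketch {
  private final DrainBarrier closeBarrier = new DrainBarrier();

  /** Some operation that must not start once close() is draining. */
  public boolean tryOperation() throws IOException {
    if (!closeBarrier.beginOp()) {
      return false; // close() already started; refuse new work
    }
    try {
      // ... do the real work here ...
      return true;
    } finally {
      closeBarrier.endOp(); // always pairs with a successful beginOp()
    }
  }

  /** Blocks new operations and waits for in-flight ones to drain, then releases resources. */
  public void close() throws IOException {
    try {
      closeBarrier.stopAndDrainOps(); // repeated calls return immediately
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while draining operations", e);
    }
    // ... safe to tear down shared state here ...
  }
}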
@@ -0,0 +1,119 @@
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import org.apache.hadoop.hbase.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.*;

@Category(SmallTests.class)
public class TestDrainBarrier {

@Test
public void testBeginEndStopWork() throws Exception {
DrainBarrier barrier = new DrainBarrier();
assertTrue(barrier.beginOp());
assertTrue(barrier.beginOp());
barrier.endOp();
barrier.endOp();
barrier.stopAndDrainOps();
assertFalse(barrier.beginOp());
}

@Test
public void testUnmatchedEndAssert() throws Exception {
DrainBarrier barrier = new DrainBarrier();
try {
barrier.endOp();
fail("Should have asserted");
} catch (AssertionError e) {
}

barrier.beginOp();
barrier.beginOp();
barrier.endOp();
barrier.endOp();
try {
barrier.endOp();
fail("Should have asserted");
} catch (AssertionError e) {
}
}

@Test
public void testStopWithoutOpsDoesntBlock() throws Exception {
DrainBarrier barrier = new DrainBarrier();
barrier.stopAndDrainOpsOnce();

barrier = new DrainBarrier();
barrier.beginOp();
barrier.endOp();
barrier.stopAndDrainOpsOnce();
}

@Test
/** This test tests blocking and can have false positives in very bad timing cases. */
public void testStopIsBlockedByOps() throws Exception {
final DrainBarrier barrier = new DrainBarrier();
barrier.beginOp();
barrier.beginOp();
barrier.beginOp();
barrier.endOp();

Thread stoppingThread = new Thread(new Runnable() {
@Override
public void run() {
try {
barrier.stopAndDrainOpsOnce();
} catch (InterruptedException e) {
fail("Should not have happened");
}
}
});
stoppingThread.start();

// First "end" should not unblock the thread, but the second should.
barrier.endOp();
stoppingThread.join(1000);
assertTrue(stoppingThread.isAlive());
barrier.endOp();
stoppingThread.join(30000); // When not broken, will be a very fast wait; set safe value.
assertFalse(stoppingThread.isAlive());
}

@Test
public void testMultipleStopOnceAssert() throws Exception {
DrainBarrier barrier = new DrainBarrier();
barrier.stopAndDrainOpsOnce();
try {
barrier.stopAndDrainOpsOnce();
fail("Should have asserted");
} catch (AssertionError e) {
}
}

@Test
public void testMultipleSloppyStopsHaveNoEffect() throws Exception {
DrainBarrier barrier = new DrainBarrier();
barrier.stopAndDrainOps();
barrier.stopAndDrainOps();
}
}
@@ -126,7 +126,7 @@ public class WALPlayer extends Configured implements Tool {
Delete del = null;
KeyValue lastKV = null;
for (KeyValue kv : value.getKeyValues()) {
// filtering HLog meta entries, see HLog.completeCacheFlushLogEdit
// filtering HLog meta entries
if (HLogUtil.isMetaFamily(kv.getFamily())) continue;

// A WALEdit may contain multiple operations (HBASE-3584) and/or
@@ -1553,17 +1553,26 @@ public class HRegion implements HeapSize { // , Writable{
long flushsize = this.memstoreSize.get();
status.setStatus("Preparing to flush by snapshotting stores");
List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());
long completeSeqId = -1L;
long flushSeqId = -1L;
try {
// Record the mvcc for all transactions in progress.
w = mvcc.beginMemstoreInsert();
mvcc.advanceMemstore(w);

sequenceId = (wal == null)? myseqid:
wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
completeSeqId = this.getCompleteCacheFlushSequenceId(sequenceId);
if (wal != null) {
Long startSeqId = wal.startCacheFlush(this.regionInfo.getEncodedNameAsBytes());
if (startSeqId == null) {
status.setStatus("Flush will not be started for [" + this.regionInfo.getEncodedName()
+ "] - WAL is going away");
return false;
}
flushSeqId = startSeqId.longValue();
} else {
flushSeqId = myseqid;
}

for (Store s : stores.values()) {
storeFlushers.add(s.getStoreFlusher(completeSeqId));
storeFlushers.add(s.getStoreFlusher(flushSeqId));
}

// prepare flush (take a snapshot)

@@ -1632,22 +1641,14 @@ public class HRegion implements HeapSize { // , Writable{
throw dse;
}

// If we get to here, the HStores have been written. If we get an
// error in completeCacheFlush it will release the lock it is holding

// B. Write a FLUSHCACHE-COMPLETE message to the log.
// This tells future readers that the HStores were emitted correctly,
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
// If we get to here, the HStores have been written.
if (wal != null) {
wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes(),
regionInfo.getTableName(), completeSeqId,
this.getRegionInfo().isMetaRegion());
wal.completeCacheFlush(this.regionInfo.getEncodedNameAsBytes());
}

// Update the last flushed sequence id for region
if (this.rsServices != null) {
completeSequenceId = completeSeqId;
completeSequenceId = flushSeqId;
}

// C. Finally notify anyone waiting on memstore to clear:

@@ -1672,18 +1673,6 @@ public class HRegion implements HeapSize { // , Writable{
return compactionRequested;
}

/**
 * Get the sequence number to be associated with this cache flush. Used by
 * TransactionalRegion to not complete pending transactions.
 *
 *
 * @param currentSequenceId
 * @return sequence id to complete the cache flush with
 */
protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
return currentSequenceId;
}

//////////////////////////////////////////////////////////////////////////////
// get() methods for client use.
//////////////////////////////////////////////////////////////////////////////
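Condensed sketch of the flush protocol this hunk switches HRegion to; it is not part of the patch. writeStoreFiles() and the wrapper class are hypothetical stand-ins for the snapshot/flush work elided here; the WAL calls are the ones introduced by this commit.

// Illustrative only; not part of this commit. Shows the startCacheFlush /
// completeCacheFlush / abortCacheFlush sequence against the new HLog interface.
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.HLog;

class FlushProtocolSketch {
  private final HLog wal;
  private final HRegionInfo regionInfo;

  FlushProtocolSketch(HLog wal, HRegionInfo regionInfo) {
    this.wal = wal;
    this.regionInfo = regionInfo;
  }

  /** Returns false if the flush cannot be started because the WAL is closing. */
  boolean flush() throws IOException {
    Long flushSeqId = wal.startCacheFlush(regionInfo.getEncodedNameAsBytes());
    if (flushSeqId == null) {
      return false; // WAL is going away; do not start the flush at all
    }
    boolean succeeded = false;
    try {
      writeStoreFiles(flushSeqId.longValue()); // hypothetical: snapshot memstores, write files
      succeeded = true;
    } finally {
      if (succeeded) {
        // No FLUSHCACHE-COMPLETE edit is appended to the WAL anymore; this only
        // clears the region's entry from the flushing-seqnums bookkeeping.
        wal.completeCacheFlush(regionInfo.getEncodedNameAsBytes());
      } else {
        wal.abortCacheFlush(regionInfo.getEncodedNameAsBytes()); // restores the stashed seqNum
      }
    }
    return true;
  }

  private void writeStoreFiles(long flushSeqId) throws IOException {
    // Elided: flush each Store using flushSeqId as the file's upper-bound sequence id.
  }
}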
@@ -24,8 +24,10 @@ import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;

@@ -132,23 +135,45 @@ class FSHLog implements HLog, Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};

/*
/** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */
private DrainBarrier closeBarrier = new DrainBarrier();

/**
 * Current log file.
 */
Writer writer;

/*
/**
 * Map of all log files but the current one.
 */
final SortedMap<Long, Path> outputfiles =
Collections.synchronizedSortedMap(new TreeMap<Long, Path>());

/*
 * Map of encoded region names to their most recent sequence/edit id in their
 * memstore.

/**
 * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
 * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
 * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
 * force flush them, so we don't care about these numbers messing with anything. */
private final Object oldestSeqNumsLock = new Object();

/**
 * This lock makes sure only one log roll runs at the same time. Should not be taken while
 * any other lock is held. We don't just use synchronized because that results in bogus and
 * tedious findbugs warning when it thinks synchronized controls writer thread safety */
private final Object rollWriterLock = new Object();

/**
 * Map of encoded region names to their most recent sequence/edit id in their memstore.
 */
private final ConcurrentSkipListMap<byte [], Long> lastSeqWritten =
private final ConcurrentSkipListMap<byte [], Long> oldestUnflushedSeqNums =
new ConcurrentSkipListMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
/**
 * Map of encoded region names to their most recent sequence/edit id in their memstore;
 * contains the regions that are currently flushing. That way we can store two numbers for
 * flushing and non-flushing (oldestUnflushedSeqNums) memstore for the same region.
 */
private final Map<byte[], Long> oldestFlushingSeqNums = new HashMap<byte[], Long>();

private volatile boolean closed = false;
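Not part of the patch: a condensed illustration of how the two maps above are meant to be read together; it mirrors the min-over-both-maps computation that cleanOldLogs() performs under oldestSeqNumsLock later in this diff.

// Illustrative only; mirrors the low-bound computation used by cleanOldLogs().
import java.util.Collections;
import java.util.Map;

final class OldestSeqNumSketch {
  /**
   * The oldest edit still held only in some memstore (flushing or not) bounds cleanup:
   * any log file whose highest sequence id is below this value can be archived.
   */
  static long oldestOutstandingSeqNum(Map<byte[], Long> oldestFlushingSeqNums,
      Map<byte[], Long> oldestUnflushedSeqNums) {
    long oldestFlushing = oldestFlushingSeqNums.isEmpty()
        ? Long.MAX_VALUE : Collections.min(oldestFlushingSeqNums.values());
    long oldestUnflushed = oldestUnflushedSeqNums.isEmpty()
        ? Long.MAX_VALUE : Collections.min(oldestUnflushedSeqNums.values());
    return Math.min(oldestFlushing, oldestUnflushed);
  }
}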
@@ -178,10 +203,6 @@ class FSHLog implements HLog, Syncable {
// of the default Hdfs block size.
private final long logrollsize;

// This lock prevents starting a log roll during a cache flush.
// synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock();

// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
// locked during appends
@@ -472,88 +493,77 @@ class FSHLog implements HLog, Syncable {
@Override
public byte [][] rollWriter(boolean force)
throws FailedLogCloseException, IOException {
// Return if nothing to flush.
if (!force && this.writer != null && this.numEntries.get() <= 0) {
return null;
}
byte [][] regionsToFlush = null;
this.cacheFlushLock.lock();
try {
this.logRollRunning = true;
if (closed) {
LOG.debug("HLog closed. Skipping rolling of writer");
return regionsToFlush;
synchronized (rollWriterLock) {
// Return if nothing to flush.
if (!force && this.writer != null && this.numEntries.get() <= 0) {
return null;
}
// Do all the preparation outside of the updateLock to block
// as less as possible the incoming writes
long currentFilenum = this.filenum;
Path oldPath = null;
if (currentFilenum > 0) {
//computeFilename will take care of meta hlog filename
oldPath = computeFilename(currentFilenum);
}
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();

// Tell our listeners that a new log is about to be created
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.preLogRoll(oldPath, newPath);
}
}
FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
FSDataOutputStream nextHdfsOut = null;
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}

synchronized (updateLock) {
// Clean up current writer.
Path oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter;
this.hdfs_out = nextHdfsOut;

LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
", filesize=" +
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
" for " + FSUtils.getPath(newPath));
this.numEntries.set(0);
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.postLogRoll(oldPath, newPath);
}
}

// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.isEmpty()) {
LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
// If so, then no new writes have come in since all regions were
// flushed (and removed from the lastSeqWritten map). Means can
// remove all but currently open log file.
for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
archiveLogFile(e.getValue(), e.getKey());
}
this.outputfiles.clear();
} else {
regionsToFlush = cleanOldLogs();
}
}
} finally {
byte [][] regionsToFlush = null;
try {
this.logRollRunning = false;
this.logRollRunning = true;
boolean isClosed = closed;
if (isClosed || !closeBarrier.beginOp()) {
LOG.debug("HLog " + (isClosed ? "closed" : "closing") + ". Skipping rolling of writer");
return regionsToFlush;
}
// Do all the preparation outside of the updateLock to block
// as less as possible the incoming writes
long currentFilenum = this.filenum;
Path oldPath = null;
if (currentFilenum > 0) {
//computeFilename will take care of meta hlog filename
oldPath = computeFilename(currentFilenum);
}
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();

// Tell our listeners that a new log is about to be created
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.preLogRoll(oldPath, newPath);
}
}
FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
FSDataOutputStream nextHdfsOut = null;
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}

Path oldFile = null;
int oldNumEntries = 0;
synchronized (updateLock) {
// Clean up current writer.
oldNumEntries = this.numEntries.get();
oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter;
this.hdfs_out = nextHdfsOut;
this.numEntries.set(0);
}
LOG.info("Rolled log" + (oldFile != null ? " for file=" + FSUtils.getPath(oldFile)
+ ", entries=" + oldNumEntries + ", filesize=" + this.fs.getFileStatus(oldFile).getLen()
: "" ) + "; new path=" + FSUtils.getPath(newPath));

// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.postLogRoll(oldPath, newPath);
}
}

// Can we delete any of the old log files?
if (getNumLogFiles() > 0) {
cleanOldLogs();
regionsToFlush = getRegionsToForceFlush();
}
} finally {
this.cacheFlushLock.unlock();
this.logRollRunning = false;
closeBarrier.endOp();
}
return regionsToFlush;
}
return regionsToFlush;
}

/**
@@ -581,36 +591,64 @@ class FSHLog implements HLog, Syncable {
 * encoded region names to flush.
 * @throws IOException
 */
private byte [][] cleanOldLogs() throws IOException {
Long oldestOutstandingSeqNum = getOldestOutstandingSeqNum();
private void cleanOldLogs() throws IOException {
long oldestOutstandingSeqNum = Long.MAX_VALUE;
synchronized (oldestSeqNumsLock) {
Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
}

// Get the set of all log files whose last sequence number is smaller than
// the oldest edit's sequence number.
TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
oldestOutstandingSeqNum).keySet());
// Now remove old log files (if any)
int logsToRemove = sequenceNumbers.size();
if (logsToRemove > 0) {
if (LOG.isDebugEnabled()) {
// Find associated region; helps debugging.
byte [] oldestRegion = getOldestRegion(oldestOutstandingSeqNum);
LOG.debug("Found " + logsToRemove + " hlogs to remove" +
if (LOG.isDebugEnabled()) {
if (sequenceNumbers.size() > 0) {
LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
" out of total " + this.outputfiles.size() + ";" +
" oldest outstanding sequenceid is " + oldestOutstandingSeqNum +
" from region " + Bytes.toStringBinary(oldestRegion));
}
for (Long seq : sequenceNumbers) {
archiveLogFile(this.outputfiles.remove(seq), seq);
" oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
}
}
for (Long seq : sequenceNumbers) {
archiveLogFile(this.outputfiles.remove(seq), seq);
}
}

/**
 * Return regions that have edits that are equal or less than a certain sequence number.
 * Static due to some old unit test.
 * @param walSeqNum The sequence number to compare with.
 * @param regionsToSeqNums Encoded region names to sequence ids
 * @return All regions whose seqNum <= walSeqNum. Null if no regions found.
 */
static byte[][] findMemstoresWithEditsEqualOrOlderThan(
final long walSeqNum, final Map<byte[], Long> regionsToSeqNums) {
List<byte[]> regions = null;
for (Map.Entry<byte[], Long> e : regionsToSeqNums.entrySet()) {
if (e.getValue().longValue() <= walSeqNum) {
if (regions == null) regions = new ArrayList<byte[]>();
regions.add(e.getKey());
}
}
return regions == null ? null : regions
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}

private byte[][] getRegionsToForceFlush() throws IOException {
// If too many log files, figure which regions we need to flush.
// Array is an array of encoded region names.
byte [][] regions = null;
int logCount = this.outputfiles.size();
int logCount = getNumLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
// This is an array of encoded region names.
regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
this.lastSeqWritten);
synchronized (oldestSeqNumsLock) {
regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
this.oldestUnflushedSeqNums);
}
if (regions != null) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < regions.length; i++) {
|
|||
return regions;
|
||||
}
|
||||
|
||||
/*
|
||||
* @return Logs older than this id are safe to remove.
|
||||
*/
|
||||
private Long getOldestOutstandingSeqNum() {
|
||||
return Collections.min(this.lastSeqWritten.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* @param oldestOutstandingSeqNum
|
||||
* @return (Encoded) name of oldest outstanding region.
|
||||
*/
|
||||
private byte [] getOldestRegion(final Long oldestOutstandingSeqNum) {
|
||||
byte [] oldestRegion = null;
|
||||
for (Map.Entry<byte [], Long> e: this.lastSeqWritten.entrySet()) {
|
||||
if (e.getValue().longValue() == oldestOutstandingSeqNum.longValue()) {
|
||||
// Key is encoded region name.
|
||||
oldestRegion = e.getKey();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return oldestRegion;
|
||||
}
|
||||
|
||||
/*
|
||||
* Cleans up current writer closing and adding to outputfiles.
|
||||
* Presumes we're operating inside an updateLock scope.
|
||||
|
@@ -780,33 +795,39 @@ class FSHLog implements HLog, Syncable {

@Override
public void close() throws IOException {
if (this.closed) {
return;
}
try {
logSyncerThread.close();
// Make sure we synced everything
logSyncerThread.join(this.optionalFlushInterval*2);
} catch (InterruptedException e) {
LOG.error("Exception while waiting for syncer thread to die", e);
Thread.currentThread().interrupt();
}
try {
// Prevent all further flushing and rolling.
closeBarrier.stopAndDrainOps();
} catch (InterruptedException e) {
LOG.error("Exception while waiting for cache flushes and log rolls", e);
Thread.currentThread().interrupt();
}

cacheFlushLock.lock();
try {
// Tell our listeners that the log is closing
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.logCloseRequested();
}
// Tell our listeners that the log is closing
if (!this.listeners.isEmpty()) {
for (WALActionsListener i : this.listeners) {
i.logCloseRequested();
}
synchronized (updateLock) {
this.closed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("closing hlog writer in " + this.dir.toString());
}
if (this.writer != null) {
this.writer.close();
}
}
synchronized (updateLock) {
this.closed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("closing hlog writer in " + this.dir.toString());
}
if (this.writer != null) {
this.writer.close();
}
} finally {
cacheFlushLock.unlock();
}
}
@@ -838,7 +859,7 @@ class FSHLog implements HLog, Syncable {
// memstore). When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
this.oldestUnflushedSeqNums.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
Long.valueOf(seqNum));
doWrite(regionInfo, logKey, logEdit, htd);
txid = this.unflushedEntries.incrementAndGet();

@@ -910,7 +931,7 @@ class FSHLog implements HLog, Syncable {
// Use encoded name. Its shorter, guaranteed unique and a subset of
// actual name.
byte [] encodedRegionName = info.getEncodedNameAsBytes();
this.lastSeqWritten.putIfAbsent(encodedRegionName, seqNum);
this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
doWrite(info, logKey, edits, htd);
this.numEntries.incrementAndGet();
@@ -1042,7 +1063,11 @@ class FSHLog implements HLog, Syncable {
Writer tempWriter;
synchronized (this.updateLock) {
if (this.closed) return;
tempWriter = this.writer; // guaranteed non-null
// Guaranteed non-null.
// Note that parallel sync can close tempWriter.
// The current method of dealing with this is to catch exceptions.
// See HBASE-4387, HBASE-5623, HBASE-7329.
tempWriter = this.writer;
}
// if the transaction that we are interested in is already
// synced, then return immediately.

@@ -1078,9 +1103,11 @@ class FSHLog implements HLog, Syncable {
}
try {
tempWriter.sync();
} catch(IOException io) {
} catch(IOException ex) {
synchronized (this.updateLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
// TODO: we don't actually need to do it for concurrent close - what is the point
// of syncing new unrelated writer? Keep behavior for now.
tempWriter = this.writer;
tempWriter.sync();
}

@@ -1088,6 +1115,9 @@ class FSHLog implements HLog, Syncable {
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);

this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
// TODO: preserving the old behavior for now, but this check is strange. It's not
// protected by any locks here, so for all we know rolling locks might start
// as soon as we enter the "if". Is this best-effort optimization check?
if (!this.logRollRunning) {
checkLowReplication();
try {
@@ -1250,107 +1280,61 @@ class FSHLog implements HLog, Syncable {
return outputfiles.size();
}

private byte[] getSnapshotName(byte[] encodedRegionName) {
byte snp[] = new byte[encodedRegionName.length + 3];
// an encoded region name has only hex digits. s, n or p are not hex
// and therefore snapshot-names will never collide with
// encoded-region-names
snp[0] = 's'; snp[1] = 'n'; snp[2] = 'p';
for (int i = 0; i < encodedRegionName.length; i++) {
snp[i+3] = encodedRegionName[i];
}
return snp;
}

@Override
public long startCacheFlush(final byte[] encodedRegionName) {
this.cacheFlushLock.lock();
Long seq = this.lastSeqWritten.remove(encodedRegionName);
// seq is the lsn of the oldest edit associated with this region. If a
// snapshot already exists - because the last flush failed - then seq will
// be the lsn of the oldest edit in the snapshot
if (seq != null) {
// keeping the earliest sequence number of the snapshot in
// lastSeqWritten maintains the correctness of
// getOldestOutstandingSeqNum(). But it doesn't matter really because
// everything is being done inside of cacheFlush lock.
Long oldseq =
lastSeqWritten.put(getSnapshotName(encodedRegionName), seq);
if (oldseq != null) {
LOG.error("Logic Error Snapshot seq id from earlier flush still" +
" present! for region " + Bytes.toString(encodedRegionName) +
" overwritten oldseq=" + oldseq + "with new seq=" + seq);
Runtime.getRuntime().halt(1);
public Long startCacheFlush(final byte[] encodedRegionName) {
Long oldRegionSeqNum = null;
if (!closeBarrier.beginOp()) {
return null;
}
synchronized (oldestSeqNumsLock) {
oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
if (oldRegionSeqNum != null) {
Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
assert oldValue == null : "Flushing map not cleaned up for "
+ Bytes.toString(encodedRegionName);
}
}
if (oldRegionSeqNum == null) {
// TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
// the region is already flushing (which would make this call invalid), or there
// were no appends after last flush, so why are we starting flush? Maybe we should
// assert not null, and switch to "long" everywhere. Less rigorous, but safer,
// alternative is telling the caller to stop. For now preserve old logic.
LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
+ Bytes.toString(encodedRegionName) + "]");
}
return obtainSeqNum();
}

@Override
public void completeCacheFlush(final byte [] encodedRegionName,
final byte [] tableName, final long logSeqId, final boolean isMetaRegion)
throws IOException {
try {
if (this.closed) {
return;
}
long txid = 0;
synchronized (updateLock) {
long now = EnvironmentEdgeManager.currentTimeMillis();
WALEdit edit = completeCacheFlushLogEdit();
HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
logSyncerThread.append(new Entry(key, edit));
txid = this.unflushedEntries.incrementAndGet();
long took = EnvironmentEdgeManager.currentTimeMillis() - now;
long len = 0;
for (KeyValue kv : edit.getKeyValues()) {
len += kv.getLength();
}
this.metrics.finishAppend(took, len);
this.numEntries.incrementAndGet();
}
// sync txn to file system
this.sync(txid);

} finally {
// updateLock not needed for removing snapshot's entry
// Cleaning up of lastSeqWritten is in the finally clause because we
// don't want to confuse getOldestOutstandingSeqNum()
this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
this.cacheFlushLock.unlock();
public void completeCacheFlush(final byte [] encodedRegionName)
{
synchronized (oldestSeqNumsLock) {
this.oldestFlushingSeqNums.remove(encodedRegionName);
}
}

private WALEdit completeCacheFlushLogEdit() {
KeyValue kv = new KeyValue(HLog.METAROW, HLog.METAFAMILY, null,
System.currentTimeMillis(), HLogUtil.COMPLETE_CACHE_FLUSH);
WALEdit e = new WALEdit();
e.add(kv);
return e;
closeBarrier.endOp();
}

@Override
public void abortCacheFlush(byte[] encodedRegionName) {
Long snapshot_seq =
this.lastSeqWritten.remove(getSnapshotName(encodedRegionName));
if (snapshot_seq != null) {
// updateLock not necessary because we are racing against
// lastSeqWritten.putIfAbsent() in append() and we will always win
// before releasing cacheFlushLock make sure that the region's entry in
// lastSeqWritten points to the earliest edit in the region
Long current_memstore_earliest_seq =
this.lastSeqWritten.put(encodedRegionName, snapshot_seq);
if (current_memstore_earliest_seq != null &&
(current_memstore_earliest_seq.longValue() <=
snapshot_seq.longValue())) {
LOG.error("Logic Error region " + Bytes.toString(encodedRegionName) +
"acquired edits out of order current memstore seq=" +
current_memstore_earliest_seq + " snapshot seq=" + snapshot_seq);
Runtime.getRuntime().halt(1);
Long currentSeqNum = null, seqNumBeforeFlushStarts = null;
synchronized (oldestSeqNumsLock) {
seqNumBeforeFlushStarts = this.oldestFlushingSeqNums.remove(encodedRegionName);
if (seqNumBeforeFlushStarts != null) {
currentSeqNum =
this.oldestUnflushedSeqNums.put(encodedRegionName, seqNumBeforeFlushStarts);
}
}
this.cacheFlushLock.unlock();
closeBarrier.endOp();
if ((currentSeqNum != null)
&& (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) {
String errorStr = "Region " + Bytes.toString(encodedRegionName) +
"acquired edits out of order current memstore seq=" + currentSeqNum
+ ", previous oldest unflushed id=" + seqNumBeforeFlushStarts;
LOG.error(errorStr);
assert false : errorStr;
Runtime.getRuntime().halt(1);
}
}

@Override
@@ -162,14 +162,14 @@ public interface HLog {
}
}

/*
/**
 * registers WALActionsListener
 *
 * @param listener
 */
public void registerWALActionsListener(final WALActionsListener listener);

/*
/**
 * unregisters WALActionsListener
 *
 * @param listener

@@ -200,18 +200,10 @@ public interface HLog {
/**
 * Roll the log writer. That is, start writing log messages to a new file.
 *
 * Because a log cannot be rolled during a cache flush, and a cache flush
 * spans two method calls, a special lock needs to be obtained so that a cache
 * flush cannot start when the log is being rolled and the log cannot be
 * rolled during a cache flush.
 *
 * <p>
 * Note that this method cannot be synchronized because it is possible that
 * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
 * start which would obtain the lock on this but block on obtaining the
 * cacheFlushLock and then completeCacheFlush could be called which would wait
 * for the lock on this and consequently never release the cacheFlushLock
 *
 * The implementation is synchronized in order to make sure there's one rollWriter
 * running at any given time.
 *
 * @return If lots of logs, flush the returned regions so next time through we
 *         can clean logs. Returns null if nothing to flush. Names are actual
 *         region names as returned by {@link HRegionInfo#getEncodedName()}

@@ -223,17 +215,9 @@ public interface HLog {
/**
 * Roll the log writer. That is, start writing log messages to a new file.
 *
 * Because a log cannot be rolled during a cache flush, and a cache flush
 * spans two method calls, a special lock needs to be obtained so that a cache
 * flush cannot start when the log is being rolled and the log cannot be
 * rolled during a cache flush.
 *
 * <p>
 * Note that this method cannot be synchronized because it is possible that
 * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
 * start which would obtain the lock on this but block on obtaining the
 * cacheFlushLock and then completeCacheFlush could be called which would wait
 * for the lock on this and consequently never release the cacheFlushLock
 * The implementation is synchronized in order to make sure there's one rollWriter
 * running at any given time.
 *
 * @param force
 *          If true, force creation of a new writer even if no entries have
@@ -337,53 +321,33 @@ public interface HLog {
public long obtainSeqNum();

/**
 * By acquiring a log sequence ID, we can allow log messages to continue while
 * we flush the cache.
 *
 * Acquire a lock so that we do not roll the log between the start and
 * completion of a cache-flush. Otherwise the log-seq-id for the flush will
 * not appear in the correct logfile.
 *
 * Ensuring that flushes and log-rolls don't happen concurrently also allows
 * us to temporarily put a log-seq-number in lastSeqWritten against the region
 * being flushed that might not be the earliest in-memory log-seq-number for
 * that region. By the time the flush is completed or aborted and before the
 * cacheFlushLock is released it is ensured that lastSeqWritten again has the
 * oldest in-memory edit's lsn for the region that was being flushed.
 *
 * In this method, by removing the entry in lastSeqWritten for the region
 * being flushed we ensure that the next edit inserted in this region will be
 * correctly recorded in
 * {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)} The
 * lsn of the earliest in-memory lsn - which is now in the memstore snapshot -
 * is saved temporarily in the lastSeqWritten map while the flush is active.
 *
 * @return sequence ID to pass
 *         {@link #completeCacheFlush(byte[], byte[], long, boolean)} (byte[],
 *         byte[], long)}
 * @see #completeCacheFlush(byte[], byte[], long, boolean)
 * @see #abortCacheFlush(byte[])
 * WAL keeps track of the sequence numbers that were not yet flushed from memstores
 * in order to be able to do cleanup. This method tells WAL that some region is about
 * to flush memstore.
 *
 * We stash the oldest seqNum for the region, and let the the next edit inserted in this
 * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
 * as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
 * in case of flush succeeding, the seqNum of that first edit after start becomes the
 * valid oldest seqNum for this region.
 *
 * @return current seqNum, to pass on to flushers (who will put it into the metadata of
 *         the resulting file as an upper-bound seqNum for that file), or NULL if flush
 *         should not be started.
 */
public long startCacheFlush(final byte[] encodedRegionName);
public Long startCacheFlush(final byte[] encodedRegionName);

/**
 * Complete the cache flush
 *
 * Protected by cacheFlushLock
 *
 * @param encodedRegionName
 * @param tableName
 * @param logSeqId
 * @throws IOException
 * Complete the cache flush.
 * @param encodedRegionName Encoded region name.
 */
public void completeCacheFlush(final byte[] encodedRegionName,
final byte[] tableName, final long logSeqId, final boolean isMetaRegion)
throws IOException;
public void completeCacheFlush(final byte[] encodedRegionName);

/**
 * Abort a cache flush. Call if the flush fails. Note that the only recovery
 * for an aborted flush currently is a restart of the regionserver so the
 * snapshot content dropped by the failure gets restored to the memstore.
 * snapshot content dropped by the failure gets restored to the memstore.v
 * @param encodedRegionName Encoded region name.
 */
public void abortCacheFlush(byte[] encodedRegionName);
@@ -46,8 +46,6 @@ import org.apache.hadoop.hbase.util.Bytes;
public class HLogUtil {
static final Log LOG = LogFactory.getLog(HLogUtil.class);

static final byte[] COMPLETE_CACHE_FLUSH = Bytes.toBytes("HBASE::CACHEFLUSH");

/**
 * @param family
 * @return true if the column is a meta column

@@ -243,32 +241,6 @@ public class HLogUtil {
return ServerName.parseServerName(serverName);
}

/**
 * Return regions (memstores) that have edits that are equal or less than the
 * passed <code>oldestWALseqid</code>.
 *
 * @param oldestWALseqid
 * @param regionsToSeqids
 *          Encoded region names to sequence ids
 * @return All regions whose seqid is < than <code>oldestWALseqid</code> (Not
 *         necessarily in order). Null if no regions found.
 */
static byte[][] findMemstoresWithEditsEqualOrOlderThan(
final long oldestWALseqid, final Map<byte[], Long> regionsToSeqids) {
// This method is static so it can be unit tested the easier.
List<byte[]> regions = null;
for (Map.Entry<byte[], Long> e : regionsToSeqids.entrySet()) {
if (e.getValue().longValue() <= oldestWALseqid) {
if (regions == null)
regions = new ArrayList<byte[]>();
// Key is encoded region name.
regions.add(e.getKey());
}
}
return regions == null ? null : regions
.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
}

/**
 * Returns sorted set of edit files made by wal-log splitter, excluding files
 * with '.temp' suffix.
@@ -244,7 +244,12 @@ public class SequenceFileLogWriter implements HLog.Writer {

@Override
public void sync() throws IOException {
this.writer.syncFs();
try {
this.writer.syncFs();
} catch (NullPointerException npe) {
// Concurrent close...
throw new IOException(npe);
}
}

@Override
@@ -323,11 +323,11 @@ public class TestHLog {
regionsToSeqids.put(l.toString().getBytes(), l);
}
byte [][] regions =
HLogUtil.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
FSHLog.findMemstoresWithEditsEqualOrOlderThan(1, regionsToSeqids);
assertEquals(2, regions.length);
assertTrue(Bytes.equals(regions[0], "0".getBytes()) ||
Bytes.equals(regions[0], "1".getBytes()));
regions = HLogUtil.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
regions = FSHLog.findMemstoresWithEditsEqualOrOlderThan(3, regionsToSeqids);
int count = 4;
assertEquals(count, regions.length);
// Regions returned are not ordered.

@@ -518,9 +518,8 @@ public class TestHLog {
htd.addFamily(new HColumnDescriptor("column"));

log.append(info, tableName, cols, System.currentTimeMillis(), htd);
long logSeqId = log.startCacheFlush(info.getEncodedNameAsBytes());
log.completeCacheFlush(info.getEncodedNameAsBytes(), tableName, logSeqId,
info.isMetaRegion());
log.startCacheFlush(info.getEncodedNameAsBytes());
log.completeCacheFlush(info.getEncodedNameAsBytes());
log.close();
Path filename = ((FSHLog) log).computeFilename();
log = null;

@@ -540,20 +539,6 @@ public class TestHLog {
assertEquals((byte)(i + '0'), kv.getValue()[0]);
System.out.println(key + " " + val);
}
HLog.Entry entry = null;
while ((entry = reader.next(null)) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
// Assert only one more row... the meta flushed row.
assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));
KeyValue kv = val.getKeyValues().get(0);
assertTrue(Bytes.equals(HLog.METAROW, kv.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, kv.getFamily()));
assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
val.getKeyValues().get(0).getValue()));
System.out.println(key + " " + val);
}
} finally {
if (log != null) {
log.closeAndDelete();

@@ -589,8 +574,8 @@ public class TestHLog {
HTableDescriptor htd = new HTableDescriptor();
htd.addFamily(new HColumnDescriptor("column"));
log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
long logSeqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, logSeqId, false);
log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.close();
Path filename = ((FSHLog) log).computeFilename();
log = null;

@@ -608,20 +593,6 @@ public class TestHLog {
System.out.println(entry.getKey() + " " + val);
idx++;
}

// Get next row... the meta flushed row.
entry = reader.next();
assertEquals(1, entry.getEdit().size());
for (KeyValue val : entry.getEdit().getKeyValues()) {
assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
entry.getKey().getEncodedRegionName()));
assertTrue(Bytes.equals(tableName, entry.getKey().getTablename()));
assertTrue(Bytes.equals(HLog.METAROW, val.getRow()));
assertTrue(Bytes.equals(HLog.METAFAMILY, val.getFamily()));
assertEquals(0, Bytes.compareTo(HLogUtil.COMPLETE_CACHE_FLUSH,
val.getValue()));
System.out.println(entry.getKey() + " " + val);
}
} finally {
if (log != null) {
log.closeAndDelete();

@@ -705,17 +676,19 @@ public class TestHLog {
assertEquals(3, ((FSHLog) log).getNumLogFiles());

// Flush the first region, we expect to see the first two files getting
// archived
long seqId = log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes(), tableName, seqId, false);
// archived. We need to append something or writer won't be rolled.
addEdits(log, hri2, tableName2, 1);
log.startCacheFlush(hri.getEncodedNameAsBytes());
log.completeCacheFlush(hri.getEncodedNameAsBytes());
log.rollWriter();
assertEquals(2, ((FSHLog) log).getNumLogFiles());

// Flush the second region, which removes all the remaining output files
// since the oldest was completely flushed and the two others only contain
// flush information
seqId = log.startCacheFlush(hri2.getEncodedNameAsBytes());
log.completeCacheFlush(hri2.getEncodedNameAsBytes(), tableName2, seqId, false);
addEdits(log, hri2, tableName2, 1);
log.startCacheFlush(hri2.getEncodedNameAsBytes());
log.completeCacheFlush(hri2.getEncodedNameAsBytes());
log.rollWriter();
assertEquals(0, ((FSHLog) log).getNumLogFiles());
} finally {
@@ -117,7 +117,7 @@ public class TestWALActionsListener {
assertEquals(11, observer.postLogRollCounter);
assertEquals(5, laterobserver.preLogRollCounter);
assertEquals(5, laterobserver.postLogRollCounter);
assertEquals(2, observer.closedCount);
assertEquals(1, observer.closedCount);
}

@@ -557,8 +557,8 @@ public class TestWALReplay {
}

// Add a cache flush, shouldn't have any effect
long logSeqId = wal.startCacheFlush(regionName);
wal.completeCacheFlush(regionName, tableName, logSeqId, hri.isMetaRegion());
wal.startCacheFlush(regionName);
wal.completeCacheFlush(regionName);

// Add an edit to another family, should be skipped.
WALEdit edit = new WALEdit();

@@ -661,7 +661,7 @@ public class TestWALReplay {
wal.doCompleteCacheFlush = true;
// allow complete cache flush with the previous seq number got after first
// set of edits.
wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
wal.completeCacheFlush(hri.getEncodedNameAsBytes());
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,

@@ -686,12 +686,11 @@ public class TestWALReplay {
}

@Override
public void completeCacheFlush(byte[] encodedRegionName, byte[] tableName, long logSeqId,
boolean isMetaRegion) throws IOException {
public void completeCacheFlush(byte[] encodedRegionName) {
if (!doCompleteCacheFlush) {
return;
}
super.completeCacheFlush(encodedRegionName, tableName, logSeqId, isMetaRegion);
super.completeCacheFlush(encodedRegionName);
}
}