HBASE-5623 Race condition when rolling the HLog and hlogFlush (Enis Soztutar and LarsH)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1305556 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-03-26 20:42:46 +00:00
parent 8ea1c8ddd6
commit 080206de01
3 changed files with 173 additions and 17 deletions

View File

@ -143,7 +143,7 @@ public class HLog implements Syncable {
private volatile long syncedTillHere = 0;
private long lastDeferredTxid;
private final Path oldLogDir;
private boolean logRollRunning;
private volatile boolean logRollRunning;
private static Class<? extends Writer> logWriterClass;
private static Class<? extends Reader> logReaderClass;
@ -1227,10 +1227,8 @@ public class HLog implements Syncable {
}
// writes out pending entries to the HLog
void hlogFlush(Writer writer) throws IOException {
// Atomically fetch all existing pending writes. New writes
// will start accumulating in a new list.
List<Entry> pending = getPendingWrites();
void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
if (pending == null) return;
// write out all accumulated Entries to hdfs.
for (Entry e : pending) {
@ -1250,8 +1248,10 @@ public class HLog implements Syncable {
// sync all transactions upto the specified txid
private void syncer(long txid) throws IOException {
Writer tempWriter;
synchronized (this.updateLock) {
if (this.closed) return;
tempWriter = this.writer; // guaranteed non-null
}
// if the transaction that we are interested in is already
// synced, then return immediately.
@ -1262,23 +1262,23 @@ public class HLog implements Syncable {
long doneUpto = this.unflushedEntries.get();
long now = System.currentTimeMillis();
// Done in parallel for all writer threads, thanks to HDFS-895
boolean syncSuccessful = true;
List<Entry> pending = logSyncerThread.getPendingWrites();
try {
// First flush all the pending writes to HDFS. Then
// issue the sync to HDFS. If sync is successful, then update
// syncedTillHere to indicate that transactions till this
// number has been successfully synced.
logSyncerThread.hlogFlush(this.writer);
this.writer.sync();
logSyncerThread.hlogFlush(tempWriter, pending);
pending = null;
tempWriter.sync();
syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
} catch(IOException io) {
syncSuccessful = false;
}
if (!syncSuccessful) {
synchronized (this.updateLock) {
// HBASE-4387, retry with updateLock held
this.writer.sync();
// HBASE-4387, HBASE-5623, retry with updateLock held
tempWriter = this.writer;
logSyncerThread.hlogFlush(tempWriter, pending);
tempWriter.sync();
syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
this.syncedTillHere = doneUpto;
}
@ -1288,8 +1288,12 @@ public class HLog implements Syncable {
syncTime.inc(System.currentTimeMillis() - now);
if (!this.logRollRunning) {
checkLowReplication();
if (this.writer.getLength() > this.logrollsize) {
requestLogRoll();
try {
if (tempWriter.getLength() > this.logrollsize) {
requestLogRoll();
}
} catch (IOException x) {
LOG.debug("Log roll failed and will be retried. (This is not an error)");
}
}
} catch (IOException e) {

View File

@ -276,7 +276,12 @@ public class SequenceFileLogWriter implements HLog.Writer {
@Override
public void append(HLog.Entry entry) throws IOException {
entry.setCompressionContext(compressionContext);
this.writer.append(entry.getKey(), entry.getEdit());
try {
this.writer.append(entry.getKey(), entry.getEdit());
} catch (NullPointerException npe) {
// Concurrent close...
throw new IOException(npe);
}
}
@Override
@ -311,7 +316,12 @@ public class SequenceFileLogWriter implements HLog.Writer {
@Override
public long getLength() throws IOException {
return this.writer.getLength();
try {
return this.writer.getLength();
} catch (NullPointerException npe) {
// Concurrent close...
throw new IOException(npe);
}
}
/**

View File

@ -0,0 +1,142 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Test many concurrent appenders to an {@link #HLog} while rolling the log.
*/
@Category(MediumTests.class)
public class TestLogRollingNoCluster {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final static byte [] EMPTY_1K_ARRAY = new byte[1024];
private static final int THREAD_COUNT = 100; // Spin up this many threads
/**
* Spin up a bunch of threads and have them all append to a WAL. Roll the
* WAL frequently to try and trigger NPE.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testContendedLogRolling() throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
Path dir = TEST_UTIL.getDataTestDir();
HLog wal = new HLog(fs, new Path(dir, "logs"), new Path(dir, "oldlogs"),
TEST_UTIL.getConfiguration());
Appender [] appenders = null;
final int count = THREAD_COUNT;
appenders = new Appender[count];
try {
for (int i = 0; i < count; i++) {
// Have each appending thread write 'count' entries
appenders[i] = new Appender(wal, i, count);
}
for (int i = 0; i < count; i++) {
appenders[i].start();
}
for (int i = 0; i < count; i++) {
//ensure that all threads are joined before closing the wal
appenders[i].join();
}
} finally {
wal.close();
}
for (int i = 0; i < count; i++) {
assertFalse(appenders[i].isException());
}
}
/**
* Appender thread. Appends to passed wal file.
*/
static class Appender extends Thread {
private final Log log;
private final HLog wal;
private final int count;
private Exception e = null;
Appender(final HLog wal, final int index, final int count) {
super("" + index);
this.wal = wal;
this.count = count;
this.log = LogFactory.getLog("Appender:" + getName());
}
/**
* @return Call when the thread is done.
*/
boolean isException() {
return !isAlive() && this.e != null;
}
Exception getException() {
return this.e;
}
@Override
public void run() {
this.log.info(getName() +" started");
try {
for (int i = 0; i < this.count; i++) {
long now = System.currentTimeMillis();
// Roll every ten edits if the log has anything in it.
if (i % 10 == 0 && this.wal.getNumEntries() > 0) {
this.wal.rollWriter();
}
WALEdit edit = new WALEdit();
byte[] bytes = Bytes.toBytes(i);
edit.add(new KeyValue(bytes, bytes, bytes, now, EMPTY_1K_ARRAY));
this.wal.append(HRegionInfo.FIRST_META_REGIONINFO,
HTableDescriptor.META_TABLEDESC.getName(),
edit, now, HTableDescriptor.META_TABLEDESC);
}
String msg = getName() + " finished";
if (isException())
this.log.info(msg, getException());
else
this.log.info(msg);
} catch (Exception e) {
this.e = e;
log.info("Caught exception from Appender:" + getName(), e);
}
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}