HBASE-16429 FSHLog: deadlock if rollWriter called when ring buffer filled with appends

This commit is contained in:
Yu Li 2016-08-18 09:59:36 +08:00
parent e60a4302fd
commit 642d2fe0f8
2 changed files with 98 additions and 5 deletions

View File

@ -324,8 +324,20 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// constructor BEFORE the ring buffer is set running so it is null on first time through // constructor BEFORE the ring buffer is set running so it is null on first time through
// here; allow for that. // here; allow for that.
SyncFuture syncFuture = null; SyncFuture syncFuture = null;
SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null) ? null SafePointZigZagLatch zigzagLatch = null;
: this.ringBufferEventHandler.attainSafePoint(); long sequence = -1L;
if (this.ringBufferEventHandler != null) {
// Get sequence first to avoid dead lock when ring buffer is full
// Considering below sequence
// 1. replaceWriter is called and zigzagLatch is initialized
// 2. ringBufferEventHandler#onEvent is called and arrives at #attainSafePoint(long) then wait
// on safePointReleasedLatch
// 3. Since ring buffer is full, if we get sequence when publish sync, the replaceWriter
// thread will wait for the ring buffer to be consumed, but the only consumer is waiting
// replaceWriter thread to release safePointReleasedLatch, which causes a deadlock
sequence = getSequenceOnRingBuffer();
zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
}
afterCreatingZigZagLatch(); afterCreatingZigZagLatch();
long oldFileLen = 0L; long oldFileLen = 0L;
try { try {
@ -336,8 +348,11 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// to come back. Cleanup this syncFuture down below after we are ready to run again. // to come back. Cleanup this syncFuture down below after we are ready to run again.
try { try {
if (zigzagLatch != null) { if (zigzagLatch != null) {
// use assert to make sure no change breaks the logic that
// sequence and zigzagLatch will be set together
assert sequence > 0L : "Failed to get sequence from ring buffer";
Trace.addTimelineAnnotation("awaiting safepoint"); Trace.addTimelineAnnotation("awaiting safepoint");
syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer()); syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence));
} }
} catch (FailedSyncBeforeLogCloseException e) { } catch (FailedSyncBeforeLogCloseException e) {
// If unflushed/unsynced entries on close, it is reason to abort. // If unflushed/unsynced entries on close, it is reason to abort.
@ -709,12 +724,20 @@ public class FSHLog extends AbstractFSWAL<Writer> {
return logRollNeeded; return logRollNeeded;
} }
private SyncFuture publishSyncOnRingBuffer() { private SyncFuture publishSyncOnRingBuffer(long sequence) {
return publishSyncOnRingBuffer(null); return publishSyncOnRingBuffer(sequence, null);
}
private long getSequenceOnRingBuffer() {
return this.disruptor.getRingBuffer().next();
} }
private SyncFuture publishSyncOnRingBuffer(Span span) { private SyncFuture publishSyncOnRingBuffer(Span span) {
long sequence = this.disruptor.getRingBuffer().next(); long sequence = this.disruptor.getRingBuffer().next();
return publishSyncOnRingBuffer(sequence, span);
}
private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
// here we use ring buffer sequence as transaction id // here we use ring buffer sequence as transaction id
SyncFuture syncFuture = getSyncFuture(sequence, span); SyncFuture syncFuture = getSyncFuture(sequence, span);
try { try {

View File

@ -6659,4 +6659,74 @@ public class TestHRegion {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(), return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
families); families);
} }
/**
* HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled with appends
* @throws IOException if IO error occurred during test
*/
@Test(timeout = 60000)
public void testWritesWhileRollWriter() throws IOException {
int testCount = 10;
int numRows = 1024;
int numFamilies = 2;
int numQualifiers = 2;
final byte[][] families = new byte[numFamilies][];
for (int i = 0; i < numFamilies; i++) {
families[i] = Bytes.toBytes("family" + i);
}
final byte[][] qualifiers = new byte[numQualifiers][];
for (int i = 0; i < numQualifiers; i++) {
qualifiers[i] = Bytes.toBytes("qual" + i);
}
String method = "testWritesWhileRollWriter";
CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
this.region = initHRegion(tableName, method, CONF, families);
try {
List<Thread> threads = new ArrayList<Thread>();
for (int i = 0; i < numRows; i++) {
final int count = i;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
byte[] row = Bytes.toBytes("row" + count);
Put put = new Put(row);
put.setDurability(Durability.SYNC_WAL);
byte[] value = Bytes.toBytes(String.valueOf(count));
for (byte[] family : families) {
for (byte[] qualifier : qualifiers) {
put.addColumn(family, qualifier, (long) count, value);
}
}
try {
region.put(put);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
threads.add(t);
}
for (Thread t : threads) {
t.start();
}
for (int i = 0; i < testCount; i++) {
region.getWAL().rollWriter();
Thread.yield();
}
} finally {
try {
HBaseTestingUtility.closeRegionAndWAL(this.region);
CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
} catch (DroppedSnapshotException dse) {
// We could get this on way out because we interrupt the background flusher and it could
// fail anywhere causing a DSE over in the background flusher... only it is not properly
// dealt with so could still be memory hanging out when we get to here -- memory we can't
// flush because the accounting is 'off' since original DSE.
}
this.region = null;
}
}
} }