From 642d2fe0f8d82b8ed5b9738baca0c1e2551272d4 Mon Sep 17 00:00:00 2001
From: Yu Li
Date: Thu, 18 Aug 2016 09:59:36 +0800
Subject: [PATCH] HBASE-16429 FSHLog: deadlock if rollWriter called when ring buffer filled with appends

---
 .../hadoop/hbase/regionserver/wal/FSHLog.java | 33 +++++++--
 .../hbase/regionserver/TestHRegion.java       | 70 +++++++++++++++++++
 2 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index cdf57572b3c..f93537d9d7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -324,8 +324,20 @@ public class FSHLog extends AbstractFSWAL {
     // constructor BEFORE the ring buffer is set running so it is null on first time through
     // here; allow for that.
     SyncFuture syncFuture = null;
-    SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null) ? null
-        : this.ringBufferEventHandler.attainSafePoint();
+    SafePointZigZagLatch zigzagLatch = null;
+    long sequence = -1L;
+    if (this.ringBufferEventHandler != null) {
+      // Get the sequence first to avoid a deadlock when the ring buffer is full.
+      // Consider the following sequence of events:
+      // 1. replaceWriter is called and zigzagLatch is initialized
+      // 2. ringBufferEventHandler#onEvent is called, arrives at #attainSafePoint(long) and then
+      //    waits on safePointReleasedLatch
+      // 3. Since ring buffer is full, if we get sequence when publishing sync, the replaceWriter
+      //    thread will wait for the ring buffer to be consumed, but the only consumer is waiting
+      //    for the replaceWriter thread to release safePointReleasedLatch, which causes a deadlock
+      sequence = getSequenceOnRingBuffer();
+      zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
+    }
     afterCreatingZigZagLatch();
     long oldFileLen = 0L;
     try {
@@ -336,8 +348,11 @@ public class FSHLog extends AbstractFSWAL {
       // to come back. Cleanup this syncFuture down below after we are ready to run again.
       try {
         if (zigzagLatch != null) {
+          // Assert so that no future change breaks the invariant that
+          // sequence and zigzagLatch are always set together
+          assert sequence > 0L : "Failed to get sequence from ring buffer";
           Trace.addTimelineAnnotation("awaiting safepoint");
-          syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
+          syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence));
         }
       } catch (FailedSyncBeforeLogCloseException e) {
         // If unflushed/unsynced entries on close, it is reason to abort.
@@ -709,12 +724,20 @@ public class FSHLog extends AbstractFSWAL {
     return logRollNeeded;
   }
 
-  private SyncFuture publishSyncOnRingBuffer() {
-    return publishSyncOnRingBuffer(null);
+  private SyncFuture publishSyncOnRingBuffer(long sequence) {
+    return publishSyncOnRingBuffer(sequence, null);
+  }
+
+  private long getSequenceOnRingBuffer() {
+    return this.disruptor.getRingBuffer().next();
   }
 
   private SyncFuture publishSyncOnRingBuffer(Span span) {
     long sequence = this.disruptor.getRingBuffer().next();
+    return publishSyncOnRingBuffer(sequence, span);
+  }
+
+  private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
     // here we use ring buffer sequence as transaction id
     SyncFuture syncFuture = getSyncFuture(sequence, span);
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 9aa3a9be618..765c9cff826 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -6659,4 +6659,74 @@ public class TestHRegion {
     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
         families);
   }
+
+  /**
+   * HBASE-16429 Make sure rollWriter does not hang when the ring buffer is filled with appends
+   * @throws IOException if IO error occurred during test
+   */
+  @Test(timeout = 60000)
+  public void testWritesWhileRollWriter() throws IOException {
+    int testCount = 10;
+    int numRows = 1024;
+    int numFamilies = 2;
+    int numQualifiers = 2;
+    final byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    final byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileRollWriter";
+    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
+    this.region = initHRegion(tableName, method, CONF, families);
+    try {
+      List<Thread> threads = new ArrayList<Thread>();
+      for (int i = 0; i < numRows; i++) {
+        final int count = i;
+        Thread t = new Thread(new Runnable() {
+
+          @Override
+          public void run() {
+            byte[] row = Bytes.toBytes("row" + count);
+            Put put = new Put(row);
+            put.setDurability(Durability.SYNC_WAL);
+            byte[] value = Bytes.toBytes(String.valueOf(count));
+            for (byte[] family : families) {
+              for (byte[] qualifier : qualifiers) {
+                put.addColumn(family, qualifier, (long) count, value);
+              }
+            }
+            try {
+              region.put(put);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+        threads.add(t);
+      }
+      for (Thread t : threads) {
+        t.start();
+      }
+
+      for (int i = 0; i < testCount; i++) {
+        region.getWAL().rollWriter();
+        Thread.yield();
+      }
+    } finally {
+      try {
+        HBaseTestingUtility.closeRegionAndWAL(this.region);
+        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
+      } catch (DroppedSnapshotException dse) {
+        // We could get this on way out because we interrupt the background flusher and it could
+        // fail anywhere causing a DSE over in the background flusher... only it is not properly
+        // dealt with so could still be memory hanging out when we get to here -- memory we can't
+        // flush because the accounting is 'off' since original DSE.
+      }
+      this.region = null;
+    }
+  }
 }