From 9ebafe6b08eb2ec8fbb1974976ef5afd9e6f9c09 Mon Sep 17 00:00:00 2001 From: tedyu Date: Thu, 16 Jul 2015 13:53:22 -0700 Subject: [PATCH] HBASE-13971 Flushes stuck since 6 hours on a regionserver --- .../hadoop/hbase/regionserver/HRegion.java | 12 ++++++++++-- .../org/apache/hadoop/hbase/wal/WALKey.java | 19 ++++++++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2fd7ebd6a74..d4e89e05961 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -200,6 +200,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY = "hbase.hregion.scan.loadColumnFamiliesOnDemand"; + + // in milliseconds + private static final String MAX_WAIT_FOR_SEQ_ID_KEY = + "hbase.hregion.max.wait.for.seq.id"; + + private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 60000; /** * This is the global default value for durability. All tables/mutations not @@ -331,6 +337,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private boolean isLoadingCfsOnDemandDefault = false; + private int maxWaitForSeqId; private final AtomicInteger majorInProgress = new AtomicInteger(0); private final AtomicInteger minorInProgress = new AtomicInteger(0); @@ -663,6 +670,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); + maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID); this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; this.rsServices = rsServices; @@ -2415,7 +2423,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { WALKey key = this.appendEmptyEdit(wal, null); - return key.getSequenceId(); + return key.getSequenceId(maxWaitForSeqId); } ////////////////////////////////////////////////////////////////////////////// @@ -7215,7 +7223,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java index e8056e4480a..69c2aec5b7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java @@ -30,6 +30,7 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.util.ByteStringer; @@ -301,8 +302,24 @@ public class WALKey implements SequenceId, Comparable { */ @Override public long getSequenceId() throws IOException { + return getSequenceId(-1); + } + + /** + * Wait for sequence number is assigned & return the assigned value + * @param maxWaitForSeqId maximum duration, in milliseconds, to wait for seq number to be assigned + * @return long the new assigned sequence number + * @throws IOException + */ + public long getSequenceId(int maxWaitForSeqId) throws IOException { try { - this.seqNumAssignedLatch.await(); + if (maxWaitForSeqId < 0) { + this.seqNumAssignedLatch.await(); + } else { + if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) { + throw new IOException("Timed out waiting for seq number to be assigned"); + } + } } catch (InterruptedException ie) { LOG.warn("Thread interrupted waiting for next log sequence number"); InterruptedIOException iie = new InterruptedIOException();