HBASE-27384 Backport HBASE-27064 (CME in TestRegionNormalizerWorkQueue) to 2.4 (#4794) (#4468)

Co-authored-by: Andrew Purtell <apurtell@apache.org>

Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Aman Poonia 2022-09-27 11:29:58 +05:30 committed by GitHub
parent e7b67a19b3
commit f24e7b55f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 32 additions and 75 deletions

View File

@ -25,7 +25,7 @@ import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
/** /**
@ -39,8 +39,6 @@ import org.apache.yetus.audience.InterfaceAudience;
* {@link BlockingQueue}.</li> * {@link BlockingQueue}.</li>
* <li>Allows a producer to insert an item at the head of the queue, if desired.</li> * <li>Allows a producer to insert an item at the head of the queue, if desired.</li>
* </ul> * </ul>
* Assumes low-frequency and low-parallelism concurrent access, so protects state using a simplistic
* synchronization strategy.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class RegionNormalizerWorkQueue<E> { class RegionNormalizerWorkQueue<E> {
@ -48,53 +46,15 @@ class RegionNormalizerWorkQueue<E> {
/** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */ /** Underlying storage structure that gives us the Set behavior and FIFO retrieval policy. */
private LinkedHashSet<E> delegate; private LinkedHashSet<E> delegate;
// the locking structure used here follows the example found in LinkedBlockingQueue. The /** Lock for puts and takes **/
// difference is that our locks guard access to `delegate` rather than the head node. private final ReentrantReadWriteLock lock;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock;
/** Wait queue for waiting takes */ /** Wait queue for waiting takes */
private final Condition notEmpty; private final Condition notEmpty;
/** Lock held by put, offer, etc */
private final ReentrantLock putLock;
RegionNormalizerWorkQueue() { RegionNormalizerWorkQueue() {
delegate = new LinkedHashSet<>(); delegate = new LinkedHashSet<>();
takeLock = new ReentrantLock(); lock = new ReentrantReadWriteLock();
notEmpty = takeLock.newCondition(); notEmpty = lock.writeLock().newCondition();
putLock = new ReentrantLock();
}
/**
* Signals a waiting take. Called only from put/offer (which do not otherwise ordinarily lock
* takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* Locks to prevent both puts and takes.
*/
private void fullyLock() {
putLock.lock();
takeLock.lock();
}
/**
* Unlocks to allow both puts and takes.
*/
private void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
} }
/** /**
@ -105,16 +65,14 @@ class RegionNormalizerWorkQueue<E> {
if (e == null) { if (e == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
lock.writeLock().lock();
putLock.lock();
try { try {
delegate.add(e); delegate.add(e);
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally { } finally {
putLock.unlock(); lock.writeLock().unlock();
}
if (!delegate.isEmpty()) {
signalNotEmpty();
} }
} }
@ -138,16 +96,14 @@ class RegionNormalizerWorkQueue<E> {
if (c == null) { if (c == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
lock.writeLock().lock();
putLock.lock();
try { try {
delegate.addAll(c); delegate.addAll(c);
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally { } finally {
putLock.unlock(); lock.writeLock().unlock();
}
if (!delegate.isEmpty()) {
signalNotEmpty();
} }
} }
@ -159,19 +115,17 @@ class RegionNormalizerWorkQueue<E> {
if (c == null) { if (c == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
lock.writeLock().lock();
fullyLock();
try { try {
final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size()); final LinkedHashSet<E> copy = new LinkedHashSet<>(c.size() + delegate.size());
copy.addAll(c); copy.addAll(c);
copy.addAll(delegate); copy.addAll(delegate);
delegate = copy; delegate = copy;
if (!delegate.isEmpty()) {
notEmpty.signal();
}
} finally { } finally {
fullyUnlock(); lock.writeLock().unlock();
}
if (!delegate.isEmpty()) {
signalNotEmpty();
} }
} }
@ -183,10 +137,13 @@ class RegionNormalizerWorkQueue<E> {
*/ */
public E take() throws InterruptedException { public E take() throws InterruptedException {
E x; E x;
takeLock.lockInterruptibly(); // Take a write lock. If the delegate's queue is empty we need it to await(), which will
// drop the lock, then reacquire it; or if the queue is not empty we will use an iterator
// to mutate the head.
lock.writeLock().lockInterruptibly();
try { try {
while (delegate.isEmpty()) { while (delegate.isEmpty()) {
notEmpty.await(); notEmpty.await(); // await drops the lock, then reacquires it
} }
final Iterator<E> iter = delegate.iterator(); final Iterator<E> iter = delegate.iterator();
x = iter.next(); x = iter.next();
@ -195,7 +152,7 @@ class RegionNormalizerWorkQueue<E> {
notEmpty.signal(); notEmpty.signal();
} }
} finally { } finally {
takeLock.unlock(); lock.writeLock().unlock();
} }
return x; return x;
} }
@ -205,11 +162,11 @@ class RegionNormalizerWorkQueue<E> {
* returns. * returns.
*/ */
public void clear() { public void clear() {
putLock.lock(); lock.writeLock().lock();
try { try {
delegate.clear(); delegate.clear();
} finally { } finally {
putLock.unlock(); lock.writeLock().unlock();
} }
} }
@ -218,21 +175,21 @@ class RegionNormalizerWorkQueue<E> {
* @return the number of elements in this queue * @return the number of elements in this queue
*/ */
public int size() { public int size() {
takeLock.lock(); lock.readLock().lock();
try { try {
return delegate.size(); return delegate.size();
} finally { } finally {
takeLock.unlock(); lock.readLock().unlock();
} }
} }
@Override @Override
public String toString() { public String toString() {
takeLock.lock(); lock.readLock().lock();
try { try {
return delegate.toString(); return delegate.toString();
} finally { } finally {
takeLock.unlock(); lock.readLock().unlock();
} }
} }
} }