HDFS-15457. TestFsDatasetImpl fails intermittently (#2407)

This commit is contained in:
Ahmed Hussein 2020-10-27 20:52:56 -05:00 committed by Takanobu Asanuma
parent 340dee4469
commit 25bcc7e3ee
1 changed files with 45 additions and 27 deletions

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Supplier; import java.util.function.Supplier;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
@ -201,9 +203,9 @@ public class TestFsDatasetImpl {
assertEquals(0, dataset.getNumFailedVolumes()); assertEquals(0, dataset.getNumFailedVolumes());
} }
@Test @Test(timeout=10000)
public void testReadLockEnabledByDefault() public void testReadLockEnabledByDefault()
throws IOException, InterruptedException { throws Exception {
final FsDatasetSpi ds = dataset; final FsDatasetSpi ds = dataset;
AtomicBoolean accessed = new AtomicBoolean(false); AtomicBoolean accessed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
@ -213,7 +215,8 @@ public class TestFsDatasetImpl {
public void run() { public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown(); latch.countDown();
sleep(10000); // wait for the waiter thread to access the lock.
waiterLatch.await();
} catch (Exception e) { } catch (Exception e) {
} }
} }
@ -221,29 +224,33 @@ public class TestFsDatasetImpl {
Thread waiter = new Thread() { Thread waiter = new Thread() {
public void run() { public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { try {
latch.await();
} catch (InterruptedException e) {
waiterLatch.countDown(); waiterLatch.countDown();
return;
}
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
accessed.getAndSet(true); accessed.getAndSet(true);
// signal the holder thread.
waiterLatch.countDown();
} catch (Exception e) { } catch (Exception e) {
} }
} }
}; };
holder.start();
latch.await();
waiter.start(); waiter.start();
waiterLatch.await(); holder.start();
holder.join();
waiter.join();
// The holder thread is still holding the lock, but the waiter can still // The holder thread is still holding the lock, but the waiter can still
// run as the lock is a shared read lock. // run as the lock is a shared read lock.
assertEquals(true, accessed.get()); assertEquals(true, accessed.get());
holder.interrupt(); holder.interrupt();
holder.join();
waiter.join();
} }
@Test(timeout=10000) @Test(timeout=10000)
public void testReadLockCanBeDisabledByConfig() public void testReadLockCanBeDisabledByConfig()
throws IOException, InterruptedException { throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean( conf.setBoolean(
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false); DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
@ -256,41 +263,52 @@ public class TestFsDatasetImpl {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waiterLatch = new CountDownLatch(1); CountDownLatch waiterLatch = new CountDownLatch(1);
AtomicBoolean accessed = new AtomicBoolean(false); // create a synchronized list and verify the order of elements.
List<Integer> syncList =
Collections.synchronizedList(new ArrayList<>());
Thread holder = new Thread() { Thread holder = new Thread() {
public void run() { public void run() {
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
latch.countDown(); latch.countDown();
sleep(10000); try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
syncList.add(0);
} catch (Exception e) { } catch (Exception e) {
return;
}
try {
waiterLatch.await();
syncList.add(2);
} catch (InterruptedException e) {
} }
} }
}; };
Thread waiter = new Thread() { Thread waiter = new Thread() {
public void run() { public void run() {
try {
// wait for holder to get into the critical section.
latch.await();
} catch (InterruptedException e) {
waiterLatch.countDown();
}
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
accessed.getAndSet(true); syncList.add(1);
waiterLatch.countDown(); waiterLatch.countDown();
} catch (Exception e) { } catch (Exception e) {
} }
} }
}; };
holder.start();
latch.await();
waiter.start(); waiter.start();
Thread.sleep(200); holder.start();
// Waiting thread should not have been able to update the variable
// as the read lock is disabled and hence an exclusive lock.
assertEquals(false, accessed.get());
holder.interrupt();
holder.join();
waiterLatch.await();
// After the holder thread exits, the variable is updated.
assertEquals(true, accessed.get());
waiter.join(); waiter.join();
holder.join();
// verify that the synchronized list has the correct sequence.
assertEquals(
"The sequence of checkpoints does not correspond to shared lock",
syncList, Arrays.asList(0, 1, 2));
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }