From 0879f4ec0f71887a6b8e7a26df54ecb9533c4d67 Mon Sep 17 00:00:00 2001 From: Ahmed Hussein <50450311+amahussein@users.noreply.github.com> Date: Tue, 27 Oct 2020 20:52:56 -0500 Subject: [PATCH] HDFS-15457. TestFsDatasetImpl fails intermittently (#2407) (cherry picked from commit 98097b8f19789605b9697f6a959da57261e0fe19) --- .../fsdataset/impl/TestFsDatasetImpl.java | 72 ++++++++++++------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index d4c4b14ac9b..5055c669bb9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.util.Arrays; +import java.util.Collections; import java.util.function.Supplier; import com.google.common.collect.Lists; @@ -197,9 +199,9 @@ public class TestFsDatasetImpl { assertEquals(0, dataset.getNumFailedVolumes()); } - @Test + @Test(timeout=10000) public void testReadLockEnabledByDefault() - throws IOException, InterruptedException { + throws Exception { final FsDatasetSpi ds = dataset; AtomicBoolean accessed = new AtomicBoolean(false); CountDownLatch latch = new CountDownLatch(1); @@ -209,7 +211,8 @@ public class TestFsDatasetImpl { public void run() { try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { latch.countDown(); - sleep(10000); + // wait for the waiter thread to access the lock. + waiterLatch.await(); } catch (Exception e) { } } @@ -217,29 +220,33 @@ public class TestFsDatasetImpl { Thread waiter = new Thread() { public void run() { - try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { + try { + latch.await(); + } catch (InterruptedException e) { waiterLatch.countDown(); + return; + } + try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { accessed.getAndSet(true); + // signal the holder thread. + waiterLatch.countDown(); } catch (Exception e) { } } }; - - holder.start(); - latch.await(); waiter.start(); - waiterLatch.await(); + holder.start(); + holder.join(); + waiter.join(); // The holder thread is still holding the lock, but the waiter can still // run as the lock is a shared read lock. assertEquals(true, accessed.get()); holder.interrupt(); - holder.join(); - waiter.join(); } @Test(timeout=10000) public void testReadLockCanBeDisabledByConfig() - throws IOException, InterruptedException { + throws Exception { HdfsConfiguration conf = new HdfsConfiguration(); conf.setBoolean( DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false); @@ -252,41 +259,52 @@ public class TestFsDatasetImpl { CountDownLatch latch = new CountDownLatch(1); CountDownLatch waiterLatch = new CountDownLatch(1); - AtomicBoolean accessed = new AtomicBoolean(false); + // create a synchronized list and verify the order of elements. + List syncList = + Collections.synchronizedList(new ArrayList<>()); + Thread holder = new Thread() { public void run() { + latch.countDown(); try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - latch.countDown(); - sleep(10000); + syncList.add(0); } catch (Exception e) { + return; + } + try { + waiterLatch.await(); + syncList.add(2); + } catch (InterruptedException e) { } } }; Thread waiter = new Thread() { public void run() { + try { + // wait for holder to get into the critical section. + latch.await(); + } catch (InterruptedException e) { + waiterLatch.countDown(); + } try (AutoCloseableLock l = ds.acquireDatasetReadLock()) { - accessed.getAndSet(true); + syncList.add(1); waiterLatch.countDown(); } catch (Exception e) { } } }; - - holder.start(); - latch.await(); waiter.start(); - Thread.sleep(200); - // Waiting thread should not have been able to update the variable - // as the read lock is disabled and hence an exclusive lock. - assertEquals(false, accessed.get()); - holder.interrupt(); - holder.join(); - waiterLatch.await(); - // After the holder thread exits, the variable is updated. - assertEquals(true, accessed.get()); + holder.start(); + waiter.join(); + holder.join(); + + // verify that the synchronized list has the correct sequence. + assertEquals( + "The sequence of checkpoints does not correspond to shared lock", + syncList, Arrays.asList(0, 1, 2)); } finally { cluster.shutdown(); }