HDFS-15457. TestFsDatasetImpl fails intermittently (#2407)
(cherry picked from commit 98097b8f19
)
This commit is contained in:
parent
84e16adab3
commit
0879f4ec0f
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.function.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
|
@ -197,9 +199,9 @@ public class TestFsDatasetImpl {
|
|||
assertEquals(0, dataset.getNumFailedVolumes());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout=10000)
|
||||
public void testReadLockEnabledByDefault()
|
||||
throws IOException, InterruptedException {
|
||||
throws Exception {
|
||||
final FsDatasetSpi ds = dataset;
|
||||
AtomicBoolean accessed = new AtomicBoolean(false);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
@ -209,7 +211,8 @@ public class TestFsDatasetImpl {
|
|||
public void run() {
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
latch.countDown();
|
||||
sleep(10000);
|
||||
// wait for the waiter thread to access the lock.
|
||||
waiterLatch.await();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
|
@ -217,29 +220,33 @@ public class TestFsDatasetImpl {
|
|||
|
||||
Thread waiter = new Thread() {
|
||||
public void run() {
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
waiterLatch.countDown();
|
||||
return;
|
||||
}
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
accessed.getAndSet(true);
|
||||
// signal the holder thread.
|
||||
waiterLatch.countDown();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
holder.start();
|
||||
latch.await();
|
||||
waiter.start();
|
||||
waiterLatch.await();
|
||||
holder.start();
|
||||
holder.join();
|
||||
waiter.join();
|
||||
// The holder thread is still holding the lock, but the waiter can still
|
||||
// run as the lock is a shared read lock.
|
||||
assertEquals(true, accessed.get());
|
||||
holder.interrupt();
|
||||
holder.join();
|
||||
waiter.join();
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testReadLockCanBeDisabledByConfig()
|
||||
throws IOException, InterruptedException {
|
||||
throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
conf.setBoolean(
|
||||
DFSConfigKeys.DFS_DATANODE_LOCK_READ_WRITE_ENABLED_KEY, false);
|
||||
|
@ -252,41 +259,52 @@ public class TestFsDatasetImpl {
|
|||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
CountDownLatch waiterLatch = new CountDownLatch(1);
|
||||
AtomicBoolean accessed = new AtomicBoolean(false);
|
||||
// create a synchronized list and verify the order of elements.
|
||||
List<Integer> syncList =
|
||||
Collections.synchronizedList(new ArrayList<>());
|
||||
|
||||
|
||||
Thread holder = new Thread() {
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
latch.countDown();
|
||||
sleep(10000);
|
||||
syncList.add(0);
|
||||
} catch (Exception e) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
waiterLatch.await();
|
||||
syncList.add(2);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Thread waiter = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
// wait for holder to get into the critical section.
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
waiterLatch.countDown();
|
||||
}
|
||||
try (AutoCloseableLock l = ds.acquireDatasetReadLock()) {
|
||||
accessed.getAndSet(true);
|
||||
syncList.add(1);
|
||||
waiterLatch.countDown();
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
holder.start();
|
||||
latch.await();
|
||||
waiter.start();
|
||||
Thread.sleep(200);
|
||||
// Waiting thread should not have been able to update the variable
|
||||
// as the read lock is disabled and hence an exclusive lock.
|
||||
assertEquals(false, accessed.get());
|
||||
holder.interrupt();
|
||||
holder.join();
|
||||
waiterLatch.await();
|
||||
// After the holder thread exits, the variable is updated.
|
||||
assertEquals(true, accessed.get());
|
||||
holder.start();
|
||||
|
||||
waiter.join();
|
||||
holder.join();
|
||||
|
||||
// verify that the synchronized list has the correct sequence.
|
||||
assertEquals(
|
||||
"The sequence of checkpoints does not correspond to shared lock",
|
||||
syncList, Arrays.asList(0, 1, 2));
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue