HDFS-14986. ReplicaCachingGetSpaceUsed throws ConcurrentModificationException. Contributed by Aiphago.

This commit is contained in:
Yiqun Lin 2019-11-28 10:43:35 +08:00
parent 82ad9b549f
commit 2b452b4e60
5 changed files with 98 additions and 10 deletions

View File

@ -47,6 +47,7 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
private final long jitter;
private final String dirPath;
private Thread refreshUsed;
private boolean shouldFirstRefresh;
/**
* This is the constructor used by the builder.
@ -79,16 +80,30 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
this.refreshInterval = interval;
this.jitter = jitter;
this.used.set(initialUsed);
this.shouldFirstRefresh = true;
}
/**
 * Initializes the cached usage value and starts the periodic refresh thread.
 * If the cached value is negative it is reset to 0 and then either refreshed
 * right here, or (when the first refresh is disabled) left for the refresh
 * thread to compute on its first iteration.
 */
void init() {
  // True only when the inline refresh was skipped, so that the refresh
  // thread performs its first refresh without sleeping beforehand.
  boolean refreshRightAway = false;
  if (used.get() < 0) {
    used.set(0);
    if (shouldFirstRefresh) {
      refresh();
    } else {
      refreshRightAway = true;
    }
  }
  initRefeshThread(refreshRightAway);
}
/**
* Set {@code runImmediately} to true if the first refresh was skipped.
* @param runImmediately Whether to refresh immediately; normally false.
*/
private void initRefeshThread (boolean runImmediately) {
if (refreshInterval > 0) {
refreshUsed = new Thread(new RefreshThread(this),
refreshUsed = new Thread(new RefreshThread(this, runImmediately),
"refreshUsed-" + dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
@ -100,6 +115,14 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
protected abstract void refresh();
/**
* Sets whether the first refresh should be performed by {@code init()}.
* When the flag is false and the initial usage value is negative,
* {@code init()} does not call {@code refresh()} itself; it starts the
* refresh thread in run-immediately mode instead.
* @param shouldFirstRefresh The flag value to set.
*/
protected void setShouldFirstRefresh(boolean shouldFirstRefresh) {
this.shouldFirstRefresh = shouldFirstRefresh;
}
/**
* @return an estimate of space used in the directory path.
*/
@ -156,9 +179,11 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
private static final class RefreshThread implements Runnable {
final CachingGetSpaceUsed spaceUsed;
private boolean runImmediately;
RefreshThread(CachingGetSpaceUsed spaceUsed) {
RefreshThread(CachingGetSpaceUsed spaceUsed, boolean runImmediately) {
this.spaceUsed = spaceUsed;
this.runImmediately = runImmediately;
}
@Override
@ -176,7 +201,10 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
}
// Make sure that after the jitter we didn't end up at 0.
refreshInterval = Math.max(refreshInterval, 1);
Thread.sleep(refreshInterval);
if (!runImmediately) {
Thread.sleep(refreshInterval);
}
runImmediately = false;
// update the used variable
spaceUsed.refresh();
} catch (InterruptedException e) {

View File

@ -661,5 +661,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
AutoCloseableLock acquireDatasetLock();
/**
* Deep copy the replica info belonging to the given block pool.
* @param bpid Specified block pool id.
* @return A set of replica info.
* @throws IOException if the copy fails; the exact conditions depend on the
*         implementation (TODO confirm against implementations).
*/
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
}

View File

@ -201,16 +201,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
/**
* The deepCopyReplica call doesn't use the datasetLock since that would lead
* to a potential deadlock with the {@link FsVolumeList#addBlockPool} call.
*/
@Override
public Set<? extends Replica> deepCopyReplica(String bpid)
throws IOException {
Set<? extends Replica> replicas =
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
: volumeMap.replicas(bpid));
Set<? extends Replica> replicas = null;
try (AutoCloseableLock lock = datasetLock.acquire()) {
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
EMPTY_SET : volumeMap.replicas(bpid));
}
return Collections.unmodifiableSet(replicas);
}

View File

@ -59,6 +59,7 @@ public class ReplicaCachingGetSpaceUsed extends FSCachingGetSpaceUsed {
/**
 * Creates the space-usage tracker from the given builder, taking the volume
 * and block pool id from it.
 *
 * @param builder source of the volume and block pool id.
 * @throws IOException declared by the constructor; presumably propagated
 *         from the superclass constructor (TODO confirm).
 */
public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
  super(builder);
  // Disable the inline first refresh; init() reads this flag and, when it is
  // false, lets the refresh thread do the first refresh immediately instead.
  this.setShouldFirstRefresh(false);
  this.volume = builder.getVolume();
  this.bpid = builder.getBpid();
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -27,8 +28,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -36,6 +40,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
import static org.junit.Assert.assertEquals;
@ -145,4 +150,54 @@ public class TestReplicaCachingGetSpaceUsed {
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
}
/**
 * Repeatedly deep-copies the replica set of the block pool while a
 * background {@link ModifyThread} keeps creating files, until ten non-empty
 * copies have been obtained. Any exception escaping
 * {@code deepCopyReplica} fails the test.
 */
@Test(timeout = 15000)
public void testFsDatasetImplDeepCopyReplica() {
  FsDatasetSpi<?> fsDataset = dataNode.getFSDataset();
  ModifyThread modifyThread = new ModifyThread();
  modifyThread.start();
  try {
    String bpid = cluster.getNamesystem(0).getBlockPoolId();
    int retryTimes = 10;
    while (retryTimes > 0) {
      Set<? extends Replica> replicas = fsDataset.deepCopyReplica(bpid);
      // Only non-empty snapshots count towards the required iterations.
      if (replicas.size() > 0) {
        retryTimes--;
      }
    }
  } catch (IOException e) {
    Assert.fail("Encounter IOException when deep copy replica.");
  } finally {
    // Always stop the writer, including when an unchecked exception (e.g.
    // ConcurrentModificationException) or an assertion failure escapes;
    // otherwise the non-daemon thread would keep writing after the test.
    modifyThread.setShouldRun(false);
  }
}
/**
 * Background writer used by the test: keeps creating small files under
 * /testFsDatasetImplDeepCopyReplica until asked to stop, then deletes that
 * directory.
 */
private class ModifyThread extends Thread {
  // volatile: written by the test thread through setShouldRun() and polled
  // by this thread's loop; without it the stop request may never be seen.
  private volatile boolean shouldRun = true;

  @Override
  public void run() {
    final byte[] bytes = new byte[2048];
    while (shouldRun) {
      int id = RandomUtils.nextInt();
      Path file = new Path("/testFsDatasetImplDeepCopyReplica/" + id);
      // try-with-resources closes the stream even if the copy or hsync fails.
      try (FSDataOutputStream os = fs.create(file)) {
        InputStream is = new ByteArrayInputStream(bytes);
        IOUtils.copyBytes(is, os, bytes.length);
        os.hsync();
      } catch (IOException ignored) {
        // Best effort: this thread only exists to generate write activity,
        // so a failed write is skipped and the loop continues.
      }
    }
    try {
      fs.delete(new Path("/testFsDatasetImplDeepCopyReplica"), true);
    } catch (IOException ignored) {
      // Best-effort cleanup of the files created above.
    }
  }

  /**
   * Requests the writer loop to continue or stop.
   * @param shouldRun false to make {@link #run()} leave its loop.
   */
  private void setShouldRun(boolean shouldRun) {
    this.shouldRun = shouldRun;
  }
}
}