HDFS-14986. ReplicaCachingGetSpaceUsed throws ConcurrentModificationException. Contributed by Aiphago.
(cherry picked from commit 2b452b4e60
)
This commit is contained in:
parent
853eafa81a
commit
26b51f3e22
|
@ -47,6 +47,7 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
private final long jitter;
|
private final long jitter;
|
||||||
private final String dirPath;
|
private final String dirPath;
|
||||||
private Thread refreshUsed;
|
private Thread refreshUsed;
|
||||||
|
private boolean shouldFirstRefresh;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is the constructor used by the builder.
|
* This is the constructor used by the builder.
|
||||||
|
@ -79,16 +80,30 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
this.refreshInterval = interval;
|
this.refreshInterval = interval;
|
||||||
this.jitter = jitter;
|
this.jitter = jitter;
|
||||||
this.used.set(initialUsed);
|
this.used.set(initialUsed);
|
||||||
|
this.shouldFirstRefresh = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void init() {
|
void init() {
|
||||||
if (used.get() < 0) {
|
if (used.get() < 0) {
|
||||||
used.set(0);
|
used.set(0);
|
||||||
|
if (!shouldFirstRefresh) {
|
||||||
|
// Skip initial refresh operation, so we need to do first refresh
|
||||||
|
// operation immediately in refresh thread.
|
||||||
|
initRefeshThread(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
refresh();
|
refresh();
|
||||||
}
|
}
|
||||||
|
initRefeshThread(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RunImmediately should set true, if we skip the first refresh.
|
||||||
|
* @param runImmediately The param default should be false.
|
||||||
|
*/
|
||||||
|
private void initRefeshThread (boolean runImmediately) {
|
||||||
if (refreshInterval > 0) {
|
if (refreshInterval > 0) {
|
||||||
refreshUsed = new Thread(new RefreshThread(this),
|
refreshUsed = new Thread(new RefreshThread(this, runImmediately),
|
||||||
"refreshUsed-" + dirPath);
|
"refreshUsed-" + dirPath);
|
||||||
refreshUsed.setDaemon(true);
|
refreshUsed.setDaemon(true);
|
||||||
refreshUsed.start();
|
refreshUsed.start();
|
||||||
|
@ -100,6 +115,14 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
|
|
||||||
protected abstract void refresh();
|
protected abstract void refresh();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset that if we need to do the first refresh.
|
||||||
|
* @param shouldFirstRefresh The flag value to set.
|
||||||
|
*/
|
||||||
|
protected void setShouldFirstRefresh(boolean shouldFirstRefresh) {
|
||||||
|
this.shouldFirstRefresh = shouldFirstRefresh;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return an estimate of space used in the directory path.
|
* @return an estimate of space used in the directory path.
|
||||||
*/
|
*/
|
||||||
|
@ -156,9 +179,11 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
private static final class RefreshThread implements Runnable {
|
private static final class RefreshThread implements Runnable {
|
||||||
|
|
||||||
final CachingGetSpaceUsed spaceUsed;
|
final CachingGetSpaceUsed spaceUsed;
|
||||||
|
private boolean runImmediately;
|
||||||
|
|
||||||
RefreshThread(CachingGetSpaceUsed spaceUsed) {
|
RefreshThread(CachingGetSpaceUsed spaceUsed, boolean runImmediately) {
|
||||||
this.spaceUsed = spaceUsed;
|
this.spaceUsed = spaceUsed;
|
||||||
|
this.runImmediately = runImmediately;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -176,7 +201,10 @@ public abstract class CachingGetSpaceUsed implements Closeable, GetSpaceUsed {
|
||||||
}
|
}
|
||||||
// Make sure that after the jitter we didn't end up at 0.
|
// Make sure that after the jitter we didn't end up at 0.
|
||||||
refreshInterval = Math.max(refreshInterval, 1);
|
refreshInterval = Math.max(refreshInterval, 1);
|
||||||
Thread.sleep(refreshInterval);
|
if (!runImmediately) {
|
||||||
|
Thread.sleep(refreshInterval);
|
||||||
|
}
|
||||||
|
runImmediately = false;
|
||||||
// update the used variable
|
// update the used variable
|
||||||
spaceUsed.refresh();
|
spaceUsed.refresh();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
|
@ -661,5 +661,11 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
||||||
*/
|
*/
|
||||||
AutoCloseableLock acquireDatasetLock();
|
AutoCloseableLock acquireDatasetLock();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deep copy the replica info belonging to given block pool.
|
||||||
|
* @param bpid Specified block pool id.
|
||||||
|
* @return A set of replica info.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
|
Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -198,16 +198,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The deepCopyReplica call doesn't use the datasetock since it will lead the
|
|
||||||
* potential deadlock with the {@link FsVolumeList#addBlockPool} call.
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public Set<? extends Replica> deepCopyReplica(String bpid)
|
public Set<? extends Replica> deepCopyReplica(String bpid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Set<? extends Replica> replicas =
|
Set<? extends Replica> replicas = null;
|
||||||
new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.EMPTY_SET
|
try (AutoCloseableLock lock = datasetLock.acquire()) {
|
||||||
: volumeMap.replicas(bpid));
|
replicas = new HashSet<>(volumeMap.replicas(bpid) == null ? Collections.
|
||||||
|
EMPTY_SET : volumeMap.replicas(bpid));
|
||||||
|
}
|
||||||
return Collections.unmodifiableSet(replicas);
|
return Collections.unmodifiableSet(replicas);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,6 +59,7 @@ public class ReplicaCachingGetSpaceUsed extends FSCachingGetSpaceUsed {
|
||||||
|
|
||||||
public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
|
public ReplicaCachingGetSpaceUsed(Builder builder) throws IOException {
|
||||||
super(builder);
|
super(builder);
|
||||||
|
setShouldFirstRefresh(false);
|
||||||
volume = builder.getVolume();
|
volume = builder.getVolume();
|
||||||
bpid = builder.getBpid();
|
bpid = builder.getBpid();
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.math.RandomUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CachingGetSpaceUsed;
|
import org.apache.hadoop.fs.CachingGetSpaceUsed;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -27,8 +28,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -36,6 +40,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -145,4 +150,54 @@ public class TestReplicaCachingGetSpaceUsed {
|
||||||
|
|
||||||
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
|
fs.delete(new Path("/testReplicaCachingGetSpaceUsedByRBWReplica"), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 15000)
|
||||||
|
public void testFsDatasetImplDeepCopyReplica() {
|
||||||
|
FsDatasetSpi<?> fsDataset = dataNode.getFSDataset();
|
||||||
|
ModifyThread modifyThread = new ModifyThread();
|
||||||
|
modifyThread.start();
|
||||||
|
String bpid = cluster.getNamesystem(0).getBlockPoolId();
|
||||||
|
int retryTimes = 10;
|
||||||
|
|
||||||
|
while (retryTimes > 0) {
|
||||||
|
try {
|
||||||
|
Set<? extends Replica> replicas = fsDataset.deepCopyReplica(bpid);
|
||||||
|
if (replicas.size() > 0) {
|
||||||
|
retryTimes--;
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
modifyThread.setShouldRun(false);
|
||||||
|
Assert.fail("Encounter IOException when deep copy replica.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
modifyThread.setShouldRun(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ModifyThread extends Thread {
|
||||||
|
private boolean shouldRun = true;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
FSDataOutputStream os = null;
|
||||||
|
while (shouldRun) {
|
||||||
|
try {
|
||||||
|
int id = RandomUtils.nextInt();
|
||||||
|
os = fs.create(new Path("/testFsDatasetImplDeepCopyReplica/" + id));
|
||||||
|
byte[] bytes = new byte[2048];
|
||||||
|
InputStream is = new ByteArrayInputStream(bytes);
|
||||||
|
IOUtils.copyBytes(is, os, bytes.length);
|
||||||
|
os.hsync();
|
||||||
|
os.close();
|
||||||
|
} catch (IOException e) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
fs.delete(new Path("/testFsDatasetImplDeepCopyReplica"), true);
|
||||||
|
} catch (IOException e) {}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void setShouldRun(boolean shouldRun) {
|
||||||
|
this.shouldRun = shouldRun;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue