HDFS-15010. BlockPoolSlice#addReplicaThreadPool static pool should be initialized by static method. Contributed by Surendra Singh Lilhore.
This commit is contained in:
parent
2ad7b90505
commit
298cda22a3
|
@ -191,7 +191,7 @@ class BlockPoolSlice {
|
|||
|
||||
if (addReplicaThreadPool == null) {
|
||||
// initialize add replica fork join pool
|
||||
initializeAddReplicaPool(conf);
|
||||
initializeAddReplicaPool(conf, (FsDatasetImpl) volume.getDataset());
|
||||
}
|
||||
// Make the dfs usage to be saved during shutdown.
|
||||
shutdownHook = new Runnable() {
|
||||
|
@ -207,9 +207,9 @@ class BlockPoolSlice {
|
|||
SHUTDOWN_HOOK_PRIORITY);
|
||||
}
|
||||
|
||||
private synchronized void initializeAddReplicaPool(Configuration conf) {
|
||||
private synchronized static void initializeAddReplicaPool(Configuration conf,
|
||||
FsDatasetImpl dataset) {
|
||||
if (addReplicaThreadPool == null) {
|
||||
FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
|
||||
int numberOfBlockPoolSlice = dataset.getVolumeCount()
|
||||
* dataset.getBPServiceCount();
|
||||
int poolsize = Math.max(numberOfBlockPoolSlice,
|
||||
|
@ -1049,4 +1049,15 @@ class BlockPoolSlice {
|
|||
public static int getAddReplicaForkPoolSize() {
|
||||
return addReplicaThreadPool.getPoolSize();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ForkJoinPool getAddReplicaThreadPool() {
|
||||
return addReplicaThreadPool;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static void reInitializeAddReplicaThreadPool() {
|
||||
addReplicaThreadPool.shutdown();
|
||||
addReplicaThreadPool = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,9 @@ import org.apache.hadoop.fs.StorageType;
|
|||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
|
@ -48,6 +50,7 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -328,6 +331,7 @@ public class TestFsVolumeList {
|
|||
|
||||
@Test(timeout = 60000)
|
||||
public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
|
||||
BlockPoolSlice.reInitializeAddReplicaThreadPool();
|
||||
Configuration cnf = new Configuration();
|
||||
int poolSize = 5;
|
||||
cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||
|
@ -374,7 +378,29 @@ public class TestFsVolumeList {
|
|||
vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
|
||||
assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid)
|
||||
.size() == 1000);
|
||||
assertTrue("Fork pool size should be " + poolSize,
|
||||
BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
|
||||
assertEquals("Fork pool should be initialize with configured pool size",
|
||||
poolSize, BlockPoolSlice.getAddReplicaForkPoolSize());
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testInstanceOfAddReplicaThreadPool() throws Exception {
|
||||
// Start cluster with multiple namespace
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(
|
||||
new HdfsConfiguration())
|
||||
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||
.numDataNodes(1).build()) {
|
||||
cluster.waitActive();
|
||||
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
|
||||
.getFSDataset();
|
||||
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences()
|
||||
.get(0);
|
||||
ForkJoinPool threadPool1 = vol.getBlockPoolSlice(
|
||||
cluster.getNamesystem(0).getBlockPoolId()).getAddReplicaThreadPool();
|
||||
ForkJoinPool threadPool2 = vol.getBlockPoolSlice(
|
||||
cluster.getNamesystem(1).getBlockPoolId()).getAddReplicaThreadPool();
|
||||
assertEquals(
|
||||
"Thread pool instance should be same in all the BlockPoolSlice",
|
||||
threadPool1, threadPool2);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue