HDFS-15010. BlockPoolSlice#addReplicaThreadPool static pool should be initialized by static method. Contributed by Surendra Singh Lilhore.
This commit is contained in:
parent
a97f7776bd
commit
5eb21b83df
|
@ -191,7 +191,7 @@ class BlockPoolSlice {
|
||||||
|
|
||||||
if (addReplicaThreadPool == null) {
|
if (addReplicaThreadPool == null) {
|
||||||
// initialize add replica fork join pool
|
// initialize add replica fork join pool
|
||||||
initializeAddReplicaPool(conf);
|
initializeAddReplicaPool(conf, (FsDatasetImpl) volume.getDataset());
|
||||||
}
|
}
|
||||||
// Make the dfs usage to be saved during shutdown.
|
// Make the dfs usage to be saved during shutdown.
|
||||||
shutdownHook = new Runnable() {
|
shutdownHook = new Runnable() {
|
||||||
|
@ -207,9 +207,9 @@ class BlockPoolSlice {
|
||||||
SHUTDOWN_HOOK_PRIORITY);
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void initializeAddReplicaPool(Configuration conf) {
|
private synchronized static void initializeAddReplicaPool(Configuration conf,
|
||||||
|
FsDatasetImpl dataset) {
|
||||||
if (addReplicaThreadPool == null) {
|
if (addReplicaThreadPool == null) {
|
||||||
FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
|
|
||||||
int numberOfBlockPoolSlice = dataset.getVolumeCount()
|
int numberOfBlockPoolSlice = dataset.getVolumeCount()
|
||||||
* dataset.getBPServiceCount();
|
* dataset.getBPServiceCount();
|
||||||
int poolsize = Math.max(numberOfBlockPoolSlice,
|
int poolsize = Math.max(numberOfBlockPoolSlice,
|
||||||
|
@ -1049,4 +1049,15 @@ class BlockPoolSlice {
|
||||||
public static int getAddReplicaForkPoolSize() {
|
public static int getAddReplicaForkPoolSize() {
|
||||||
return addReplicaThreadPool.getPoolSize();
|
return addReplicaThreadPool.getPoolSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public ForkJoinPool getAddReplicaThreadPool() {
|
||||||
|
return addReplicaThreadPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static void reInitializeAddReplicaThreadPool() {
|
||||||
|
addReplicaThreadPool.shutdown();
|
||||||
|
addReplicaThreadPool = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,9 @@ import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
|
@ -48,6 +50,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
@ -328,6 +331,7 @@ public class TestFsVolumeList {
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
|
public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
|
||||||
|
BlockPoolSlice.reInitializeAddReplicaThreadPool();
|
||||||
Configuration cnf = new Configuration();
|
Configuration cnf = new Configuration();
|
||||||
int poolSize = 5;
|
int poolSize = 5;
|
||||||
cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||||
|
@ -374,7 +378,29 @@ public class TestFsVolumeList {
|
||||||
vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
|
vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
|
||||||
assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid)
|
assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid)
|
||||||
.size() == 1000);
|
.size() == 1000);
|
||||||
assertTrue("Fork pool size should be " + poolSize,
|
assertEquals("Fork pool should be initialize with configured pool size",
|
||||||
BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
|
poolSize, BlockPoolSlice.getAddReplicaForkPoolSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testInstanceOfAddReplicaThreadPool() throws Exception {
|
||||||
|
// Start cluster with multiple namespace
|
||||||
|
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(
|
||||||
|
new HdfsConfiguration())
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
|
||||||
|
.numDataNodes(1).build()) {
|
||||||
|
cluster.waitActive();
|
||||||
|
FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
|
||||||
|
.getFSDataset();
|
||||||
|
FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences()
|
||||||
|
.get(0);
|
||||||
|
ForkJoinPool threadPool1 = vol.getBlockPoolSlice(
|
||||||
|
cluster.getNamesystem(0).getBlockPoolId()).getAddReplicaThreadPool();
|
||||||
|
ForkJoinPool threadPool2 = vol.getBlockPoolSlice(
|
||||||
|
cluster.getNamesystem(1).getBlockPoolId()).getAddReplicaThreadPool();
|
||||||
|
assertEquals(
|
||||||
|
"Thread pool instance should be same in all the BlockPoolSlice",
|
||||||
|
threadPool1, threadPool2);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue