HDFS-12354. Ozone: Shuffle container list for datanode BlockDeletingService. Contributed by Yiqun Lin.
This commit is contained in:
parent
b06f4f63e3
commit
0f60507521
|
@ -189,6 +189,8 @@ public final class ScmConfigKeys {
|
|||
"ozone.scm.container.provision_batch_size";
|
||||
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 5;
|
||||
|
||||
public static final String OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY =
|
||||
"ozone.scm.container.deletion-choosing.policy";
|
||||
|
||||
/**
|
||||
* Don't start processing a pool if we have not had a minimum number of
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
|
@ -36,10 +37,12 @@ import org.apache.hadoop.ozone.OzoneConsts;
|
|||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces
|
||||
.ContainerLocationManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -107,6 +110,8 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
private KeyManager keyManager;
|
||||
private Configuration conf;
|
||||
|
||||
private ContainerDeletionChoosingPolicy containerDeletionChooser;
|
||||
|
||||
/**
|
||||
* Init call that sets up a container Manager.
|
||||
*
|
||||
|
@ -127,6 +132,12 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
this.conf = config;
|
||||
readLock();
|
||||
try {
|
||||
// TODO: Use pending deletion blocks based policy as default way
|
||||
containerDeletionChooser = ReflectionUtils.newInstance(conf.getClass(
|
||||
ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY,
|
||||
RandomContainerDeletionChoosingPolicy.class,
|
||||
ContainerDeletionChoosingPolicy.class), conf);
|
||||
|
||||
for (StorageLocation path : containerDirs) {
|
||||
File directory = Paths.get(path.getNormalizedUri()).toFile();
|
||||
if (!directory.exists() && !directory.mkdirs()) {
|
||||
|
@ -416,7 +427,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
throw new StorageContainerException("No such container. Name : " +
|
||||
containerName, CONTAINER_NOT_FOUND);
|
||||
}
|
||||
ContainerUtils.removeContainer(status.containerData, conf, forceDelete);
|
||||
ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete);
|
||||
containerMap.remove(containerName);
|
||||
} catch (StorageContainerException e) {
|
||||
throw e;
|
||||
|
@ -814,51 +825,20 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is an immutable class that represents the state of a container. if the
|
||||
* container reading encountered an error when we boot up we will post that
|
||||
* info to a recovery queue and keep the info in the containerMap.
|
||||
* <p/>
|
||||
* if and when the issue is fixed, the expectation is that this entry will be
|
||||
* deleted by the recovery thread from the containerMap and will insert entry
|
||||
* instead of modifying this class.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static class ContainerStatus {
|
||||
private final ContainerData containerData;
|
||||
private final boolean active;
|
||||
|
||||
/**
|
||||
* Creates a Container Status class.
|
||||
*
|
||||
* @param containerData - ContainerData.
|
||||
* @param active - Active or not active.
|
||||
*/
|
||||
ContainerStatus(ContainerData containerData, boolean active) {
|
||||
this.containerData = containerData;
|
||||
this.active = active;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns container if it is active. It is not active if we have had an
|
||||
* error and we are waiting for the background threads to fix the issue.
|
||||
*
|
||||
* @return ContainerData.
|
||||
*/
|
||||
public ContainerData getContainer() {
|
||||
if (active) {
|
||||
return containerData;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates if a container is Active.
|
||||
*
|
||||
* @return true if it is active.
|
||||
*/
|
||||
public boolean isActive() {
|
||||
return active;
|
||||
@Override
|
||||
public List<ContainerData> chooseContainerForBlockDeletion(
|
||||
int count) throws StorageContainerException {
|
||||
readLock();
|
||||
try {
|
||||
return containerDeletionChooser.chooseContainerForBlockDeletion(
|
||||
count, containerMap);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ContainerDeletionChoosingPolicy getContainerDeletionChooser() {
|
||||
return containerDeletionChooser;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
|
||||
/**
|
||||
* This is an immutable class that represents the state of a container. if the
|
||||
* container reading encountered an error when we boot up we will post that
|
||||
* info to a recovery queue and keep the info in the containerMap.
|
||||
* <p/>
|
||||
* if and when the issue is fixed, the expectation is that this entry will be
|
||||
* deleted by the recovery thread from the containerMap and will insert entry
|
||||
* instead of modifying this class.
|
||||
*/
|
||||
public class ContainerStatus {
|
||||
private final ContainerData containerData;
|
||||
private final boolean active;
|
||||
|
||||
/**
|
||||
* Creates a Container Status class.
|
||||
*
|
||||
* @param containerData - ContainerData.
|
||||
* @param active - Active or not active.
|
||||
*/
|
||||
ContainerStatus(ContainerData containerData, boolean active) {
|
||||
this.containerData = containerData;
|
||||
this.active = active;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns container if it is active. It is not active if we have had an
|
||||
* error and we are waiting for the background threads to fix the issue.
|
||||
*
|
||||
* @return ContainerData.
|
||||
*/
|
||||
public ContainerData getContainer() {
|
||||
if (active) {
|
||||
return containerData;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicates if a container is Active.
|
||||
*
|
||||
* @return true if it is active.
|
||||
*/
|
||||
public boolean isActive() {
|
||||
return active;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
|
||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Randomly choosing containers for block deletion.
|
||||
*/
|
||||
public class RandomContainerDeletionChoosingPolicy
|
||||
implements ContainerDeletionChoosingPolicy {
|
||||
|
||||
@Override
|
||||
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||
Map<String, ContainerStatus> candidateContainers)
|
||||
throws StorageContainerException {
|
||||
Preconditions.checkNotNull(candidateContainers,
|
||||
"Internal assertion: candidate containers cannot be null");
|
||||
|
||||
int currentCount = 0;
|
||||
List<ContainerData> result = new LinkedList<>();
|
||||
ContainerStatus[] values = new ContainerStatus[candidateContainers.size()];
|
||||
// to get a shuffle list
|
||||
for (ContainerStatus entry : DFSUtil.shuffle(
|
||||
candidateContainers.values().toArray(values))) {
|
||||
if (currentCount < count) {
|
||||
result.add(entry.getContainer());
|
||||
currentCount++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.interfaces;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ContainerStatus;
|
||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
|
||||
/**
|
||||
* This interface is used for choosing desired containers for
|
||||
* block deletion.
|
||||
*/
|
||||
public interface ContainerDeletionChoosingPolicy {
|
||||
|
||||
/**
|
||||
* Chooses desired containers for block deletion.
|
||||
* @param count
|
||||
* how many to return
|
||||
* @param candidateContainers
|
||||
* candidate containers collection
|
||||
* @return container data list
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||
Map<String, ContainerStatus> candidateContainers)
|
||||
throws StorageContainerException;
|
||||
}
|
|
@ -95,6 +95,15 @@ public interface ContainerManager extends RwLock {
|
|||
List<ContainerData> data)
|
||||
throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* Choose containers for block deletion.
|
||||
*
|
||||
* @param count - how many to return
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
List<ContainerData> chooseContainerForBlockDeletion(int count)
|
||||
throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* Get metadata about a specific container.
|
||||
*
|
||||
|
|
|
@ -102,8 +102,8 @@ public class BlockDeletingService extends BackgroundService{
|
|||
// We at most list a number of containers a time,
|
||||
// in case there are too many containers and start too many workers.
|
||||
// We must ensure there is no empty container in this result.
|
||||
containerManager.listContainer(null, containerLimitPerInterval,
|
||||
null, containers);
|
||||
containers = containerManager.chooseContainerForBlockDeletion(
|
||||
containerLimitPerInterval);
|
||||
|
||||
// TODO
|
||||
// in case we always fetch a few same containers,
|
||||
|
|
|
@ -416,6 +416,21 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ozone.scm.container.deletion-choosing.policy</name>
|
||||
<value>org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy</value>
|
||||
<description>
|
||||
The policy used for choosing desire containers for block deletion.
|
||||
Datanode selects a number of containers to process block deletion
|
||||
in a certain interval defined by ozone.block.deleting.service.interval.ms,
|
||||
the number of containers to process in each interval is defined
|
||||
by ozone.block.deleting.container.limit.per.interval. This property
|
||||
is used to configure the policy applied while selecting containers,
|
||||
org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy
|
||||
implements a simply random policy that to return a random list of containers.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.container.ipc</name>
|
||||
<value>50011</value>
|
||||
|
|
|
@ -0,0 +1,108 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import static org.apache.hadoop.ozone.container.ContainerTestHelper.createSingleNodePipeline;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* The class for testing container deletion choosing policy.
|
||||
*/
|
||||
public class TestContainerDeletionChoosingPolicy {
|
||||
private static String path;
|
||||
private static ContainerManagerImpl containerManager;
|
||||
private static OzoneConfiguration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Throwable {
|
||||
conf = new OzoneConfiguration();
|
||||
path = GenericTestUtils
|
||||
.getTempPath(TestContainerDeletionChoosingPolicy.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
|
||||
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws IOException {
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRandomChoosingPolicy() throws IOException {
|
||||
File containerDir = new File(path);
|
||||
if (containerDir.exists()) {
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
}
|
||||
Assert.assertTrue(containerDir.mkdirs());
|
||||
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY,
|
||||
RandomContainerDeletionChoosingPolicy.class.getName());
|
||||
List<StorageLocation> pathLists = new LinkedList<>();
|
||||
pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath()));
|
||||
containerManager = new ContainerManagerImpl();
|
||||
containerManager.init(conf, pathLists);
|
||||
|
||||
int numContainers = 10;
|
||||
for (int i = 0; i < numContainers; i++) {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
ContainerData data = new ContainerData(containerName);
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName),
|
||||
data);
|
||||
Assert.assertTrue(
|
||||
containerManager.getContainerMap().containsKey(containerName));
|
||||
}
|
||||
|
||||
List<ContainerData> result0 = containerManager
|
||||
.chooseContainerForBlockDeletion(5);
|
||||
Assert.assertEquals(5, result0.size());
|
||||
|
||||
// test random choosing
|
||||
List<ContainerData> result1 = containerManager
|
||||
.chooseContainerForBlockDeletion(numContainers);
|
||||
List<ContainerData> result2 = containerManager
|
||||
.chooseContainerForBlockDeletion(numContainers);
|
||||
|
||||
boolean hasShuffled = false;
|
||||
for (int i = 0; i < numContainers; i++) {
|
||||
if (!result1.get(i).getContainerName()
|
||||
.equals(result2.get(i).getContainerName())) {
|
||||
hasShuffled = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("Chosen container results were same", hasShuffled);
|
||||
}
|
||||
}
|
|
@ -170,7 +170,7 @@ public class TestContainerPersistence {
|
|||
data);
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
.containsKey(containerName));
|
||||
ContainerManagerImpl.ContainerStatus status = containerManager
|
||||
ContainerStatus status = containerManager
|
||||
.getContainerMap().get(containerName);
|
||||
|
||||
Assert.assertTrue(status.isActive());
|
||||
|
|
Loading…
Reference in New Issue