HDFS-12370. Ozone: Implement TopN container choosing policy for BlockDeletionService. Contributed by Yiqun Lin.
This commit is contained in:
parent
743be0d7c0
commit
fec04f1158
|
@ -32,6 +32,11 @@ public class ContainerStatus {
|
|||
private final ContainerData containerData;
|
||||
private final boolean active;
|
||||
|
||||
/**
|
||||
* Number of pending deletion blocks in container.
|
||||
*/
|
||||
private int numPendingDeletionBlocks;
|
||||
|
||||
/**
|
||||
* Creates a Container Status class.
|
||||
*
|
||||
|
@ -39,6 +44,7 @@ public class ContainerStatus {
|
|||
* @param active - Active or not active.
|
||||
*/
|
||||
ContainerStatus(ContainerData containerData, boolean active) {
|
||||
this.numPendingDeletionBlocks = 0;
|
||||
this.containerData = containerData;
|
||||
this.active = active;
|
||||
}
|
||||
|
@ -64,4 +70,29 @@ public class ContainerStatus {
|
|||
public boolean isActive() {
|
||||
return active;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase the count of pending deletion blocks.
|
||||
*
|
||||
* @param numBlocks increment number
|
||||
*/
|
||||
public void incrPendingDeletionBlocks(int numBlocks) {
|
||||
this.numPendingDeletionBlocks += numBlocks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrease the count of pending deletion blocks.
|
||||
*
|
||||
* @param numBlocks decrement number
|
||||
*/
|
||||
public void decrPendingDeletionBlocks(int numBlocks) {
|
||||
this.numPendingDeletionBlocks -= numBlocks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending deletion blocks.
|
||||
*/
|
||||
public int getNumPendingDeletionBlocks() {
|
||||
return this.numPendingDeletionBlocks;
|
||||
}
|
||||
}
|
|
@ -25,6 +25,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
|
||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
|
@ -33,6 +35,8 @@ import com.google.common.base.Preconditions;
|
|||
*/
|
||||
public class RandomContainerDeletionChoosingPolicy
|
||||
implements ContainerDeletionChoosingPolicy {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RandomContainerDeletionChoosingPolicy.class);
|
||||
|
||||
@Override
|
||||
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||
|
@ -50,6 +54,11 @@ public class RandomContainerDeletionChoosingPolicy
|
|||
if (currentCount < count) {
|
||||
result.add(entry.getContainer());
|
||||
currentCount++;
|
||||
|
||||
LOG.debug("Select container {} for block deletion, "
|
||||
+ "pending deletion blocks num: {}.",
|
||||
entry.getContainer().getContainerName(),
|
||||
entry.getNumPendingDeletionBlocks());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.impl;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy;
|
||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* TopN Ordered choosing policy that choosing containers based on pending
|
||||
* deletion blocks' number.
|
||||
*/
|
||||
public class TopNOrderedContainerDeletionChoosingPolicy
|
||||
implements ContainerDeletionChoosingPolicy {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TopNOrderedContainerDeletionChoosingPolicy.class);
|
||||
|
||||
/** customized comparator used to compare differentiate container status. **/
|
||||
private static final Comparator<ContainerStatus> CONTAINER_STATUS_COMPARATOR
|
||||
= new Comparator<ContainerStatus>() {
|
||||
@Override
|
||||
public int compare(ContainerStatus c1, ContainerStatus c2) {
|
||||
return Integer.compare(c2.getNumPendingDeletionBlocks(),
|
||||
c1.getNumPendingDeletionBlocks());
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||
Map<String, ContainerStatus> candidateContainers)
|
||||
throws StorageContainerException {
|
||||
Preconditions.checkNotNull(candidateContainers,
|
||||
"Internal assertion: candidate containers cannot be null");
|
||||
|
||||
List<ContainerData> result = new LinkedList<>();
|
||||
List<ContainerStatus> orderedList = new LinkedList<>();
|
||||
orderedList.addAll(candidateContainers.values());
|
||||
Collections.sort(orderedList, CONTAINER_STATUS_COMPARATOR);
|
||||
|
||||
// get top N list ordered by pending deletion blocks' number
|
||||
int currentCount = 0;
|
||||
for (ContainerStatus entry : orderedList) {
|
||||
if (currentCount < count) {
|
||||
result.add(entry.getContainer());
|
||||
currentCount++;
|
||||
|
||||
LOG.debug("Select container {} for block deletion, "
|
||||
+ "pending deletion blocks num: {}.",
|
||||
entry.getContainer().getContainerName(),
|
||||
entry.getNumPendingDeletionBlocks());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -179,4 +179,24 @@ public interface ContainerManager extends RwLock {
|
|||
* @throws IOException
|
||||
*/
|
||||
List<ContainerData> getContainerReports() throws IOException;
|
||||
|
||||
/**
|
||||
* Increase pending deletion blocks count number of specified container.
|
||||
*
|
||||
* @param numBlocks
|
||||
* increment count number
|
||||
* @param containerId
|
||||
* container id
|
||||
*/
|
||||
void incrPendingDeletionBlocks(int numBlocks, String containerId);
|
||||
|
||||
/**
|
||||
* Decrease pending deletion blocks count number of specified container.
|
||||
*
|
||||
* @param numBlocks
|
||||
* decrement count number
|
||||
* @param containerId
|
||||
* container id
|
||||
*/
|
||||
void decrPendingDeletionBlocks(int numBlocks, String containerId);
|
||||
}
|
||||
|
|
|
@ -102,12 +102,11 @@ public class BlockDeletingService extends BackgroundService{
|
|||
// We at most list a number of containers a time,
|
||||
// in case there are too many containers and start too many workers.
|
||||
// We must ensure there is no empty container in this result.
|
||||
// The chosen result depends on what container deletion policy is
|
||||
// configured.
|
||||
containers = containerManager.chooseContainerForBlockDeletion(
|
||||
containerLimitPerInterval);
|
||||
|
||||
// TODO
|
||||
// in case we always fetch a few same containers,
|
||||
// should we list some more containers a time and shuffle them?
|
||||
for(ContainerData container : containers) {
|
||||
BlockDeletingTask containerTask =
|
||||
new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
|
||||
|
@ -214,6 +213,9 @@ public class BlockDeletingService extends BackgroundService{
|
|||
succeedBlocks.forEach(entry ->
|
||||
batch.delete(DFSUtil.string2Bytes(entry)));
|
||||
meta.writeBatch(batch);
|
||||
// update count of pending deletion blocks in in-memory container status
|
||||
containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
|
||||
containerData.getContainerName());
|
||||
|
||||
if (!succeedBlocks.isEmpty()) {
|
||||
LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
|
||||
|
|
|
@ -149,6 +149,8 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
|
|||
LOG.debug("Processing Container : {}, DB path : {}", containerId,
|
||||
containerInfo.getDBPath());
|
||||
}
|
||||
|
||||
int newDeletionBlocks = 0;
|
||||
MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
|
||||
for (String blk : delTX.getBlockIDList()) {
|
||||
BatchOperation batch = new BatchOperation();
|
||||
|
@ -162,6 +164,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
|
|||
batch.delete(blkBytes);
|
||||
try {
|
||||
containerDB.writeBatch(batch);
|
||||
newDeletionBlocks++;
|
||||
LOG.info("Transited Block {} to DELETING state in container {}",
|
||||
blk, containerId);
|
||||
} catch (IOException e) {
|
||||
|
@ -176,6 +179,9 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
|
|||
+ " container {}, skip deleting it.", blk, containerId);
|
||||
}
|
||||
}
|
||||
|
||||
// update pending deletion blocks count in in-memory container status
|
||||
containerManager.incrPendingDeletionBlocks(newDeletionBlocks, containerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -418,16 +418,21 @@
|
|||
|
||||
<property>
|
||||
<name>ozone.scm.container.deletion-choosing.policy</name>
|
||||
<value>org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy</value>
|
||||
<value>org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy</value>
|
||||
<description>
|
||||
The policy used for choosing desire containers for block deletion.
|
||||
Datanode selects a number of containers to process block deletion
|
||||
in a certain interval defined by ozone.block.deleting.service.interval.ms,
|
||||
the number of containers to process in each interval is defined
|
||||
by ozone.block.deleting.container.limit.per.interval. This property
|
||||
is used to configure the policy applied while selecting containers,
|
||||
is used to configure the policy applied while selecting containers.
|
||||
There are two policies supporting now: RandomContainerDeletionChoosingPolicy and
|
||||
TopNOrderedContainerDeletionChoosingPolicy.
|
||||
org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy
|
||||
implements a simply random policy that to return a random list of containers.
|
||||
org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy
|
||||
implements a policy that choosing top count number of containers in a pending-deletion-blocks's num
|
||||
based descending order.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
|
|
@ -21,20 +21,27 @@ import static org.apache.hadoop.ozone.container.ContainerTestHelper.createSingle
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -45,8 +52,8 @@ public class TestContainerDeletionChoosingPolicy {
|
|||
private static ContainerManagerImpl containerManager;
|
||||
private static OzoneConfiguration conf;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Throwable {
|
||||
@Before
|
||||
public void init() throws Throwable {
|
||||
conf = new OzoneConfiguration();
|
||||
path = GenericTestUtils
|
||||
.getTempPath(TestContainerDeletionChoosingPolicy.class.getSimpleName());
|
||||
|
@ -55,9 +62,16 @@ public class TestContainerDeletionChoosingPolicy {
|
|||
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws IOException {
|
||||
@After
|
||||
public void shutdown() throws IOException {
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
|
||||
containerManager.writeLock();
|
||||
try{
|
||||
containerManager.shutdown();
|
||||
} finally {
|
||||
containerManager.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -105,4 +119,68 @@ public class TestContainerDeletionChoosingPolicy {
|
|||
}
|
||||
Assert.assertTrue("Chosen container results were same", hasShuffled);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopNOrderedChoosingPolicy() throws IOException {
|
||||
File containerDir = new File(path);
|
||||
if (containerDir.exists()) {
|
||||
FileUtils.deleteDirectory(new File(path));
|
||||
}
|
||||
Assert.assertTrue(containerDir.mkdirs());
|
||||
|
||||
conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY,
|
||||
TopNOrderedContainerDeletionChoosingPolicy.class.getName());
|
||||
List<StorageLocation> pathLists = new LinkedList<>();
|
||||
pathLists.add(StorageLocation.parse(containerDir.getAbsolutePath()));
|
||||
containerManager = new ContainerManagerImpl();
|
||||
containerManager.init(conf, pathLists);
|
||||
|
||||
int numContainers = 10;
|
||||
Random random = new Random();
|
||||
Map<String, Integer> name2Count = new HashMap<>();
|
||||
for (int i = 0; i < numContainers; i++) {
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
ContainerData data = new ContainerData(containerName);
|
||||
containerManager.createContainer(createSingleNodePipeline(containerName),
|
||||
data);
|
||||
Assert.assertTrue(
|
||||
containerManager.getContainerMap().containsKey(containerName));
|
||||
|
||||
// create random number of deletion blocks and write to container db
|
||||
int deletionBlocks = random.nextInt(numContainers) + 1;
|
||||
// record <ContainerName, DeletionCount> value
|
||||
name2Count.put(containerName, deletionBlocks);
|
||||
for (int j = 0; j <= deletionBlocks; j++) {
|
||||
MetadataStore metadata = KeyUtils.getDB(data, conf);
|
||||
String blk = "blk" + i + "-" + j;
|
||||
byte[] blkBytes = DFSUtil.string2Bytes(blk);
|
||||
metadata.put(
|
||||
DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
|
||||
blkBytes);
|
||||
}
|
||||
}
|
||||
|
||||
containerManager.writeLock();
|
||||
containerManager.shutdown();
|
||||
containerManager.writeUnlock();
|
||||
containerManager.init(conf, pathLists);
|
||||
|
||||
List<ContainerData> result0 = containerManager
|
||||
.chooseContainerForBlockDeletion(5);
|
||||
Assert.assertEquals(5, result0.size());
|
||||
|
||||
List<ContainerData> result1 = containerManager
|
||||
.chooseContainerForBlockDeletion(numContainers);
|
||||
|
||||
// verify the order of return list
|
||||
int lastCount = Integer.MAX_VALUE;
|
||||
for (ContainerData data : result1) {
|
||||
int currentCount = name2Count.remove(data.getContainerName());
|
||||
// previous count should not smaller than next one
|
||||
Assert.assertTrue(currentCount > 0 && currentCount <= lastCount);
|
||||
lastCount = currentCount;
|
||||
}
|
||||
// ensure all the container data are compared
|
||||
Assert.assertEquals(0, name2Count.size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,10 +32,10 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
|||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.MetadataStoreBuilder;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -184,14 +184,9 @@ public class TestContainerPersistence {
|
|||
Path meta = Paths.get(status.getContainer().getDBPath()).getParent();
|
||||
Assert.assertTrue(meta != null && Files.exists(meta));
|
||||
|
||||
|
||||
String dbPath = status.getContainer().getDBPath();
|
||||
MetadataStore store = null;
|
||||
try {
|
||||
store = MetadataStoreBuilder.newBuilder()
|
||||
.setDbFile(new File(dbPath))
|
||||
.setCreateIfMissing(false)
|
||||
.build();
|
||||
store = KeyUtils.getDB(status.getContainer(), conf);
|
||||
Assert.assertNotNull(store);
|
||||
} finally {
|
||||
if (store != null) {
|
||||
|
|
Loading…
Reference in New Issue