HDFS-12196. Ozone: DeleteKey-2: Implement block deleting service to delete stale blocks at background. Contributed by Weiwei Yang.

Weiwei Yang 2017-08-11 18:45:55 +08:00 committed by Owen O'Malley
parent 285d61cd8d
commit 639b4fb8a9
12 changed files with 972 additions and 1 deletion

View File

@@ -94,6 +94,24 @@ public final class OzoneConfigKeys {
"ozone.client.connection.timeout.ms";
public static final int OZONE_CLIENT_CONNECTION_TIMEOUT_MS_DEFAULT = 5000;
/**
* Configuration properties for Ozone Block Deleting Service.
*/
public static final String OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS =
"ozone.block.deleting.service.interval.ms";
public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT
= 60000;
public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
"ozone.block.deleting.limit.per.task";
public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT
= 1000;
public static final String OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL
= "ozone.block.deleting.container.limit.per.interval";
public static final int
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT = 10;
public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT

View File

@@ -407,7 +407,7 @@ public class ContainerManagerImpl implements ContainerManager {
@VisibleForTesting
-  ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
+  public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
return containerMap;
}

View File

@@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.background;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTaskResult;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
/**
* A per-datanode container block deleting service that takes charge of
* deleting stale ozone blocks.
*/
public class BlockDeletingService extends BackgroundService {
private static final Logger LOG =
LoggerFactory.getLogger(BlockDeletingService.class);
private final ContainerManager containerManager;
private final Configuration conf;
// Throttles the number of blocks to delete per task;
// tests may set this as low as 1.
private final int blockLimitPerTask;
// Throttles the number of containers processed concurrently per interval.
private final int containerLimitPerInterval;
// Task priority is useful when a to-delete block has a weight.
private final static int TASK_PRIORITY_DEFAULT = 1;
// Core pool size for container tasks.
private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
public BlockDeletingService(ContainerManager containerManager,
int serviceInterval, Configuration conf) {
super("BlockDeletingService", serviceInterval,
TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE);
this.containerManager = containerManager;
this.conf = conf;
this.blockLimitPerTask = conf.getInt(
OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER,
OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT);
this.containerLimitPerInterval = conf.getInt(
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
}
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
List<ContainerData> containers = Lists.newArrayList();
try {
// We list at most a fixed number of containers at a time, in case
// there are too many containers and we would start too many workers.
// We must ensure there is no empty container in this result.
containerManager.listContainer(null, containerLimitPerInterval,
null, containers);
// TODO
// In case we keep fetching the same few containers, should we
// list more containers at a time and shuffle them?
for (ContainerData container : containers) {
BlockDeletingTask containerTask =
new BlockDeletingTask(container, TASK_PRIORITY_DEFAULT);
queue.add(containerTask);
}
} catch (StorageContainerException e) {
LOG.warn("Failed to initiate block deleting tasks, "
+ "caused by unable to get containers info. "
+ "Retry in next interval. ", e);
}
return queue;
}
private static class ContainerBackgroundTaskResult
implements BackgroundTaskResult {
private List<String> deletedBlockIds;
ContainerBackgroundTaskResult() {
deletedBlockIds = new LinkedList<>();
}
public void addBlockId(String blockId) {
deletedBlockIds.add(blockId);
}
public void addAll(List<String> blockIds) {
deletedBlockIds.addAll(blockIds);
}
public List<String> getDeletedBlocks() {
return deletedBlockIds;
}
@Override
public int getSize() {
return deletedBlockIds.size();
}
}
private class BlockDeletingTask
implements BackgroundTask<BackgroundTaskResult> {
private final int priority;
private final ContainerData containerData;
BlockDeletingTask(ContainerData containerData, int priority) {
this.priority = priority;
this.containerData = containerData;
}
@Override
public BackgroundTaskResult call() throws Exception {
// Scan container's db and get list of under deletion blocks
MetadataStore meta = KeyUtils.getDB(containerData, conf);
// # of blocks to delete is throttled
KeyPrefixFilter filter = new KeyPrefixFilter(
OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> toDeleteBlocks =
meta.getRangeKVs(null, blockLimitPerTask, filter);
if (toDeleteBlocks.isEmpty()) {
LOG.info("No under deletion block found in container : {}",
containerData.getContainerName());
}
List<String> succeedBlocks = new LinkedList<>();
LOG.info("Container : {}, To-Delete blocks : {}",
containerData.getContainerName(), toDeleteBlocks.size());
toDeleteBlocks.forEach(entry -> {
String blockName = DFSUtil.bytes2String(entry.getKey());
LOG.info("Deleting block {}", blockName);
try {
ContainerProtos.KeyData data =
ContainerProtos.KeyData.parseFrom(entry.getValue());
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
File chunkFile = new File(chunkInfo.getChunkName());
if (FileUtils.deleteQuietly(chunkFile)) {
LOG.info("block {} chunk {} deleted", blockName,
chunkFile.getAbsolutePath());
}
}
succeedBlocks.add(blockName);
} catch (InvalidProtocolBufferException e) {
LOG.error("Failed to parse block info for block {}", blockName, e);
}
});
// Once chunk files are deleted, clean up the corresponding DB entries.
BatchOperation batch = new BatchOperation();
succeedBlocks.forEach(entry ->
batch.delete(DFSUtil.string2Bytes(entry)));
meta.writeBatch(batch);
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
crr.addAll(succeedBlocks);
return crr;
}
@Override
public int getPriority() {
return priority;
}
}
}
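For context on how keys enter the deleting state this service consumes: a producer re-keys a block under OzoneConsts.DELETING_KEY_PREFIX. The following is a minimal sketch, not part of this patch, using the MetadataStore and BatchOperation APIs seen above; the class and method names are hypothetical.

import java.io.IOException;

import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;

public final class DeletionMarker {
  private DeletionMarker() {
  }

  // Re-keys a block under the deleting prefix so that
  // BlockDeletingService picks it up on a later interval.
  public static void markBlockForDeletion(MetadataStore meta,
      String blockName) throws IOException {
    byte[] key = DFSUtil.string2Bytes(blockName);
    byte[] value = meta.get(key);  // serialized KeyData bytes
    BatchOperation batch = new BatchOperation();
    batch.put(DFSUtil.string2Bytes(
        OzoneConsts.DELETING_KEY_PREFIX + blockName), value);
    batch.delete(key);
    meta.writeBatch(batch);  // both operations applied in one batch
  }
}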

View File

@@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.background;

View File

@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.protocol.proto
@@ -45,6 +46,10 @@ import java.util.List;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
/**
* Ozone main class sets up the network server and initializes the container
@@ -60,6 +65,7 @@ public class OzoneContainer {
private final XceiverServerSpi server;
private final ChunkManager chunkManager;
private final KeyManager keyManager;
private final BlockDeletingService blockDeletingService;
/**
* Creates a network endpoint and enables Ozone container.
@@ -90,6 +96,12 @@
this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
manager.setKeyManager(this.keyManager);
int svcInterval = ozoneConfig.getInt(
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
this.blockDeletingService = new BlockDeletingService(manager,
svcInterval, ozoneConfig);
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
final boolean useRatis = ozoneConfig.getBoolean(
@@ -107,6 +119,7 @@
*/
public void start() throws IOException {
server.start();
blockDeletingService.start();
dispatcher.init();
}
@@ -152,6 +165,7 @@
this.chunkManager.shutdown();
this.keyManager.shutdown();
this.manager.shutdown();
this.blockDeletingService.shutdown();
LOG.info("container services shutdown complete.");
} catch (IOException ex) {
LOG.warn("container service shutdown error:", ex);

View File

@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.utils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
/**
* An abstract class for a background service in ozone.
* A background service schedules multiple child tasks in parallel
* in a certain period. In each interval, it waits until all the tasks
* finish execution and then schedules the next interval.
*/
public abstract class BackgroundService {
private static final Logger LOG =
LoggerFactory.getLogger(BackgroundService.class);
// Executor to launch child tasks
private final ScheduledExecutorService exec;
private final ThreadGroup threadGroup;
private final ThreadFactory threadFactory;
private final String serviceName;
private final int interval;
private final TimeUnit unit;
public BackgroundService(String serviceName, int interval,
TimeUnit unit, int threadPoolSize) {
this.interval = interval;
this.unit = unit;
this.serviceName = serviceName;
threadGroup = new ThreadGroup(serviceName);
ThreadFactory tf = r -> new Thread(threadGroup, r);
threadFactory = new ThreadFactoryBuilder()
.setThreadFactory(tf)
.setDaemon(true)
.setNameFormat(serviceName + "#%d")
.build();
exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
}
protected ExecutorService getExecutorService() {
return this.exec;
}
@VisibleForTesting
public int getThreadCount() {
return threadGroup.activeCount();
}
// start service
public void start() {
exec.scheduleWithFixedDelay(new PeriodicalTask(), 0, interval, unit);
}
public abstract BackgroundTaskQueue getTasks();
/**
* Runs one or more background tasks concurrently and
* waits until all tasks return their results.
*/
public class PeriodicalTask implements Runnable {
@Override
public void run() {
LOG.info("Running background service : {}", serviceName);
BackgroundTaskQueue tasks = getTasks();
if (tasks.isEmpty()) {
// No tasks found, or there was a problem initializing them;
// return and retry in the next interval.
return;
}
LOG.info("Number of background tasks to execute : {}", tasks.size());
CompletionService<BackgroundTaskResult> taskCompletionService =
new ExecutorCompletionService<>(exec);
List<Future<BackgroundTaskResult>> results = Lists.newArrayList();
while (tasks.size() > 0) {
BackgroundTask task = tasks.poll();
Future<BackgroundTaskResult> result =
taskCompletionService.submit(task);
results.add(result);
}
results.parallelStream().forEach(taskResultFuture -> {
try {
// Collect task results
// TODO timeout in case task hangs
BackgroundTaskResult result = taskResultFuture.get();
if (LOG.isDebugEnabled()) {
LOG.debug("task execution result size {}", result.getSize());
}
} catch (InterruptedException | ExecutionException e) {
LOG.warn(
"Background task fails to execute, "
+ "retrying in next interval", e);
}
});
}
}
// Shut down and make sure all threads are properly released.
public void shutdown() {
LOG.info("Shutting down service {}", this.serviceName);
exec.shutdown();
try {
if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
exec.shutdownNow();
}
} catch (InterruptedException e) {
exec.shutdownNow();
}
if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
threadGroup.destroy();
}
}
}
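To make the contract concrete, here is a minimal sketch of a BackgroundService subclass, not from this patch, with hypothetical names throughout: getTasks() is called once per interval, every queued task runs on the shared pool, and all results are collected before the next run.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult;

public class ExampleSweepService extends BackgroundService {

  public ExampleSweepService() {
    // Run every 60 seconds with a pool of 2 worker threads.
    super("ExampleSweepService", 60000, TimeUnit.MILLISECONDS, 2);
  }

  @Override
  public BackgroundTaskQueue getTasks() {
    BackgroundTaskQueue queue = new BackgroundTaskQueue();
    queue.add(new BackgroundTask<BackgroundTaskResult>() {
      @Override
      public BackgroundTaskResult call() {
        // One unit of periodic work goes here.
        return () -> 0;  // a BackgroundTaskResult reporting zero entries
      }

      @Override
      public int getPriority() {
        return 1;
      }
    });
    return queue;
  }
}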

View File

@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.utils;
import java.util.concurrent.Callable;
/**
* A task to be run by {@link BackgroundService}.
*/
public interface BackgroundTask<T> extends Callable<T> {
int getPriority();
}

View File

@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.utils;
import java.util.PriorityQueue;
/**
* A priority queue that stores a number of {@link BackgroundTask}s.
*/
public class BackgroundTaskQueue {
private final PriorityQueue<BackgroundTask> tasks;
public BackgroundTaskQueue() {
tasks = new PriorityQueue<>((task1, task2)
-> Integer.compare(task1.getPriority(), task2.getPriority()));
}
/**
* @return the head task in this queue.
*/
public synchronized BackgroundTask poll() {
return tasks.poll();
}
/**
* Add a {@link BackgroundTask} to the queue;
* the task will be ordered by its priority.
*
* @param task the task to add.
*/
public synchronized void add(BackgroundTask task) {
tasks.add(task);
}
/**
* @return true if the queue contains no task, false otherwise.
*/
public synchronized boolean isEmpty() {
return tasks.isEmpty();
}
/**
* @return the size of the queue.
*/
public synchronized int size() {
return tasks.size();
}
}
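A short sketch of the ordering semantics, assuming a hypothetical helper makeTask(int) that builds a no-op BackgroundTask with the given priority; tasks with lower priority values are polled first.

// makeTask(int) is hypothetical, for illustration only.
BackgroundTaskQueue queue = new BackgroundTaskQueue();
queue.add(makeTask(5));
queue.add(makeTask(1));
queue.add(makeTask(3));
queue.poll();  // returns the priority-1 task
queue.poll();  // returns the priority-3 task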

View File

@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.utils;
/**
* Result of a {@link BackgroundTask}.
*/
public interface BackgroundTaskResult {
/**
* Returns the number of entries included in this result.
*/
int getSize();
}

View File

@@ -359,6 +359,37 @@
</description>
</property>
<property>
<name>ozone.block.deleting.service.interval.ms</name>
<value>60000</value>
<description>
Time interval in milliseconds of the block deleting service.
The block deleting service runs on each datanode to scan for stale
blocks and delete them asynchronously.
</description>
</property>
<property>
<name>ozone.block.deleting.limit.per.task</name>
<value>1000</value>
<description>
Maximum number of blocks deleted by the block deleting service
per container in each time interval. This property is used to throttle
the actual number of block deletions on a datanode.
</description>
</property>
<property>
<name>ozone.block.deleting.container.limit.per.interval</name>
<value>10</value>
<description>
Maximum number of containers scanned by the block deleting service
in each time interval. The service spawns one thread per container
to handle its block deletions, so this property throttles the number
of threads spawned for block deletions.
</description>
</property>
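The three properties above map to the OzoneConfigKeys constants added earlier in this patch and can also be set programmatically; a minimal sketch (class and method names hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;

public class DeletionTuningExample {
  public static Configuration tunedConf() {
    Configuration conf = new OzoneConfiguration();
    // Run the service every 30 seconds instead of the 60-second default.
    conf.setInt(
        OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 30000);
    // Delete at most 500 blocks per container per interval.
    conf.setInt(
        OzoneConfigKeys.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 500);
    // Scan at most 5 containers per interval.
    conf.setInt(
        OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 5);
    return conf;
  }
}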
<property>
<name>dfs.container.ipc</name>
<value>50011</value>

View File

@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.TestUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A test class implementation for {@link BlockDeletingService}.
*/
public class BlockDeletingServiceTestImpl
extends BlockDeletingService {
// tests only
private CountDownLatch latch;
private Thread testingThread;
private AtomicInteger numOfProcessed = new AtomicInteger(0);
public BlockDeletingServiceTestImpl(ContainerManager containerManager,
int serviceInterval, Configuration conf) {
super(containerManager, serviceInterval, conf);
}
@VisibleForTesting
public void runDeletingTasks() {
if (latch.getCount() > 0) {
this.latch.countDown();
} else {
throw new IllegalStateException("Count already reaches zero");
}
}
@VisibleForTesting
public boolean isStarted() {
return latch != null && testingThread.isAlive();
}
public int getTimesOfProcessed() {
return numOfProcessed.get();
}
// Override the implementation to start a single on-call control thread.
@Override public void start() {
PeriodicalTask svc = new PeriodicalTask();
// In test mode, rely on a latch countdown to trigger the deleting tasks.
Runnable r = () -> {
while (true) {
latch = new CountDownLatch(1);
try {
latch.await();
} catch (InterruptedException e) {
break;
}
Future<?> future = this.getExecutorService().submit(svc);
try {
// for tests, we only wait for 3s for completion
future.get(3, TimeUnit.SECONDS);
numOfProcessed.incrementAndGet();
} catch (Exception e) {
return;
}
}
};
testingThread = new ThreadFactoryBuilder()
.setDaemon(true)
.build()
.newThread(r);
testingThread.start();
}
@Override
public void shutdown() {
testingThread.interrupt();
super.shutdown();
}
}

View File

@@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.TestUtils.BlockDeletingServiceTestImpl;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.MetadataKeyFilters;
import org.apache.hadoop.utils.MetadataStore;
import org.junit.Assert;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
import static org.apache.hadoop.ozone.container
.ContainerTestHelper.createSingleNodePipeline;
/**
* Tests to test block deleting service.
*/
public class TestBlockDeletingService {
private static final Logger LOG =
LoggerFactory.getLogger(TestBlockDeletingService.class);
private static File testRoot;
private static File containersDir;
private static File chunksDir;
@BeforeClass
public static void init() {
testRoot = GenericTestUtils
.getTestDir(TestBlockDeletingService.class.getSimpleName());
chunksDir = new File(testRoot, "chunks");
containersDir = new File(testRoot, "containers");
}
@Before
public void setup() throws IOException {
if (chunksDir.exists()) {
FileUtils.deleteDirectory(chunksDir);
}
}
@After
public void cleanup() throws IOException {
FileUtils.deleteDirectory(chunksDir);
FileUtils.deleteDirectory(containersDir);
FileUtils.deleteDirectory(testRoot);
}
private ContainerManager createContainerManager(Configuration conf)
throws Exception {
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
containersDir.getAbsolutePath());
if (containersDir.exists()) {
FileUtils.deleteDirectory(containersDir);
}
ContainerManager containerManager = new ContainerManagerImpl();
List<StorageLocation> pathLists = new LinkedList<>();
pathLists.add(StorageLocation.parse(containersDir.getAbsolutePath()));
containerManager.init(conf, pathLists);
return containerManager;
}
/**
* A helper method to create some blocks and put them under deletion
* state for testing. This method directly updates container.db and
* creates some fake chunk files for testing.
*/
private void createToDeleteBlocks(ContainerManager mgr,
Configuration conf, int numOfContainers, int numOfBlocksPerContainer,
int numOfChunksPerBlock, File chunkDir) throws IOException {
for (int x = 0; x < numOfContainers; x++) {
String containerName = OzoneUtils.getRequestID();
ContainerData data = new ContainerData(containerName);
mgr.createContainer(createSingleNodePipeline(containerName), data);
data = mgr.readContainer(containerName);
MetadataStore metadata = KeyUtils.getDB(data, conf);
for (int j = 0; j < numOfBlocksPerContainer; j++) {
String blockName = containerName + "b" + j;
String deleteStateName = OzoneConsts.DELETING_KEY_PREFIX + blockName;
KeyData kd = new KeyData(containerName, deleteStateName);
List<ContainerProtos.ChunkInfo> chunks = Lists.newArrayList();
for (int k = 0; k < numOfChunksPerBlock; k++) {
// offset doesn't matter here
String chunkName = blockName + "_chunk_" + k;
File chunk = new File(chunkDir, chunkName);
FileUtils.writeStringToFile(chunk, "a chunk");
LOG.info("Creating file {}", chunk.getAbsolutePath());
// make sure file exists
Assert.assertTrue(chunk.isFile() && chunk.exists());
ContainerProtos.ChunkInfo info =
ContainerProtos.ChunkInfo.newBuilder()
.setChunkName(chunk.getAbsolutePath())
.setLen(0)
.setOffset(0)
.setChecksum("")
.build();
chunks.add(info);
}
kd.setChunks(chunks);
metadata.put(DFSUtil.string2Bytes(deleteStateName),
kd.getProtoBufMessage().toByteArray());
}
}
}
/**
* Runs the service's deleting tasks and waits until they have been processed.
*/
private void deleteAndWait(BlockDeletingServiceTestImpl service,
int timesOfProcessed) throws TimeoutException, InterruptedException {
service.runDeletingTasks();
GenericTestUtils.waitFor(()
-> service.getTimesOfProcessed() == timesOfProcessed, 100, 3000);
}
/**
* Gets the count of under-deletion blocks from the DB;
* note this info is parsed from container.db.
*/
private int getUnderDeletionBlocksCount(MetadataStore meta)
throws IOException {
List<Map.Entry<byte[], byte[]>> underDeletionBlocks =
meta.getRangeKVs(null, 100, new MetadataKeyFilters.KeyPrefixFilter(
OzoneConsts.DELETING_KEY_PREFIX));
return underDeletionBlocks.size();
}
@Test
public void testBlockDeletion() throws Exception {
Configuration conf = new OzoneConfiguration();
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
ContainerManager containerManager = createContainerManager(conf);
createToDeleteBlocks(containerManager, conf, 1, 3, 1, chunksDir);
BlockDeletingServiceTestImpl svc =
new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
svc.start();
GenericTestUtils.waitFor(() -> svc.isStarted(), 100, 3000);
// Ensure 1 container was created
List<ContainerData> containerData = Lists.newArrayList();
containerManager.listContainer(null, 1, "", containerData);
Assert.assertEquals(1, containerData.size());
MetadataStore meta = KeyUtils.getDB(containerData.get(0), conf);
// Ensure there are 3 blocks under deletion
Assert.assertEquals(3, getUnderDeletionBlocksCount(meta));
// Each interval deletes at most 1 container * 2 blocks = 2 blocks
deleteAndWait(svc, 1);
Assert.assertEquals(1, getUnderDeletionBlocksCount(meta));
deleteAndWait(svc, 2);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
deleteAndWait(svc, 3);
Assert.assertEquals(0, getUnderDeletionBlocksCount(meta));
svc.shutdown();
}
@Test
public void testShutdownService() throws Exception {
Configuration conf = new OzoneConfiguration();
conf.setInt(OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 500);
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 10);
ContainerManager containerManager = createContainerManager(conf);
// Create 1 container with 100 blocks
createToDeleteBlocks(containerManager, conf, 1, 100, 1, chunksDir);
BlockDeletingServiceTestImpl service =
new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
service.start();
GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
// Run some deleting tasks and verify there are threads running
service.runDeletingTasks();
GenericTestUtils.waitFor(() -> service.getThreadCount() > 0, 100, 1000);
// Wait for 1 or 2 intervals
Thread.sleep(1000);
// Shutdown service and verify all threads are stopped
service.shutdown();
GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000);
}
@Test(timeout = 30000)
public void testContainerThrottle() throws Exception {
// Properties :
// - Number of containers : 2
// - Number of blocks per container : 1
// - Number of chunks per block : 10
// - Container limit per interval : 1
// - Block limit per container : 1
//
// Each time only 1 container can be processed, so each time
// 1 block from 1 container can be deleted.
Configuration conf = new OzoneConfiguration();
// Process 1 container per interval
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 1);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 1);
ContainerManager containerManager = createContainerManager(conf);
createToDeleteBlocks(containerManager, conf, 2, 1, 10, chunksDir);
BlockDeletingServiceTestImpl service =
new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
service.start();
try {
GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
// The 1st interval processes 1 container: 1 block with 10 chunks.
deleteAndWait(service, 1);
Assert.assertEquals(10, chunksDir.listFiles().length);
} finally {
service.shutdown();
}
}
@Test(timeout = 30000)
public void testBlockThrottle() throws Exception {
// Properties :
// - Number of containers : 5
// - Number of blocks per container : 3
// - Number of chunks per block : 1
// - Container limit per interval : 10
// - Block limit per container : 2
//
// Each time containers can be all scanned, but only 2 blocks
// per container can be actually deleted. So it requires 2 waves
// to cleanup all blocks.
Configuration conf = new OzoneConfiguration();
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
ContainerManager containerManager = createContainerManager(conf);
createToDeleteBlocks(containerManager, conf, 5, 3, 1, chunksDir);
// Make sure chunks are created
Assert.assertEquals(15, chunksDir.listFiles().length);
BlockDeletingServiceTestImpl service =
new BlockDeletingServiceTestImpl(containerManager, 1000, conf);
service.start();
try {
GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000);
// Total blocks = 3 * 5 = 15
// block per task = 2
// number of containers = 5
// each interval will delete at most 5 * 2 = 10 blocks
deleteAndWait(service, 1);
Assert.assertEquals(5, chunksDir.listFiles().length);
// There are only 5 blocks left to delete
deleteAndWait(service, 2);
Assert.assertEquals(0, chunksDir.listFiles().length);
} finally {
service.shutdown();
}
}
}