HDFS-12327. Ozone: support setting timeout in background service. Contributed by Yiqun Lin.
This commit is contained in:
parent
38bc1d1097
commit
9e733215a8
|
@ -118,6 +118,11 @@ public final class OzoneConfigKeys {
|
||||||
public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT
|
public static final int OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT
|
||||||
= 60000;
|
= 60000;
|
||||||
|
|
||||||
|
public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT =
|
||||||
|
"ozone.block.deleting.service.timeout";
|
||||||
|
public static final int OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
|
||||||
|
= 300000; // 300s for default
|
||||||
|
|
||||||
public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
|
public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
|
||||||
"ozone.block.deleting.limit.per.task";
|
"ozone.block.deleting.limit.per.task";
|
||||||
public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT
|
public static final int OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT
|
||||||
|
|
|
@ -79,9 +79,10 @@ public class BlockDeletingService extends BackgroundService{
|
||||||
private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
|
private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10;
|
||||||
|
|
||||||
public BlockDeletingService(ContainerManager containerManager,
|
public BlockDeletingService(ContainerManager containerManager,
|
||||||
int serviceInterval, Configuration conf) {
|
int serviceInterval, long serviceTimeout, Configuration conf) {
|
||||||
super("BlockDeletingService", serviceInterval,
|
super("BlockDeletingService", serviceInterval,
|
||||||
TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE);
|
TimeUnit.MILLISECONDS, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE,
|
||||||
|
serviceTimeout);
|
||||||
this.containerManager = containerManager;
|
this.containerManager = containerManager;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.blockLimitPerTask = conf.getInt(
|
this.blockLimitPerTask = conf.getInt(
|
||||||
|
|
|
@ -46,6 +46,7 @@ import java.io.IOException;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
|
||||||
|
@ -54,6 +55,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
|
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
|
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ozone main class sets up the network server and initializes the container
|
* Ozone main class sets up the network server and initializes the container
|
||||||
|
@ -103,8 +106,11 @@ public class OzoneContainer {
|
||||||
int svcInterval = ozoneConfig.getInt(
|
int svcInterval = ozoneConfig.getInt(
|
||||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
|
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
|
||||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
|
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
|
||||||
|
long serviceTimeout = ozoneConfig.getTimeDuration(
|
||||||
|
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
|
||||||
|
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||||
this.blockDeletingService = new BlockDeletingService(manager,
|
this.blockDeletingService = new BlockDeletingService(manager,
|
||||||
svcInterval, ozoneConfig);
|
svcInterval, serviceTimeout, ozoneConfig);
|
||||||
|
|
||||||
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
|
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -42,7 +43,8 @@ import java.util.concurrent.ExecutionException;
|
||||||
*/
|
*/
|
||||||
public abstract class BackgroundService {
|
public abstract class BackgroundService {
|
||||||
|
|
||||||
private static final Logger LOG =
|
@VisibleForTesting
|
||||||
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(BackgroundService.class);
|
LoggerFactory.getLogger(BackgroundService.class);
|
||||||
|
|
||||||
// Executor to launch child tasks
|
// Executor to launch child tasks
|
||||||
|
@ -51,13 +53,15 @@ public abstract class BackgroundService {
|
||||||
private final ThreadFactory threadFactory;
|
private final ThreadFactory threadFactory;
|
||||||
private final String serviceName;
|
private final String serviceName;
|
||||||
private final int interval;
|
private final int interval;
|
||||||
|
private final long serviceTimeout;
|
||||||
private final TimeUnit unit;
|
private final TimeUnit unit;
|
||||||
|
|
||||||
public BackgroundService(String serviceName, int interval,
|
public BackgroundService(String serviceName, int interval,
|
||||||
TimeUnit unit, int threadPoolSize) {
|
TimeUnit unit, int threadPoolSize, long serviceTimeout) {
|
||||||
this.interval = interval;
|
this.interval = interval;
|
||||||
this.unit = unit;
|
this.unit = unit;
|
||||||
this.serviceName = serviceName;
|
this.serviceName = serviceName;
|
||||||
|
this.serviceTimeout = serviceTimeout;
|
||||||
threadGroup = new ThreadGroup(serviceName);
|
threadGroup = new ThreadGroup(serviceName);
|
||||||
ThreadFactory tf = r -> new Thread(threadGroup, r);
|
ThreadFactory tf = r -> new Thread(threadGroup, r);
|
||||||
threadFactory = new ThreadFactoryBuilder()
|
threadFactory = new ThreadFactoryBuilder()
|
||||||
|
@ -115,8 +119,9 @@ public abstract class BackgroundService {
|
||||||
results.parallelStream().forEach(taskResultFuture -> {
|
results.parallelStream().forEach(taskResultFuture -> {
|
||||||
try {
|
try {
|
||||||
// Collect task results
|
// Collect task results
|
||||||
// TODO timeout in case task hangs
|
BackgroundTaskResult result = serviceTimeout > 0
|
||||||
BackgroundTaskResult result = taskResultFuture.get();
|
? taskResultFuture.get(serviceTimeout, TimeUnit.MILLISECONDS)
|
||||||
|
: taskResultFuture.get();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("task execution result size {}", result.getSize());
|
LOG.debug("task execution result size {}", result.getSize());
|
||||||
}
|
}
|
||||||
|
@ -124,6 +129,9 @@ public abstract class BackgroundService {
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
"Background task fails to execute, "
|
"Background task fails to execute, "
|
||||||
+ "retrying in next interval", e);
|
+ "retrying in next interval", e);
|
||||||
|
} catch (TimeoutException e) {
|
||||||
|
LOG.warn("Background task executes timed out, "
|
||||||
|
+ "retrying in next interval", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -369,6 +369,19 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>ozone.block.deleting.service.timeout</name>
|
||||||
|
<value>300000ms</value>
|
||||||
|
<description>
|
||||||
|
Timeout value of block deletion service. If this is set greater than 0,
|
||||||
|
the service will stop waiting for block deletion to complete after this
|
||||||
|
time. If a large proportion of block deletion tasks time out, this value needs
|
||||||
|
to be increased together with ozone.block.deleting.limit.per.task. This setting supports
|
||||||
|
multiple time unit suffixes as described in dfs.heartbeat.interval. If no suffix
|
||||||
|
is specified then milliseconds is assumed.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>ozone.block.deleting.limit.per.task</name>
|
<name>ozone.block.deleting.limit.per.task</name>
|
||||||
<value>1000</value>
|
<value>1000</value>
|
||||||
|
|
|
@ -33,6 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
public class BlockDeletingServiceTestImpl
|
public class BlockDeletingServiceTestImpl
|
||||||
extends BlockDeletingService {
|
extends BlockDeletingService {
|
||||||
|
|
||||||
|
// the service timeout
|
||||||
|
private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0;
|
||||||
|
|
||||||
// tests only
|
// tests only
|
||||||
private CountDownLatch latch;
|
private CountDownLatch latch;
|
||||||
private Thread testingThread;
|
private Thread testingThread;
|
||||||
|
@ -40,7 +43,8 @@ public class BlockDeletingServiceTestImpl
|
||||||
|
|
||||||
public BlockDeletingServiceTestImpl(ContainerManager containerManager,
|
public BlockDeletingServiceTestImpl(ContainerManager containerManager,
|
||||||
int serviceInterval, Configuration conf) {
|
int serviceInterval, Configuration conf) {
|
||||||
super(containerManager, serviceInterval, conf);
|
super(containerManager, serviceInterval,
|
||||||
|
SERVICE_TIMEOUT_IN_MILLISECONDS, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -32,8 +32,11 @@ import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
|
||||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
|
||||||
|
import org.apache.hadoop.utils.BackgroundService;
|
||||||
import org.apache.hadoop.utils.MetadataKeyFilters;
|
import org.apache.hadoop.utils.MetadataKeyFilters;
|
||||||
import org.apache.hadoop.utils.MetadataStore;
|
import org.apache.hadoop.utils.MetadataStore;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -235,6 +238,64 @@ public class TestBlockDeletingService {
|
||||||
GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000);
|
GenericTestUtils.waitFor(() -> service.getThreadCount() == 0, 100, 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBlockDeletionTimeout() throws Exception {
|
||||||
|
Configuration conf = new OzoneConfiguration();
|
||||||
|
conf.setInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, 10);
|
||||||
|
conf.setInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, 2);
|
||||||
|
ContainerManager containerManager = createContainerManager(conf);
|
||||||
|
createToDeleteBlocks(containerManager, conf, 1, 3, 1, chunksDir);
|
||||||
|
|
||||||
|
// set timeout value as 1ms to trigger timeout behavior
|
||||||
|
long timeout = 1;
|
||||||
|
BlockDeletingService svc =
|
||||||
|
new BlockDeletingService(containerManager, 1000, timeout, conf);
|
||||||
|
svc.start();
|
||||||
|
|
||||||
|
LogCapturer log = LogCapturer.captureLogs(BackgroundService.LOG);
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
if(log.getOutput().contains(
|
||||||
|
"Background task executes timed out, retrying in next interval")) {
|
||||||
|
log.stopCapturing();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}, 1000, 100000);
|
||||||
|
|
||||||
|
log.stopCapturing();
|
||||||
|
svc.shutdown();
|
||||||
|
|
||||||
|
// test for normal case that doesn't have timeout limitation
|
||||||
|
timeout = 0;
|
||||||
|
createToDeleteBlocks(containerManager, conf, 1, 3, 1, chunksDir);
|
||||||
|
svc = new BlockDeletingService(containerManager, 1000, timeout, conf);
|
||||||
|
svc.start();
|
||||||
|
|
||||||
|
// get container meta data
|
||||||
|
List<ContainerData> containerData = Lists.newArrayList();
|
||||||
|
containerManager.listContainer(null, 1, "", containerData);
|
||||||
|
MetadataStore meta = KeyUtils.getDB(containerData.get(0), conf);
|
||||||
|
|
||||||
|
LogCapturer newLog = LogCapturer.captureLogs(BackgroundService.LOG);
|
||||||
|
GenericTestUtils.waitFor(() -> {
|
||||||
|
try {
|
||||||
|
if (getUnderDeletionBlocksCount(meta) == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
} catch (IOException ignored) {
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}, 1000, 100000);
|
||||||
|
newLog.stopCapturing();
|
||||||
|
|
||||||
|
// The blocks should be deleted successfully, and no timed
|
||||||
|
// out warning should have been logged.
|
||||||
|
Assert.assertTrue(!newLog.getOutput().contains(
|
||||||
|
"Background task executes timed out, retrying in next interval"));
|
||||||
|
svc.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
public void testContainerThrottle() throws Exception {
|
public void testContainerThrottle() throws Exception {
|
||||||
// Properties :
|
// Properties :
|
||||||
|
|
Loading…
Reference in New Issue