HDFS-12680. Ozone: SCM: Lease support for container creation. Contributed by Nandakumar.
This commit is contained in:
parent
338c909ac6
commit
1b56a73a4f
|
@ -136,10 +136,6 @@ public class ChunkGroupOutputStream extends OutputStream {
|
|||
// create container if needed
|
||||
if (subKeyInfo.getShouldCreateContainer()) {
|
||||
try {
|
||||
scmClient.notifyObjectCreationStage(
|
||||
NotifyObjectCreationStageRequestProto.Type.container,
|
||||
containerName,
|
||||
NotifyObjectCreationStageRequestProto.Stage.begin);
|
||||
ContainerProtocolCalls.createContainer(xceiverClient, requestID);
|
||||
scmClient.notifyObjectCreationStage(
|
||||
NotifyObjectCreationStageRequestProto.Type.container,
|
||||
|
|
|
@ -191,6 +191,12 @@ public final class ScmConfigKeys {
|
|||
public static final String OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY =
|
||||
"ozone.scm.container.deletion-choosing.policy";
|
||||
|
||||
public static final String OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT =
|
||||
"ozone.scm.container.creation.lease.timeout";
|
||||
|
||||
public static final long OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT =
|
||||
60000;
|
||||
|
||||
/**
|
||||
* Don't start processing a pool if we have not had a minimum number of
|
||||
* seconds from the last processing.
|
||||
|
|
|
@ -161,7 +161,6 @@ public class ContainerInfo {
|
|||
@Override
|
||||
public int hashCode() {
|
||||
return new HashCodeBuilder(11, 811)
|
||||
.append(state)
|
||||
.append(pipeline.getContainerName())
|
||||
.append(pipeline.getFactor())
|
||||
.append(pipeline.getType())
|
||||
|
|
|
@ -52,7 +52,7 @@ public class LeaseManager<T> {
|
|||
* Creates an instance of lease manager.
|
||||
*
|
||||
* @param defaultTimeout
|
||||
* Default timeout value to be used for lease creation.
|
||||
* Default timeout in milliseconds to be used for lease creation.
|
||||
*/
|
||||
public LeaseManager(long defaultTimeout) {
|
||||
this.defaultTimeout = defaultTimeout;
|
||||
|
|
|
@ -270,6 +270,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
size, owner, type, factor, OzoneProtos.LifeCycleState
|
||||
.ALLOCATED);
|
||||
if (containerInfo != null) {
|
||||
containerManager.updateContainerState(containerInfo.getContainerName(),
|
||||
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
|
||||
return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED);
|
||||
}
|
||||
|
||||
|
@ -298,6 +300,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
size, owner, type, factor, OzoneProtos.LifeCycleState
|
||||
.ALLOCATED);
|
||||
if (containerInfo != null) {
|
||||
containerManager.updateContainerState(containerInfo.getContainerName(),
|
||||
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
|
||||
return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,9 @@ import com.google.common.base.Preconditions;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.lease.Lease;
|
||||
import org.apache.hadoop.ozone.lease.LeaseException;
|
||||
import org.apache.hadoop.ozone.lease.LeaseManager;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
|
||||
|
@ -28,6 +31,7 @@ import org.apache.hadoop.ozone.scm.exceptions.SCMException;
|
|||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
|
||||
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
|
@ -65,6 +69,7 @@ public class ContainerMapping implements Mapping {
|
|||
private final MetadataStore containerStore;
|
||||
private final PipelineSelector pipelineSelector;
|
||||
private final ContainerStateManager containerStateManager;
|
||||
private final LeaseManager<ContainerInfo> containerLeaseManager;
|
||||
|
||||
/**
|
||||
* Constructs a mapping class that creates mapping between container names
|
||||
|
@ -105,6 +110,13 @@ public class ContainerMapping implements Mapping {
|
|||
this.containerStateManager = new ContainerStateManager(conf, +this
|
||||
.cacheSize * OzoneConsts.MB);
|
||||
LOG.trace("Container State Manager created.");
|
||||
|
||||
long containerCreationLeaseTimeout = conf.getLong(
|
||||
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
|
||||
ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT);
|
||||
LOG.trace("Starting Container Lease Manager.");
|
||||
containerLeaseManager = new LeaseManager<>(containerCreationLeaseTimeout);
|
||||
containerLeaseManager.start();
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
|
@ -278,6 +290,21 @@ public class ContainerMapping implements Mapping {
|
|||
|
||||
Preconditions.checkNotNull(containerInfo);
|
||||
|
||||
if (event == OzoneProtos.LifeCycleEvent.BEGIN_CREATE) {
|
||||
// Acquire lease on container
|
||||
Lease<ContainerInfo> containerLease =
|
||||
containerLeaseManager.acquire(containerInfo);
|
||||
// Register callback to be executed in case of timeout
|
||||
containerLease.registerCallBack(() -> {
|
||||
containerStateManager.updateContainerState(
|
||||
new BlockContainerInfo(containerInfo, 0),
|
||||
OzoneProtos.LifeCycleEvent.TIMEOUT);
|
||||
return null;
|
||||
});
|
||||
} else if (event == OzoneProtos.LifeCycleEvent.COMPLETE_CREATE) {
|
||||
// Release the lease on container
|
||||
containerLeaseManager.release(containerInfo);
|
||||
}
|
||||
// TODO: Actual used will be updated via Container Reports later.
|
||||
containerInfo.setState(
|
||||
containerStateManager.updateContainerState(
|
||||
|
@ -285,6 +312,8 @@ public class ContainerMapping implements Mapping {
|
|||
|
||||
containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
|
||||
return containerInfo.getState();
|
||||
} catch (LeaseException e) {
|
||||
throw new IOException("Lease Exception.", e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -315,6 +344,9 @@ public class ContainerMapping implements Mapping {
|
|||
*/
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (containerLeaseManager != null) {
|
||||
containerLeaseManager.shutdown();
|
||||
}
|
||||
if (containerStore != null) {
|
||||
containerStore.close();
|
||||
}
|
||||
|
|
|
@ -1071,6 +1071,19 @@
|
|||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ozone.scm.container.creation.lease.timeout</name>
|
||||
<value>60000</value>
|
||||
<tag>OZONE, SCM</tag>
|
||||
<description>
|
||||
Container creation timeout in milliseconds to be used by SCM. When
|
||||
BEGIN_CREATE event happens the container is moved from ALLOCATED to
|
||||
CREATING state, SCM will now wait for the configured amount of time
|
||||
to get COMPLETE_CREATE event if it doesn't receive it will move the
|
||||
container to DELETING.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ozone.key.preallocation.maxsize</name>
|
||||
<value>134217728</value>
|
||||
|
|
|
@ -21,7 +21,9 @@ import org.apache.hadoop.fs.FileUtil;
|
|||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
import org.apache.hadoop.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
|
||||
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -48,6 +50,8 @@ public class TestContainerMapping {
|
|||
private static File testDir;
|
||||
private static XceiverClientManager xceiverClientManager;
|
||||
|
||||
private static final long TIMEOUT = 10000;
|
||||
|
||||
@Rule
|
||||
public ExpectedException thrown = ExpectedException.none();
|
||||
@BeforeClass
|
||||
|
@ -58,6 +62,8 @@ public class TestContainerMapping {
|
|||
.getTestDir(TestContainerMapping.class.getSimpleName());
|
||||
conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS,
|
||||
testDir.getAbsolutePath());
|
||||
conf.setLong(ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
|
||||
TIMEOUT);
|
||||
boolean folderExisted = testDir.exists() || testDir.mkdirs();
|
||||
if (!folderExisted) {
|
||||
throw new IOException("Unable to create test directory path");
|
||||
|
@ -155,4 +161,33 @@ public class TestContainerMapping {
|
|||
xceiverClientManager.getFactor(), containerName,
|
||||
OzoneProtos.Owner.OZONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerCreationLeaseTimeout() throws IOException,
|
||||
InterruptedException {
|
||||
String containerName = UUID.randomUUID().toString();
|
||||
nodeManager.setChillmode(false);
|
||||
ContainerInfo containerInfo = mapping.allocateContainer(
|
||||
xceiverClientManager.getType(),
|
||||
xceiverClientManager.getFactor(),
|
||||
containerName,
|
||||
OzoneProtos.Owner.OZONE);
|
||||
mapping.updateContainerState(containerInfo.getContainerName(),
|
||||
OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
|
||||
Thread.sleep(TIMEOUT + 1000);
|
||||
|
||||
BlockContainerInfo deletingContainer = mapping.getStateManager()
|
||||
.getMatchingContainer(
|
||||
0, containerInfo.getOwner(),
|
||||
xceiverClientManager.getType(),
|
||||
xceiverClientManager.getFactor(),
|
||||
OzoneProtos.LifeCycleState.DELETING);
|
||||
Assert.assertEquals(containerInfo.getContainerName(),
|
||||
deletingContainer.getContainerName());
|
||||
|
||||
thrown.expect(IOException.class);
|
||||
thrown.expectMessage("Lease Exception");
|
||||
mapping.updateContainerState(containerInfo.getContainerName(),
|
||||
OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,7 +84,7 @@ public class TestCorona {
|
|||
args.toArray(new String[0]));
|
||||
Assert.assertEquals(2, corona.getNumberOfVolumesCreated());
|
||||
Assert.assertEquals(10, corona.getNumberOfBucketsCreated());
|
||||
Assert.assertEquals(99, corona.getNumberOfKeysAdded());
|
||||
Assert.assertEquals(100, corona.getNumberOfKeysAdded());
|
||||
Assert.assertEquals(10240 - 36, corona.getKeyValueLength());
|
||||
Assert.assertEquals(0, res);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue