HDFS-11585. Ozone: Support force update a container. Contributed by Yuanbo Liu.
This commit is contained in:
parent
f8110c3e6e
commit
185e3ad744
|
@ -232,6 +232,7 @@ message ReadContainerResponseProto {
|
|||
message UpdateContainerRequestProto {
|
||||
required Pipeline pipeline = 1;
|
||||
required ContainerData containerData = 2;
|
||||
optional bool forceUpdate = 3 [default = false];
|
||||
}
|
||||
|
||||
message UpdateContainerResponseProto {
|
||||
|
|
|
@ -515,7 +515,8 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
|
||||
@Override
|
||||
public void updateContainer(Pipeline pipeline, String containerName,
|
||||
ContainerData data) throws StorageContainerException{
|
||||
ContainerData data, boolean forceUpdate)
|
||||
throws StorageContainerException {
|
||||
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
|
||||
Preconditions.checkNotNull(containerName, "Container name cannot be null");
|
||||
Preconditions.checkNotNull(data, "Container data cannot be null");
|
||||
|
@ -541,32 +542,36 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
try {
|
||||
Path location = locationManager.getContainerPath();
|
||||
ContainerData orgData = containerMap.get(containerName).getContainer();
|
||||
if (!orgData.isOpen()) {
|
||||
if (!forceUpdate && !orgData.isOpen()) {
|
||||
throw new StorageContainerException(
|
||||
"Update a closed container is not allowed. Name: " + containerName,
|
||||
UNSUPPORTED_REQUEST);
|
||||
}
|
||||
|
||||
containerFile = ContainerUtils.getContainerFile(orgData, location);
|
||||
if (!containerFile.exists() || !containerFile.canWrite()) {
|
||||
throw new StorageContainerException(
|
||||
"Container file not exists or corrupted. Name: " + containerName,
|
||||
CONTAINER_INTERNAL_ERROR);
|
||||
// If forceUpdate is true, there is no need to check
|
||||
// whether the container file exists.
|
||||
if (!forceUpdate) {
|
||||
if (!containerFile.exists() || !containerFile.canWrite()) {
|
||||
throw new StorageContainerException(
|
||||
"Container file not exists or corrupted. Name: " + containerName,
|
||||
CONTAINER_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
// Backup the container file
|
||||
containerFileBK = File.createTempFile(
|
||||
"tmp_" + System.currentTimeMillis() + "_",
|
||||
containerFile.getName(), containerFile.getParentFile());
|
||||
FileUtils.copyFile(containerFile, containerFileBK);
|
||||
|
||||
deleted = containerFile.delete();
|
||||
containerStream = new FileOutputStream(containerFile);
|
||||
dos = new DigestOutputStream(containerStream, sha);
|
||||
|
||||
ContainerProtos.ContainerData protoData = data.getProtoBufMessage();
|
||||
protoData.writeDelimitedTo(dos);
|
||||
}
|
||||
|
||||
// Backup the container file
|
||||
containerFileBK = File.createTempFile(
|
||||
"tmp_" + System.currentTimeMillis() + "_",
|
||||
containerFile.getName(), containerFile.getParentFile());
|
||||
FileUtils.copyFile(containerFile, containerFileBK);
|
||||
|
||||
deleted = containerFile.delete();
|
||||
containerStream = new FileOutputStream(containerFile);
|
||||
dos = new DigestOutputStream(containerStream, sha);
|
||||
|
||||
ContainerProtos.ContainerData protoData = data.getProtoBufMessage();
|
||||
protoData.writeDelimitedTo(dos);
|
||||
|
||||
// Update the in-memory map
|
||||
ContainerStatus newStatus = new ContainerStatus(data, true);
|
||||
containerMap.replace(containerName, newStatus);
|
||||
|
@ -583,6 +588,9 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
"Failed to restore container data from the backup. Name: "
|
||||
+ containerName, CONTAINER_INTERNAL_ERROR);
|
||||
}
|
||||
} else {
|
||||
throw new StorageContainerException(
|
||||
e.getMessage(), CONTAINER_INTERNAL_ERROR);
|
||||
}
|
||||
} finally {
|
||||
if (containerFileBK != null && containerFileBK.exists()) {
|
||||
|
|
|
@ -319,7 +319,9 @@ public class Dispatcher implements ContainerDispatcher {
|
|||
|
||||
ContainerData data = ContainerData.getFromProtBuf(
|
||||
msg.getUpdateContainer().getContainerData());
|
||||
this.containerManager.updateContainer(pipeline, containerName, data);
|
||||
boolean forceUpdate = msg.getUpdateContainer().getForceUpdate();
|
||||
this.containerManager.updateContainer(
|
||||
pipeline, containerName, data, forceUpdate);
|
||||
return ContainerUtils.getContainerResponse(msg);
|
||||
}
|
||||
|
||||
|
|
|
@ -76,10 +76,11 @@ public interface ContainerManager extends RwLock {
|
|||
* @param pipeline container nodes
|
||||
* @param containerName name of the container
|
||||
* @param data container data
|
||||
* @param forceUpdate if true, update container forcibly.
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
void updateContainer(Pipeline pipeline, String containerName,
|
||||
ContainerData data) throws StorageContainerException;
|
||||
ContainerData data, boolean forceUpdate) throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* As simple interface for container Iterations.
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.impl;
|
|||
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
|
@ -654,7 +655,7 @@ public class TestContainerPersistence {
|
|||
String containerName = OzoneUtils.getRequestID();
|
||||
ContainerData data = new ContainerData(containerName);
|
||||
data.addMetadata("VOLUME", "shire");
|
||||
data.addMetadata("owner)", "bilbo");
|
||||
data.addMetadata("owner", "bilbo");
|
||||
|
||||
containerManager.createContainer(
|
||||
createSingleNodePipeline(containerName),
|
||||
|
@ -665,12 +666,12 @@ public class TestContainerPersistence {
|
|||
|
||||
ContainerData newData = new ContainerData(containerName);
|
||||
newData.addMetadata("VOLUME", "shire_new");
|
||||
newData.addMetadata("owner)", "bilbo_new");
|
||||
newData.addMetadata("owner", "bilbo_new");
|
||||
|
||||
containerManager.updateContainer(
|
||||
createSingleNodePipeline(containerName),
|
||||
containerName,
|
||||
newData);
|
||||
newData, false);
|
||||
|
||||
Assert.assertEquals(1, containerManager.getContainerMap().size());
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
|
@ -681,7 +682,7 @@ public class TestContainerPersistence {
|
|||
.get(containerName).getContainer();
|
||||
Assert.assertEquals(actualNewData.getAllMetadata().get("VOLUME"),
|
||||
"shire_new");
|
||||
Assert.assertEquals(actualNewData.getAllMetadata().get("owner)"),
|
||||
Assert.assertEquals(actualNewData.getAllMetadata().get("owner"),
|
||||
"bilbo_new");
|
||||
|
||||
// Verify container data on disk
|
||||
|
@ -699,16 +700,42 @@ public class TestContainerPersistence {
|
|||
.getFromProtBuf(actualContainerDataProto);
|
||||
Assert.assertEquals(actualContainerData.getAllMetadata().get("VOLUME"),
|
||||
"shire_new");
|
||||
Assert.assertEquals(actualContainerData.getAllMetadata().get("owner)"),
|
||||
Assert.assertEquals(actualContainerData.getAllMetadata().get("owner"),
|
||||
"bilbo_new");
|
||||
}
|
||||
|
||||
// Test force update flag.
|
||||
// Delete container file then try to update without force update flag.
|
||||
FileUtil.fullyDelete(newContainerFile);
|
||||
try {
|
||||
containerManager.updateContainer(createSingleNodePipeline(containerName),
|
||||
containerName, newData, false);
|
||||
} catch (StorageContainerException ex) {
|
||||
Assert.assertEquals("Container file not exists or "
|
||||
+ "corrupted. Name: " + containerName, ex.getMessage());
|
||||
}
|
||||
|
||||
// Update with force flag, it should be success.
|
||||
newData = new ContainerData(containerName);
|
||||
newData.addMetadata("VOLUME", "shire_new_1");
|
||||
newData.addMetadata("owner", "bilbo_new_1");
|
||||
containerManager.updateContainer(createSingleNodePipeline(containerName),
|
||||
containerName, newData, true);
|
||||
|
||||
// Verify in-memory map
|
||||
actualNewData = containerManager.getContainerMap()
|
||||
.get(containerName).getContainer();
|
||||
Assert.assertEquals(actualNewData.getAllMetadata().get("VOLUME"),
|
||||
"shire_new_1");
|
||||
Assert.assertEquals(actualNewData.getAllMetadata().get("owner"),
|
||||
"bilbo_new_1");
|
||||
|
||||
// Update a non-existing container
|
||||
exception.expect(StorageContainerException.class);
|
||||
exception.expectMessage("Container doesn't exist.");
|
||||
containerManager.updateContainer(
|
||||
createSingleNodePipeline("non_exist_container"),
|
||||
"non_exist_container", newData);
|
||||
"non_exist_container", newData, false);
|
||||
}
|
||||
|
||||
private KeyData writeKeyHelper(Pipeline pipeline,
|
||||
|
|
Loading…
Reference in New Issue