HDFS-11567. Ozone: SCM: Support update container. Contributed by Weiwei Yang.
This commit is contained in:
parent
8617fda2c6
commit
1590b9f7ea
|
@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.impl;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||||
|
@ -75,6 +76,8 @@ import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
.Result.NO_SUCH_ALGORITHM;
|
.Result.NO_SUCH_ALGORITHM;
|
||||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
.Result.UNABLE_TO_READ_METADATA_DB;
|
.Result.UNABLE_TO_READ_METADATA_DB;
|
||||||
|
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
|
||||||
|
.Result.UNSUPPORTED_REQUEST;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
|
||||||
|
|
||||||
|
@ -478,6 +481,95 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
containerMap.put(containerName, status);
|
containerMap.put(containerName, status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateContainer(Pipeline pipeline, String containerName,
|
||||||
|
ContainerData data) throws StorageContainerException{
|
||||||
|
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
|
||||||
|
Preconditions.checkNotNull(containerName, "Container name cannot be null");
|
||||||
|
Preconditions.checkNotNull(data, "Container data cannot be null");
|
||||||
|
FileOutputStream containerStream = null;
|
||||||
|
DigestOutputStream dos = null;
|
||||||
|
MessageDigest sha = null;
|
||||||
|
File containerFileBK = null, containerFile = null;
|
||||||
|
boolean deleted = false;
|
||||||
|
|
||||||
|
if(!containerMap.containsKey(containerName)) {
|
||||||
|
throw new StorageContainerException("Container doesn't exist. Name :"
|
||||||
|
+ containerName, CONTAINER_NOT_FOUND);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
|
} catch (NoSuchAlgorithmException e) {
|
||||||
|
throw new StorageContainerException("Unable to create Message Digest,"
|
||||||
|
+ " usually this is a java configuration issue.",
|
||||||
|
NO_SUCH_ALGORITHM);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Path location = locationManager.getContainerPath();
|
||||||
|
ContainerData orgData = containerMap.get(containerName).getContainer();
|
||||||
|
if (!orgData.isOpen()) {
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Update a closed container is not allowed. Name: " + containerName,
|
||||||
|
UNSUPPORTED_REQUEST);
|
||||||
|
}
|
||||||
|
|
||||||
|
containerFile = ContainerUtils.getContainerFile(orgData, location);
|
||||||
|
if (!containerFile.exists() || !containerFile.canWrite()) {
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Container file not exists or corrupted. Name: " + containerName,
|
||||||
|
CONTAINER_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backup the container file
|
||||||
|
containerFileBK = File.createTempFile(
|
||||||
|
"tmp_" + System.currentTimeMillis() + "_",
|
||||||
|
containerFile.getName(), containerFile.getParentFile());
|
||||||
|
FileUtils.copyFile(containerFile, containerFileBK);
|
||||||
|
|
||||||
|
deleted = containerFile.delete();
|
||||||
|
containerStream = new FileOutputStream(containerFile);
|
||||||
|
dos = new DigestOutputStream(containerStream, sha);
|
||||||
|
|
||||||
|
ContainerProtos.ContainerData protoData = data.getProtoBufMessage();
|
||||||
|
protoData.writeDelimitedTo(dos);
|
||||||
|
|
||||||
|
// Update the in-memory map
|
||||||
|
ContainerStatus newStatus = new ContainerStatus(data, true);
|
||||||
|
containerMap.replace(containerName, newStatus);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Restore the container file from backup
|
||||||
|
if(containerFileBK != null && containerFileBK.exists() && deleted) {
|
||||||
|
if(containerFile.delete()
|
||||||
|
&& containerFileBK.renameTo(containerFile)) {
|
||||||
|
throw new StorageContainerException("Container update failed,"
|
||||||
|
+ " container data restored from the backup.",
|
||||||
|
CONTAINER_INTERNAL_ERROR);
|
||||||
|
} else {
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Failed to restore container data from the backup. Name: "
|
||||||
|
+ containerName, CONTAINER_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (containerFileBK != null && containerFileBK.exists()) {
|
||||||
|
if(!containerFileBK.delete()) {
|
||||||
|
LOG.warn("Unable to delete container file backup : {}.",
|
||||||
|
containerFileBK.getAbsolutePath());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
IOUtils.closeStream(dos);
|
||||||
|
IOUtils.closeStream(containerStream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected File getContainerFile(ContainerData data) throws IOException {
|
||||||
|
return ContainerUtils.getContainerFile(data,
|
||||||
|
this.locationManager.getContainerPath());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if a container exists.
|
* Checks if a container exists.
|
||||||
*
|
*
|
||||||
|
|
|
@ -163,8 +163,7 @@ public class Dispatcher implements ContainerDispatcher {
|
||||||
return ContainerUtils.unsupportedRequest(msg);
|
return ContainerUtils.unsupportedRequest(msg);
|
||||||
|
|
||||||
case UpdateContainer:
|
case UpdateContainer:
|
||||||
// TODO : Support Update Container.
|
return handleUpdateContainer(msg);
|
||||||
return ContainerUtils.unsupportedRequest(msg);
|
|
||||||
|
|
||||||
case ReadContainer:
|
case ReadContainer:
|
||||||
return handleReadContainer(msg);
|
return handleReadContainer(msg);
|
||||||
|
@ -297,6 +296,33 @@ public class Dispatcher implements ContainerDispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update an existing container with the new container data.
|
||||||
|
*
|
||||||
|
* @param msg Request
|
||||||
|
* @return ContainerCommandResponseProto
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private ContainerCommandResponseProto handleUpdateContainer(
|
||||||
|
ContainerCommandRequestProto msg)
|
||||||
|
throws IOException {
|
||||||
|
if (!msg.hasUpdateContainer()) {
|
||||||
|
LOG.debug("Malformed read container request. trace ID: {}",
|
||||||
|
msg.getTraceID());
|
||||||
|
return ContainerUtils.malformedRequest(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||||
|
msg.getUpdateContainer().getPipeline());
|
||||||
|
String containerName = msg.getUpdateContainer()
|
||||||
|
.getContainerData().getName();
|
||||||
|
|
||||||
|
ContainerData data = ContainerData.getFromProtBuf(
|
||||||
|
msg.getUpdateContainer().getContainerData());
|
||||||
|
this.containerManager.updateContainer(pipeline, containerName, data);
|
||||||
|
return ContainerUtils.getContainerResponse(msg);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls into container logic and returns appropriate response.
|
* Calls into container logic and returns appropriate response.
|
||||||
*
|
*
|
||||||
|
|
|
@ -69,6 +69,17 @@ public interface ContainerManager extends RwLock {
|
||||||
void deleteContainer(Pipeline pipeline, String containerName)
|
void deleteContainer(Pipeline pipeline, String containerName)
|
||||||
throws StorageContainerException;
|
throws StorageContainerException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update an existing container.
|
||||||
|
*
|
||||||
|
* @param pipeline container nodes
|
||||||
|
* @param containerName name of the container
|
||||||
|
* @param data container data
|
||||||
|
* @throws StorageContainerException
|
||||||
|
*/
|
||||||
|
void updateContainer(Pipeline pipeline, String containerName,
|
||||||
|
ContainerData data) throws StorageContainerException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* As simple interface for container Iterations.
|
* As simple interface for container Iterations.
|
||||||
*
|
*
|
||||||
|
|
|
@ -41,6 +41,7 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helpers for container tests.
|
* Helpers for container tests.
|
||||||
|
@ -292,6 +293,43 @@ public final class ContainerTestHelper {
|
||||||
return request.build();
|
return request.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return an update container command for test purposes.
|
||||||
|
* Creates a container data based on the given meta data,
|
||||||
|
* and request to update an existing container with it.
|
||||||
|
*
|
||||||
|
* @param containerName
|
||||||
|
* @param metaData
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static ContainerCommandRequestProto getUpdateContainerRequest(
|
||||||
|
String containerName, Map<String, String> metaData) throws IOException {
|
||||||
|
ContainerProtos.UpdateContainerRequestProto.Builder updateRequestBuilder =
|
||||||
|
ContainerProtos.UpdateContainerRequestProto.newBuilder();
|
||||||
|
ContainerProtos.ContainerData.Builder containerData = ContainerProtos
|
||||||
|
.ContainerData.newBuilder();
|
||||||
|
containerData.setName(containerName);
|
||||||
|
String[] keys = metaData.keySet().toArray(new String[]{});
|
||||||
|
for(int i=0; i<keys.length; i++) {
|
||||||
|
ContainerProtos.KeyValue.Builder kvBuilder =
|
||||||
|
ContainerProtos.KeyValue.newBuilder();
|
||||||
|
kvBuilder.setKey(keys[i]);
|
||||||
|
kvBuilder.setValue(metaData.get(keys[i]));
|
||||||
|
containerData.addMetadata(i, kvBuilder.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
updateRequestBuilder.setPipeline(
|
||||||
|
ContainerTestHelper.createSingleNodePipeline(containerName)
|
||||||
|
.getProtobufMessage());
|
||||||
|
updateRequestBuilder.setContainerData(containerData.build());
|
||||||
|
|
||||||
|
ContainerCommandRequestProto.Builder request =
|
||||||
|
ContainerCommandRequestProto.newBuilder();
|
||||||
|
request.setCmdType(ContainerProtos.Type.UpdateContainer);
|
||||||
|
request.setUpdateContainer(updateRequestBuilder.build());
|
||||||
|
return request.build();
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Returns a create container response for test purposes. There are a bunch of
|
* Returns a create container response for test purposes. There are a bunch of
|
||||||
* tests where we need to just send a request and get a reply.
|
* tests where we need to just send a request and get a reply.
|
||||||
|
|
|
@ -44,6 +44,7 @@ import org.junit.rules.ExpectedException;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
import java.nio.file.DirectoryStream;
|
import java.nio.file.DirectoryStream;
|
||||||
|
@ -613,5 +614,71 @@ public class TestContainerPersistence {
|
||||||
keyManager.deleteKey(pipeline, keyName);
|
keyManager.deleteKey(pipeline, keyName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to update an existing and non-existing container.
|
||||||
|
* Verifies container map and persistent data both updated.
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testUpdateContainer() throws IOException {
|
||||||
|
String containerName = OzoneUtils.getRequestID();
|
||||||
|
ContainerData data = new ContainerData(containerName);
|
||||||
|
data.addMetadata("VOLUME", "shire");
|
||||||
|
data.addMetadata("owner)", "bilbo");
|
||||||
|
|
||||||
|
containerManager.createContainer(
|
||||||
|
createSingleNodePipeline(containerName),
|
||||||
|
data);
|
||||||
|
|
||||||
|
File orgContainerFile = containerManager.getContainerFile(data);
|
||||||
|
Assert.assertTrue(orgContainerFile.exists());
|
||||||
|
|
||||||
|
ContainerData newData = new ContainerData(containerName);
|
||||||
|
newData.addMetadata("VOLUME", "shire_new");
|
||||||
|
newData.addMetadata("owner)", "bilbo_new");
|
||||||
|
|
||||||
|
containerManager.updateContainer(
|
||||||
|
createSingleNodePipeline(containerName),
|
||||||
|
containerName,
|
||||||
|
newData);
|
||||||
|
|
||||||
|
Assert.assertEquals(1, containerManager.getContainerMap().size());
|
||||||
|
Assert.assertTrue(containerManager.getContainerMap()
|
||||||
|
.containsKey(containerName));
|
||||||
|
|
||||||
|
// Verify in-memory map
|
||||||
|
ContainerData actualNewData = containerManager.getContainerMap()
|
||||||
|
.get(containerName).getContainer();
|
||||||
|
Assert.assertEquals(actualNewData.getAllMetadata().get("VOLUME"),
|
||||||
|
"shire_new");
|
||||||
|
Assert.assertEquals(actualNewData.getAllMetadata().get("owner)"),
|
||||||
|
"bilbo_new");
|
||||||
|
|
||||||
|
// Verify container data on disk
|
||||||
|
File newContainerFile = containerManager.getContainerFile(actualNewData);
|
||||||
|
Assert.assertTrue("Container file should exist.",
|
||||||
|
newContainerFile.exists());
|
||||||
|
Assert.assertEquals("Container file should be in same location.",
|
||||||
|
orgContainerFile.getAbsolutePath(),
|
||||||
|
newContainerFile.getAbsolutePath());
|
||||||
|
|
||||||
|
try (FileInputStream newIn = new FileInputStream(newContainerFile)) {
|
||||||
|
ContainerProtos.ContainerData actualContainerDataProto =
|
||||||
|
ContainerProtos.ContainerData.parseDelimitedFrom(newIn);
|
||||||
|
ContainerData actualContainerData = ContainerData
|
||||||
|
.getFromProtBuf(actualContainerDataProto);
|
||||||
|
Assert.assertEquals(actualContainerData.getAllMetadata().get("VOLUME"),
|
||||||
|
"shire_new");
|
||||||
|
Assert.assertEquals(actualContainerData.getAllMetadata().get("owner)"),
|
||||||
|
"bilbo_new");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update a non-existing container
|
||||||
|
exception.expect(StorageContainerException.class);
|
||||||
|
exception.expectMessage("Container doesn't exist.");
|
||||||
|
containerManager.updateContainer(
|
||||||
|
createSingleNodePipeline("non_exist_container"),
|
||||||
|
"non_exist_container", newData);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,8 @@ import org.junit.Test;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests ozone containers.
|
* Tests ozone containers.
|
||||||
|
@ -177,6 +179,27 @@ public class TestOzoneContainer {
|
||||||
Assert.assertNotNull(response);
|
Assert.assertNotNull(response);
|
||||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||||
|
|
||||||
|
//Update an existing container
|
||||||
|
Map<String, String> containerUpdate = new HashMap<String, String>();
|
||||||
|
containerUpdate.put("container_updated_key", "container_updated_value");
|
||||||
|
ContainerProtos.ContainerCommandRequestProto updateRequest1 =
|
||||||
|
ContainerTestHelper.getUpdateContainerRequest(
|
||||||
|
containerName, containerUpdate);
|
||||||
|
ContainerProtos.ContainerCommandResponseProto updateResponse1 =
|
||||||
|
client.sendCommand(updateRequest1);
|
||||||
|
Assert.assertNotNull(updateResponse1);
|
||||||
|
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
|
||||||
|
response.getResult());
|
||||||
|
|
||||||
|
//Update an non-existing container
|
||||||
|
ContainerProtos.ContainerCommandRequestProto updateRequest2 =
|
||||||
|
ContainerTestHelper.getUpdateContainerRequest(
|
||||||
|
"non_exist_container", containerUpdate);
|
||||||
|
ContainerProtos.ContainerCommandResponseProto updateResponse2 =
|
||||||
|
client.sendCommand(updateRequest2);
|
||||||
|
Assert.assertEquals(ContainerProtos.Result.CONTAINER_NOT_FOUND,
|
||||||
|
updateResponse2.getResult());
|
||||||
} finally {
|
} finally {
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
client.close();
|
client.close();
|
||||||
|
|
Loading…
Reference in New Issue