HDFS-11491. Ozone: SCM: Add close container RPC. Contributed by Anu Engineer.

This commit is contained in:
Xiaoyu Yao 2017-03-14 21:28:23 -07:00
parent 72f4e3f347
commit 65487b579e
7 changed files with 342 additions and 17 deletions

View File

@ -67,6 +67,12 @@ import "hdfs.proto";
* 13. ListChunk - Given a Container/Key returns the list of Chunks.
*
* 14. CompactChunk - Re-writes a chunk based on Offsets.
*
* 15. PutSmallFile - A single RPC that combines both putKey and WriteChunk.
*
* 16. GetSmallFile - A single RPC that combines both getKey and ReadChunk.
*
* 17. CloseContainer - Closes an open container and makes it immutable.
*/
enum Type {
@ -90,6 +96,7 @@ enum Type {
/** Combines Key and Chunk Operation into Single RPC. */
PutSmallFile = 15;
GetSmallFile = 16;
CloseContainer = 17;
}
@ -116,6 +123,7 @@ enum Result {
INVALID_ARGUMENT = 19;
PUT_SMALL_FILE_ERROR = 20;
GET_SMALL_FILE_ERROR = 21;
CLOSED_CONTAINER_IO = 22;
}
message ContainerCommandRequestProto {
@ -147,6 +155,7 @@ message ContainerCommandRequestProto {
optional PutSmallFileRequestProto putSmallFile = 16;
optional GetSmallFileRequestProto getSmallFile = 17;
optional CloseContainerRequestProto closeContainer = 18;
}
message ContainerCommandResponseProto {
@ -174,6 +183,7 @@ message ContainerCommandResponseProto {
optional PutSmallFileResponseProto putSmallFile = 19;
optional GetSmallFileResponseProto getSmallFile = 20;
optional CloseContainerResponseProto closeContainer = 21;
}
@ -194,6 +204,8 @@ message ContainerData {
repeated KeyValue metadata = 2;
optional string dbPath = 3;
optional string containerPath = 4;
optional bool open = 5 [default = true];
optional string hash = 6;
}
message ContainerMeta {
@ -246,6 +258,14 @@ message ListContainerResponseProto {
repeated ContainerData containerData = 1;
}
message CloseContainerRequestProto {
required Pipeline pipeline = 1;
}
message CloseContainerResponseProto {
optional Pipeline pipeline = 1;
optional string hash = 2;
}
message KeyData {
required string containerName = 1;

View File

@ -38,6 +38,8 @@ public class ContainerData {
private String dbPath; // Path to Level DB Store.
// Path to Physical file system where container and checksum are stored.
private String containerFilePath;
private boolean open;
private String hash;
/**
* Constructs a ContainerData Object.
@ -71,6 +73,15 @@ public class ContainerData {
data.setDBPath(protoData.getDbPath());
}
if (protoData.hasOpen()) {
data.setOpen(protoData.getOpen());
} else {
data.setOpen(true);
}
if(protoData.hasHash()) {
data.setHash(protoData.getHash());
}
return data;
}
@ -98,6 +109,8 @@ public class ContainerData {
builder.addMetadata(keyValBuilder.setKey(entry.getKey())
.setValue(entry.getValue()).build());
}
return builder.build();
}
@ -196,4 +209,41 @@ public class ContainerData {
this.containerFilePath = containerPath;
}
/**
* checks if the container is open.
* @return - boolean
*/
public boolean isOpen() {
return open;
}
/**
* Marks this container as closed.
*/
public void closeContainer() {
this.open = false;
}
/**
* Final hash for this container.
* @return - Hash
*/
public String getHash() {
return hash;
}
public void setHash(String hash) {
this.hash = hash;
}
/**
* Sets the open or closed values.
* @param open
*/
public void setOpen(boolean open) {
this.open = open;
}
}

View File

@ -248,7 +248,7 @@ public class ContainerManagerImpl implements ContainerManager {
// Then read back and put that info into the containerMap.
// This allows us to make sure that our write is consistent.
writeContainerInfo(containerData);
writeContainerInfo(containerData, false);
File cFile = new File(containerData.getContainerPath());
readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile));
} catch (NoSuchAlgorithmException ex) {
@ -280,9 +280,11 @@ public class ContainerManagerImpl implements ContainerManager {
* }
*
* @param containerData - container Data
* @param overwrite - Whether we are overwriting.
* @throws StorageContainerException, NoSuchAlgorithmException
*/
private void writeContainerInfo(ContainerData containerData)
private void writeContainerInfo(ContainerData containerData,
boolean overwrite)
throws StorageContainerException, NoSuchAlgorithmException {
Preconditions.checkNotNull(this.locationManager,
@ -298,8 +300,9 @@ public class ContainerManagerImpl implements ContainerManager {
location);
File metadataFile = ContainerUtils.getMetadataFile(containerData,
location);
if(!overwrite) {
ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
}
Path metadataPath = this.locationManager.getDataPath(
containerData.getContainerName());
@ -446,6 +449,48 @@ public class ContainerManagerImpl implements ContainerManager {
return containerMap.get(containerName).getContainer();
}
/**
* Closes a open container, if it is already closed or does not exist a
* StorageContainerException is thrown.
*
* @param containerName - Name of the container.
* @throws StorageContainerException
*/
@Override
public void closeContainer(String containerName)
throws StorageContainerException, NoSuchAlgorithmException {
ContainerData containerData = readContainer(containerName);
containerData.closeContainer();
writeContainerInfo(containerData, true);
// Active is different from closed. Closed means it is immutable, active
// false means we have some internal error that is happening to this
// container. This is a way to track damaged containers if we have an
// I/O failure, this allows us to take quick action in case of container
// issues.
ContainerStatus status = new ContainerStatus(containerData, true);
containerMap.put(containerName, status);
}
/**
* Checks if a container exists.
*
* @param containerName - Name of the container.
* @return true if the container is open false otherwise.
* @throws StorageContainerException - Throws Exception if we are not able to
* find the container.
*/
@Override
public boolean isOpen(String containerName) throws StorageContainerException {
ContainerData cData = containerMap.get(containerName).getContainer();
if (cData == null) {
throw new StorageContainerException("Container not found",
CONTAINER_NOT_FOUND);
}
return cData.isOpen();
}
/**
* Supports clean shutdown of container.
*

View File

@ -43,13 +43,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedList;
import java.util.List;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
@ -97,8 +98,9 @@ public class Dispatcher implements ContainerDispatcher {
(cmdType == Type.DeleteContainer) ||
(cmdType == Type.ReadContainer) ||
(cmdType == Type.ListContainer) ||
(cmdType == Type.UpdateContainer)) {
resp = containerProcessHandler(msg);
(cmdType == Type.UpdateContainer) ||
(cmdType == Type.CloseContainer)) {
return containerProcessHandler(msg);
}
if ((cmdType == Type.PutKey) ||
@ -167,6 +169,9 @@ public class Dispatcher implements ContainerDispatcher {
case ReadContainer:
return handleReadContainer(msg);
case CloseContainer:
return handleCloseContainer(msg);
default:
return ContainerUtils.unsupportedRequest(msg);
}
@ -274,6 +279,12 @@ public class Dispatcher implements ContainerDispatcher {
}
}
/**
* Dispatch calls to small file hanlder.
* @param msg - request
* @return response
* @throws StorageContainerException
*/
private ContainerCommandResponseProto smallFileHandler(
ContainerCommandRequestProto msg) throws StorageContainerException {
switch (msg.getCmdType()) {
@ -349,16 +360,46 @@ public class Dispatcher implements ContainerDispatcher {
}
ContainerData cData = ContainerData.getFromProtBuf(
msg.getCreateContainer().getContainerData());
Preconditions.checkNotNull(cData);
Preconditions.checkNotNull(cData, "Container data is null");
Pipeline pipeline = Pipeline.getFromProtoBuf(
msg.getCreateContainer().getPipeline());
Preconditions.checkNotNull(pipeline);
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
this.containerManager.createContainer(pipeline, cData);
return ContainerUtils.getContainerResponse(msg);
}
/**
* closes an open container.
*
* @param msg -
* @return
* @throws IOException
*/
private ContainerCommandResponseProto handleCloseContainer(
ContainerCommandRequestProto msg) throws IOException {
try {
if (!msg.hasCloseContainer()) {
LOG.debug("Malformed close Container request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getCloseContainer()
.getPipeline());
Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
throw new StorageContainerException("Attempting to close a closed " +
"container.", CLOSED_CONTAINER_IO);
}
this.containerManager.closeContainer(pipeline.getContainerName());
return ContainerUtils.getContainerResponse(msg);
} catch (NoSuchAlgorithmException e) {
throw new StorageContainerException("No such Algorithm", e,
NO_SUCH_ALGORITHM);
}
}
/**
* Calls into chunk manager to write a chunk.
*
@ -373,11 +414,14 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
String keyName = msg.getWriteChunk().getKeyName();
Pipeline pipeline = Pipeline.getFromProtoBuf(
msg.getWriteChunk().getPipeline());
Preconditions.checkNotNull(pipeline);
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getWriteChunk()
.getChunkData());
@ -437,7 +481,10 @@ public class Dispatcher implements ContainerDispatcher {
Pipeline pipeline = Pipeline.getFromProtoBuf(
msg.getDeleteChunk().getPipeline());
Preconditions.checkNotNull(pipeline);
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getDeleteChunk()
.getChunkData());
Preconditions.checkNotNull(chunkInfo);
@ -463,6 +510,10 @@ public class Dispatcher implements ContainerDispatcher {
}
Pipeline pipeline = Pipeline.getFromProtoBuf(msg.getPutKey().getPipeline());
Preconditions.checkNotNull(pipeline);
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
Preconditions.checkNotNull(keyData);
this.containerManager.getKeyManager().putKey(pipeline, keyData);
@ -508,10 +559,13 @@ public class Dispatcher implements ContainerDispatcher {
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
Pipeline pipeline =
Pipeline.getFromProtoBuf(msg.getDeleteKey().getPipeline());
Preconditions.checkNotNull(pipeline);
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
String keyName = msg.getDeleteKey().getName();
Preconditions.checkNotNull(keyName);
Preconditions.checkState(!keyName.isEmpty());
@ -541,6 +595,10 @@ public class Dispatcher implements ContainerDispatcher {
.getKey().getPipeline());
Preconditions.checkNotNull(pipeline);
if (!this.containerManager.isOpen(pipeline.getContainerName())) {
throw new StorageContainerException("Write to closed container.",
CLOSED_CONTAINER_IO);
}
KeyData keyData = KeyData.getFromProtoBuf(msg.getPutSmallFile().getKey()
.getKeyData());
ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(msg.getPutSmallFile()

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.protocol.proto
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
/**
@ -92,6 +93,24 @@ public interface ContainerManager extends RwLock {
ContainerData readContainer(String containerName)
throws StorageContainerException;
/**
* Closes a open container, if it is already closed or does not exist a
* StorageContainerException is thrown.
* @param containerName - Name of the container.
* @throws StorageContainerException
*/
void closeContainer(String containerName)
throws StorageContainerException, NoSuchAlgorithmException;
/**
* Checks if a container exists.
* @param containerName - Name of the container.
* @return true if the container is open false otherwise.
* @throws StorageContainerException - Throws Exception if we are not
* able to find the container.
*/
boolean isOpen(String containerName) throws StorageContainerException;
/**
* Supports clean shutdown of container.
*

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
@ -394,4 +395,21 @@ public final class ContainerTestHelper {
return request.build();
}
/**
* Returns a close container request.
* @param pipeline - pipeline
* @return ContainerCommandRequestProto.
*/
public static ContainerCommandRequestProto getCloseContainer(
Pipeline pipeline) {
Preconditions.checkNotNull(pipeline);
ContainerProtos.CloseContainerRequestProto closeReqeuest =
ContainerProtos.CloseContainerRequestProto.newBuilder().setPipeline(
pipeline.getProtobufMessage()).build();
ContainerProtos.ContainerCommandRequestProto cmd =
ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
.Type.CloseContainer).setCloseContainer(closeReqeuest).build();
return cmd;
}
}

View File

@ -79,7 +79,7 @@ public class TestOzoneContainer {
if (container != null) {
container.stop();
}
if(cluster != null) {
if (cluster != null) {
cluster.shutdown();
}
}
@ -250,4 +250,119 @@ public class TestOzoneContainer {
}
}
private void testCloseContainer() throws Exception {
MiniOzoneCluster cluster = null;
XceiverClient client = null;
try {
String keyName = OzoneUtils.getRequestID();
String containerName = OzoneUtils.getRequestID();
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
// Start ozone container Via Datanode create.
Pipeline pipeline =
ContainerTestHelper.createSingleNodePipeline(containerName);
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader().getContainerPort());
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType("distributed").build();
// This client talks to ozone container via datanode.
client = new XceiverClient(pipeline, conf);
client.connect();
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
keyName, 1024);
ContainerProtos.ContainerCommandRequestProto request;
ContainerProtos.ContainerCommandResponseProto response;
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
ContainerTestHelper.getPutKeyRequest(writeChunkRequest
.getWriteChunk());
// Write Chunk before closing
response = client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(writeChunkRequest.getTraceID().equals(response
.getTraceID()));
// Put key before closing.
response = client.sendCommand(putKeyRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
response.getResult());
Assert.assertTrue(
putKeyRequest.getTraceID().equals(response.getTraceID()));
// Close the contianer.
request = ContainerTestHelper.getCloseContainer(pipeline);
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Assert that none of the write operations are working after close.
// Write chunks should fail now.
response = client.sendCommand(writeChunkRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Read chunk must work on a closed container.
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
.getWriteChunk());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Put key will fail on a closed container.
response = client.sendCommand(putKeyRequest);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
// Get key must work on the closed container.
request = ContainerTestHelper.getKeyRequest(putKeyRequest.getPutKey());
response = client.sendCommand(request);
ContainerTestHelper.verifyGetKey(request, response);
// Delete Key must fail on a closed container.
request =
ContainerTestHelper.getDeleteKeyRequest(putKeyRequest.getPutKey());
response = client.sendCommand(request);
Assert.assertNotNull(response);
Assert.assertEquals(ContainerProtos.Result.CLOSED_CONTAINER_IO,
response.getResult());
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
} finally {
if (client != null) {
client.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
}