HDFS-11581. Ozone: Support force delete a container. Contributed by Yuanbo Liu.
This commit is contained in:
parent
350220bfb3
commit
368424b465
|
@ -126,6 +126,7 @@ enum Result {
|
|||
CLOSED_CONTAINER_IO = 22;
|
||||
ERROR_CONTAINER_NOT_EMPTY = 23;
|
||||
ERROR_IN_COMPACT_DB = 24;
|
||||
UNCLOSED_CONTAINER_IO = 25;
|
||||
}
|
||||
|
||||
message ContainerCommandRequestProto {
|
||||
|
@ -244,6 +245,7 @@ message UpdateContainerResponseProto {
|
|||
message DeleteContainerRequestProto {
|
||||
required Pipeline pipeline = 1;
|
||||
required string name = 2;
|
||||
optional bool forceDelete = 3 [default = false];
|
||||
}
|
||||
|
||||
message DeleteContainerResponseProto {
|
||||
|
|
|
@ -338,15 +338,19 @@ public final class ContainerUtils {
|
|||
* we created on the data location.
|
||||
*
|
||||
* @param containerData - Data of the container to remove.
|
||||
* @param conf - configuration of the cluster.
|
||||
* @param forceDelete - whether this container should be deleted forcibly.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void removeContainer(ContainerData containerData,
|
||||
Configuration conf) throws IOException {
|
||||
Configuration conf, boolean forceDelete) throws IOException {
|
||||
Preconditions.checkNotNull(containerData);
|
||||
Path dbPath = Paths.get(containerData.getDBPath());
|
||||
|
||||
LevelDBStore db = KeyUtils.getDB(containerData, conf);
|
||||
if(!db.isEmpty()) {
|
||||
// If the container is not empty and cannot be deleted forcibly,
|
||||
// then throw a SCE to stop deleting.
|
||||
if(!forceDelete && !db.isEmpty()) {
|
||||
throw new StorageContainerException(
|
||||
"Container cannot be deleted because it is not empty.",
|
||||
ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
|
||||
|
|
|
@ -372,11 +372,12 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
*
|
||||
* @param pipeline - nodes that make this container.
|
||||
* @param containerName - name of the container.
|
||||
* @param forceDelete - whether this container should be deleted forcibly.
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
@Override
|
||||
public void deleteContainer(Pipeline pipeline, String containerName) throws
|
||||
StorageContainerException {
|
||||
public void deleteContainer(Pipeline pipeline, String containerName,
|
||||
boolean forceDelete) throws StorageContainerException {
|
||||
Preconditions.checkNotNull(containerName, "Container name cannot be null");
|
||||
Preconditions.checkState(containerName.length() > 0,
|
||||
"Container name length cannot be zero.");
|
||||
|
@ -388,7 +389,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
|||
throw new StorageContainerException("No such container. Name : " +
|
||||
containerName, CONTAINER_NOT_FOUND);
|
||||
}
|
||||
ContainerUtils.removeContainer(status.containerData, conf);
|
||||
ContainerUtils.removeContainer(status.containerData, conf, forceDelete);
|
||||
containerMap.remove(containerName);
|
||||
} catch (StorageContainerException e) {
|
||||
throw e;
|
||||
|
|
|
@ -51,6 +51,7 @@ import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result
|
|||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.NO_SUCH_ALGORITHM;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
|
||||
import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result.UNCLOSED_CONTAINER_IO;
|
||||
|
||||
/**
|
||||
* Ozone Container dispatcher takes a call from the netty server and routes it
|
||||
|
@ -362,11 +363,22 @@ public class Dispatcher implements ContainerDispatcher {
|
|||
}
|
||||
|
||||
String name = msg.getDeleteContainer().getName();
|
||||
boolean forceDelete = msg.getDeleteContainer().getForceDelete();
|
||||
Pipeline pipeline = Pipeline.getFromProtoBuf(
|
||||
msg.getDeleteContainer().getPipeline());
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
|
||||
this.containerManager.deleteContainer(pipeline, name);
|
||||
// If this cmd requires force deleting, then we should
|
||||
// make sure the container is in the state of closed,
|
||||
// otherwise, stop deleting container.
|
||||
if (forceDelete) {
|
||||
if (this.containerManager.isOpen(pipeline.getContainerName())) {
|
||||
throw new StorageContainerException("Attempting to force delete "
|
||||
+ "an open container.", UNCLOSED_CONTAINER_IO);
|
||||
}
|
||||
}
|
||||
|
||||
this.containerManager.deleteContainer(pipeline, name, forceDelete);
|
||||
return ContainerUtils.getContainerResponse(msg);
|
||||
}
|
||||
|
||||
|
|
|
@ -64,10 +64,11 @@ public interface ContainerManager extends RwLock {
|
|||
*
|
||||
* @param pipeline - nodes that make this container.
|
||||
* @param containerName - name of the container.
|
||||
* @param forceDelete - whether this container should be deleted forcibly.
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
void deleteContainer(Pipeline pipeline, String containerName)
|
||||
throws StorageContainerException;
|
||||
void deleteContainer(Pipeline pipeline, String containerName,
|
||||
boolean forceDelete) throws StorageContainerException;
|
||||
|
||||
/**
|
||||
* Update an existing container.
|
||||
|
|
|
@ -508,4 +508,19 @@ public final class ContainerTestHelper {
|
|||
return cmd;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a delete container request.
|
||||
* @param pipeline - pipeline
|
||||
* @return ContainerCommandRequestProto.
|
||||
*/
|
||||
public static ContainerCommandRequestProto getDeleteContainer(
|
||||
Pipeline pipeline, boolean forceDelete) {
|
||||
Preconditions.checkNotNull(pipeline);
|
||||
ContainerProtos.DeleteContainerRequestProto deleteRequest =
|
||||
ContainerProtos.DeleteContainerRequestProto.newBuilder().setName(
|
||||
pipeline.getContainerName()).setPipeline(
|
||||
pipeline.getProtobufMessage()).setForceDelete(forceDelete).build();
|
||||
return ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
|
||||
.Type.DeleteContainer).setDeleteContainer(deleteRequest).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -219,7 +219,7 @@ public class TestContainerPersistence {
|
|||
.containsKey(containerName2));
|
||||
|
||||
containerManager.deleteContainer(createSingleNodePipeline(containerName1),
|
||||
containerName1);
|
||||
containerName1, false);
|
||||
Assert.assertFalse(containerManager.getContainerMap()
|
||||
.containsKey(containerName1));
|
||||
|
||||
|
@ -251,7 +251,7 @@ public class TestContainerPersistence {
|
|||
"Container cannot be deleted because it is not empty.");
|
||||
containerManager.deleteContainer(
|
||||
createSingleNodePipeline(containerName1),
|
||||
containerName1);
|
||||
containerName1, false);
|
||||
Assert.assertTrue(containerManager.getContainerMap()
|
||||
.containsKey(containerName1));
|
||||
}
|
||||
|
|
|
@ -124,7 +124,7 @@ public class TestOzoneContainer {
|
|||
pipeline, conf);
|
||||
|
||||
try {
|
||||
runTestOzoneContainerViaDataNode(containerName, pipeline, client);
|
||||
runTestOzoneContainerViaDataNode(containerName, client);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ public class TestOzoneContainer {
|
|||
// This client talks to ozone container via datanode.
|
||||
XceiverClient client = new XceiverClient(pipeline, conf);
|
||||
|
||||
runTestOzoneContainerViaDataNode(containerName, pipeline, client);
|
||||
runTestOzoneContainerViaDataNode(containerName, client);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
@ -178,30 +178,19 @@ public class TestOzoneContainer {
|
|||
}
|
||||
}
|
||||
|
||||
static void runTestOzoneContainerViaDataNode(
|
||||
String containerName, Pipeline pipeline, XceiverClientSpi client)
|
||||
throws Exception {
|
||||
static void runTestOzoneContainerViaDataNode(String containerName,
|
||||
XceiverClientSpi client) throws Exception {
|
||||
ContainerProtos.ContainerCommandRequestProto
|
||||
request, writeChunkRequest, putKeyRequest,
|
||||
updateRequest1, updateRequest2;
|
||||
ContainerProtos.ContainerCommandResponseProto response,
|
||||
updateResponse1, updateResponse2;
|
||||
try {
|
||||
client.connect();
|
||||
|
||||
// Create container
|
||||
ContainerProtos.ContainerCommandRequestProto request =
|
||||
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Write Chunk
|
||||
final String keyName = OzoneUtils.getRequestID();
|
||||
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
|
||||
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
|
||||
keyName, 1024);
|
||||
|
||||
response = client.sendCommand(writeChunkRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
createContainerForTesting(client, containerName);
|
||||
writeChunkRequest = writeChunkForContainer(client, containerName, 1024);
|
||||
|
||||
// Read Chunk
|
||||
request = ContainerTestHelper.getReadChunkRequest(writeChunkRequest
|
||||
|
@ -213,8 +202,7 @@ public class TestOzoneContainer {
|
|||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Put Key
|
||||
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
|
||||
ContainerTestHelper.getPutKeyRequest(writeChunkRequest
|
||||
putKeyRequest = ContainerTestHelper.getPutKeyRequest(writeChunkRequest
|
||||
.getWriteChunk());
|
||||
|
||||
|
||||
|
@ -249,21 +237,17 @@ public class TestOzoneContainer {
|
|||
//Update an existing container
|
||||
Map<String, String> containerUpdate = new HashMap<String, String>();
|
||||
containerUpdate.put("container_updated_key", "container_updated_value");
|
||||
ContainerProtos.ContainerCommandRequestProto updateRequest1 =
|
||||
ContainerTestHelper.getUpdateContainerRequest(
|
||||
updateRequest1 = ContainerTestHelper.getUpdateContainerRequest(
|
||||
containerName, containerUpdate);
|
||||
ContainerProtos.ContainerCommandResponseProto updateResponse1 =
|
||||
client.sendCommand(updateRequest1);
|
||||
updateResponse1 = client.sendCommand(updateRequest1);
|
||||
Assert.assertNotNull(updateResponse1);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
|
||||
response.getResult());
|
||||
|
||||
//Update an non-existing container
|
||||
ContainerProtos.ContainerCommandRequestProto updateRequest2 =
|
||||
ContainerTestHelper.getUpdateContainerRequest(
|
||||
updateRequest2 = ContainerTestHelper.getUpdateContainerRequest(
|
||||
"non_exist_container", containerUpdate);
|
||||
ContainerProtos.ContainerCommandResponseProto updateResponse2 =
|
||||
client.sendCommand(updateRequest2);
|
||||
updateResponse2 = client.sendCommand(updateRequest2);
|
||||
Assert.assertEquals(ContainerProtos.Result.CONTAINER_NOT_FOUND,
|
||||
updateResponse2.getResult());
|
||||
} finally {
|
||||
|
@ -277,52 +261,31 @@ public class TestOzoneContainer {
|
|||
public void testBothGetandPutSmallFile() throws Exception {
|
||||
MiniOzoneCluster cluster = null;
|
||||
XceiverClient client = null;
|
||||
ContainerProtos.ContainerCommandResponseProto response;
|
||||
ContainerProtos.ContainerCommandRequestProto
|
||||
smallFileRequest, getSmallFileRequest;
|
||||
try {
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
URL p = conf.getClass().getResource("");
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
|
||||
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
|
||||
|
||||
// Start ozone container Via Datanode create.
|
||||
|
||||
Pipeline pipeline =
|
||||
ContainerTestHelper.createSingleNodePipeline(containerName);
|
||||
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
|
||||
pipeline.getLeader().getContainerPort());
|
||||
|
||||
client = createClientForTesting(conf);
|
||||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
.setRandomContainerPort(false)
|
||||
.setHandlerType("distributed").build();
|
||||
|
||||
// This client talks to ozone container via datanode.
|
||||
client = new XceiverClient(pipeline, conf);
|
||||
client.connect();
|
||||
|
||||
// Create container
|
||||
ContainerProtos.ContainerCommandRequestProto request =
|
||||
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto smallFileRequest =
|
||||
ContainerTestHelper.getWriteSmallFileRequest(pipeline, containerName,
|
||||
keyName, 1024);
|
||||
String containerName = client.getPipeline().getContainerName();
|
||||
createContainerForTesting(client, containerName);
|
||||
|
||||
smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
|
||||
client.getPipeline(), containerName, keyName, 1024);
|
||||
|
||||
response = client.sendCommand(smallFileRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertTrue(smallFileRequest.getTraceID()
|
||||
.equals(response.getTraceID()));
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto getSmallFileRequest =
|
||||
getSmallFileRequest =
|
||||
ContainerTestHelper.getReadSmallFileRequest(smallFileRequest
|
||||
.getPutSmallFile().getKey());
|
||||
response = client.sendCommand(getSmallFileRequest);
|
||||
|
@ -343,58 +306,25 @@ public class TestOzoneContainer {
|
|||
public void testCloseContainer() throws Exception {
|
||||
MiniOzoneCluster cluster = null;
|
||||
XceiverClient client = null;
|
||||
ContainerProtos.ContainerCommandResponseProto response;
|
||||
ContainerProtos.ContainerCommandRequestProto
|
||||
writeChunkRequest, putKeyRequest, request;
|
||||
try {
|
||||
|
||||
String keyName = OzoneUtils.getRequestID();
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
URL p = conf.getClass().getResource("");
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
|
||||
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
|
||||
|
||||
// Start ozone container Via Datanode create.
|
||||
|
||||
Pipeline pipeline =
|
||||
ContainerTestHelper.createSingleNodePipeline(containerName);
|
||||
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
|
||||
pipeline.getLeader().getContainerPort());
|
||||
|
||||
client = createClientForTesting(conf);
|
||||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
.setRandomContainerPort(false)
|
||||
.setHandlerType("distributed").build();
|
||||
|
||||
// This client talks to ozone container via datanode.
|
||||
client = new XceiverClient(pipeline, conf);
|
||||
client.connect();
|
||||
|
||||
String containerName = client.getPipeline().getContainerName();
|
||||
createContainerForTesting(client, containerName);
|
||||
writeChunkRequest = writeChunkForContainer(client, containerName, 1024);
|
||||
|
||||
|
||||
// Create container
|
||||
ContainerProtos.ContainerCommandRequestProto request =
|
||||
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
|
||||
ContainerTestHelper.getWriteChunkRequest(pipeline, containerName,
|
||||
keyName, 1024);
|
||||
|
||||
// Write Chunk before closing
|
||||
response = client.sendCommand(writeChunkRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
|
||||
response.getResult());
|
||||
Assert.assertTrue(writeChunkRequest.getTraceID().equals(response
|
||||
.getTraceID()));
|
||||
|
||||
|
||||
ContainerProtos.ContainerCommandRequestProto putKeyRequest =
|
||||
ContainerTestHelper.getPutKeyRequest(writeChunkRequest
|
||||
putKeyRequest = ContainerTestHelper.getPutKeyRequest(writeChunkRequest
|
||||
.getWriteChunk());
|
||||
// Put key before closing.
|
||||
response = client.sendCommand(putKeyRequest);
|
||||
|
@ -405,7 +335,7 @@ public class TestOzoneContainer {
|
|||
putKeyRequest.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Close the contianer.
|
||||
request = ContainerTestHelper.getCloseContainer(pipeline);
|
||||
request = ContainerTestHelper.getCloseContainer(client.getPipeline());
|
||||
response = client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
|
@ -460,4 +390,133 @@ public class TestOzoneContainer {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteContainer() throws Exception {
|
||||
MiniOzoneCluster cluster = null;
|
||||
XceiverClient client = null;
|
||||
ContainerProtos.ContainerCommandResponseProto response;
|
||||
ContainerProtos.ContainerCommandRequestProto request,
|
||||
writeChunkRequest, putKeyRequest;
|
||||
try {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
|
||||
client = createClientForTesting(conf);
|
||||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
.setRandomContainerPort(false)
|
||||
.setHandlerType("distributed").build();
|
||||
client.connect();
|
||||
|
||||
String containerName = client.getPipeline().getContainerName();
|
||||
createContainerForTesting(client, containerName);
|
||||
writeChunkRequest = writeChunkForContainer(client, containerName, 1024);
|
||||
|
||||
putKeyRequest = ContainerTestHelper.getPutKeyRequest(writeChunkRequest
|
||||
.getWriteChunk());
|
||||
// Put key before deleting.
|
||||
response = client.sendCommand(putKeyRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
|
||||
response.getResult());
|
||||
Assert.assertTrue(
|
||||
putKeyRequest.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Container cannot be deleted because the container is not empty.
|
||||
request = ContainerTestHelper.getDeleteContainer(
|
||||
client.getPipeline(), false);
|
||||
response = client.sendCommand(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY,
|
||||
response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Container cannot be deleted forcibly because
|
||||
// the container is not closed.
|
||||
request = ContainerTestHelper.getDeleteContainer(
|
||||
client.getPipeline(), true);
|
||||
response = client.sendCommand(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.UNCLOSED_CONTAINER_IO,
|
||||
response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Close the container.
|
||||
request = ContainerTestHelper.getCloseContainer(client.getPipeline());
|
||||
response = client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
// Container can be deleted forcibly because
|
||||
// it has been closed.
|
||||
request = ContainerTestHelper.getDeleteContainer(
|
||||
client.getPipeline(), true);
|
||||
response = client.sendCommand(request);
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS,
|
||||
response.getResult());
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
|
||||
} finally {
|
||||
if (client != null) {
|
||||
client.close();
|
||||
}
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private XceiverClient createClientForTesting(OzoneConfiguration conf)
|
||||
throws Exception {
|
||||
|
||||
String containerName = OzoneUtils.getRequestID();
|
||||
URL p = conf.getClass().getResource("");
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName());
|
||||
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
|
||||
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
|
||||
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
|
||||
|
||||
// Start ozone container Via Datanode create.
|
||||
|
||||
Pipeline pipeline =
|
||||
ContainerTestHelper.createSingleNodePipeline(containerName);
|
||||
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
|
||||
pipeline.getLeader().getContainerPort());
|
||||
|
||||
// This client talks to ozone container via datanode.
|
||||
return new XceiverClient(pipeline, conf);
|
||||
}
|
||||
|
||||
private static void createContainerForTesting(XceiverClientSpi client,
|
||||
String containerName) throws Exception {
|
||||
// Create container
|
||||
ContainerProtos.ContainerCommandRequestProto request =
|
||||
ContainerTestHelper.getCreateContainerRequest(containerName);
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
client.sendCommand(request);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
|
||||
}
|
||||
|
||||
private static ContainerProtos.ContainerCommandRequestProto
|
||||
writeChunkForContainer(XceiverClientSpi client,
|
||||
String containerName, int dataLen) throws Exception {
|
||||
// Write Chunk
|
||||
final String keyName = OzoneUtils.getRequestID();
|
||||
ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
|
||||
ContainerTestHelper.getWriteChunkRequest(client.getPipeline(),
|
||||
containerName, keyName, dataLen);
|
||||
|
||||
ContainerProtos.ContainerCommandResponseProto response =
|
||||
client.sendCommand(writeChunkRequest);
|
||||
Assert.assertNotNull(response);
|
||||
Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
|
||||
Assert.assertTrue(response.getTraceID().equals(response.getTraceID()));
|
||||
return writeChunkRequest;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue