HDFS-9944. Ozone : Add container dispatcher. Contributed by Anu Engineer.

This commit is contained in:
Chris Nauroth 2016-03-14 16:16:35 -07:00 committed by Owen O'Malley
parent b9d53ed836
commit bb47d03906
9 changed files with 636 additions and 5 deletions

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.helpers;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
/**
* This class maintains the information about a container in the ozone world.
* <p>
* A container is a name, along with metadata- which is a set of key value
* pair.
*/
public class ContainerData {

  private final String containerName;

  // Free-form key/value metadata for the container. All access is guarded
  // by synchronizing on the map itself (see addMetadata/getValue/etc.).
  private final Map<String, String> metadata;

  // Optional on-disk location of the container; null until set.
  private String path;

  /**
   * Constructs a ContainerData Object.
   *
   * @param containerName - Name
   */
  public ContainerData(String containerName) {
    this.metadata = new TreeMap<>();
    this.containerName = containerName;
  }

  /**
   * Constructs a ContainerData object from ProtoBuf classes.
   * <p>
   * Note: the method name keeps the original "ProtBuf" spelling because
   * existing callers (e.g. Dispatcher) reference it by this name.
   *
   * @param protoData - ProtoBuf Message
   * @return ContainerData built from the message
   * @throws IOException if the message carries a duplicate metadata key
   */
  public static ContainerData getFromProtBuf(
      ContainerProtos.ContainerData protoData) throws IOException {
    ContainerData data = new ContainerData(protoData.getName());
    for (int x = 0; x < protoData.getMetadataCount(); x++) {
      data.addMetadata(protoData.getMetadata(x).getKey(),
          protoData.getMetadata(x).getValue());
    }

    if (protoData.hasContainerPath()) {
      data.setPath(protoData.getContainerPath());
    }
    return data;
  }

  /**
   * Returns a ProtoBuf Message from ContainerData.
   *
   * @return Protocol Buffer Message
   */
  public ContainerProtos.ContainerData getProtoBufMessage() {
    ContainerProtos.ContainerData.Builder builder = ContainerProtos
        .ContainerData.newBuilder();
    builder.setName(this.getContainerName());

    if (this.getPath() != null) {
      builder.setContainerPath(this.getPath());
    }

    for (Map.Entry<String, String> entry : metadata.entrySet()) {
      ContainerProtos.KeyValue.Builder keyValBuilder =
          ContainerProtos.KeyValue.newBuilder();
      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
          .setValue(entry.getValue()).build());
    }
    return builder.build();
  }

  /**
   * Returns the name of the container.
   *
   * @return - name
   */
  public String getContainerName() {
    return containerName;
  }

  /**
   * Adds metadata.
   *
   * @param key - metadata key; must not already be present
   * @param value - metadata value
   * @throws IOException if the key already exists
   */
  public void addMetadata(String key, String value) throws IOException {
    synchronized (this.metadata) {
      if (this.metadata.containsKey(key)) {
        throw new IOException("This key already exists. Key " + key);
      }
      metadata.put(key, value);
    }
  }

  /**
   * Returns all metadata.
   * <p>
   * Returns an unmodifiable <em>snapshot</em> of the metadata rather than a
   * view of the live map: a live view could throw
   * ConcurrentModificationException if another thread mutates the metadata
   * while the caller iterates outside this synchronized block.
   *
   * @return read-only copy of the metadata map
   */
  public Map<String, String> getAllMetadata() {
    synchronized (this.metadata) {
      return Collections.unmodifiableMap(new TreeMap<>(this.metadata));
    }
  }

  /**
   * Returns value of a key.
   *
   * @param key - metadata key
   * @return the value, or null if absent
   */
  public String getValue(String key) {
    synchronized (this.metadata) {
      return metadata.get(key);
    }
  }

  /**
   * Deletes a metadata entry from the map. Removing a missing key is a
   * no-op.
   *
   * @param key - Key
   */
  public void deleteKey(String key) {
    synchronized (this.metadata) {
      metadata.remove(key);
    }
  }

  /**
   * Returns path.
   *
   * @return - path, or null if never set
   */
  public String getPath() {
    return path;
  }

  /**
   * Sets path.
   *
   * @param path - String.
   */
  public void setPath(String path) {
    this.path = path;
  }

  /**
   * This function serves as the generic key for OzoneCache class. Both
   * ContainerData and ContainerKeyData overrides this function to
   * appropriately return the right name that can be used in OzoneCache.
   *
   * @return String Name.
   */
  public String getName() {
    return getContainerName();
  }
}

View File

@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
/**
* A set of helper functions to create proper responses.
*/
public final class ContainerUtils {
/**
 * Returns a CreateContainer Response. This call is used by create and delete
 * containers which have null success responses.
 *
 * @param msg Request
 * @return Response.
 */
public static ContainerProtos.ContainerCommandResponseProto
getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) {
ContainerProtos.ContainerCommandResponseProto.Builder builder =
getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
return builder.build();
}
/**
 * Returns a ReadContainer Response.
 *
 * @param msg Request
 * @param containerData - data describing the container that was read
 * @return Response with the container data embedded.
 */
public static ContainerProtos.ContainerCommandResponseProto
getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
ContainerData containerData) {
Preconditions.checkNotNull(containerData);
ContainerProtos.ReadContainerResponseProto.Builder response =
ContainerProtos.ReadContainerResponseProto.newBuilder();
response.setContainerData(containerData.getProtoBufMessage());
ContainerProtos.ContainerCommandResponseProto.Builder builder =
getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
builder.setReadContainer(response);
return builder.build();
}
/**
 * Returns a response builder pre-filled from the request: the command type
 * and trace ID are copied from {@code msg}, and the given result code and
 * message are set. All other response helpers in this class build on this.
 *
 * @param msg - Protobuf request message.
 * @param result - result code to report.
 * @param message - human readable message (may be empty).
 * @return ContainerCommandResponseProto.Builder for further population.
 */
public static ContainerProtos.ContainerCommandResponseProto.Builder
getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
ContainerProtos.Result result, String message) {
return
ContainerProtos.ContainerCommandResponseProto.newBuilder()
.setCmdType(msg.getCmdType())
.setTraceID(msg.getTraceID())
.setResult(result)
.setMessage(message);
}
/**
 * We found a command type but no associated payload for the command. Hence
 * return malformed Command as response.
 *
 * @param msg - Protobuf message.
 * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
 */
public static ContainerProtos.ContainerCommandResponseProto
malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST,
"Cmd type does not match the payload.").build();
}
/**
 * We found a command type that is not supported yet.
 *
 * @param msg - Protobuf message.
 * @return ContainerCommandResponseProto - UNSUPPORTED_REQUEST.
 */
public static ContainerProtos.ContainerCommandResponseProto
unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST,
"Server does not support this command yet.").build();
}
private ContainerUtils() {
//never constructed.
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.helpers;
/**
Contains protocol buffer helper classes.
**/

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.interfaces;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ozone.container.helpers.ContainerData;
import org.apache.hadoop.ozone.container.helpers.Pipeline;
import java.io.IOException;
import java.util.List;
/**
* Interface for container operations.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface ContainerManager {
/**
 * Creates a container with the given name.
 *
 * @param pipeline -- Nodes which make up this container.
 * @param containerData - Container Name and metadata.
 * @throws IOException
 */
void createContainer(Pipeline pipeline, ContainerData containerData)
throws IOException;
/**
 * Deletes an existing container.
 *
 * @param pipeline - nodes that make this container.
 * @param containerName - name of the container.
 * @throws IOException
 */
void deleteContainer(Pipeline pipeline, String containerName)
throws IOException;
/**
 * A simple interface for container iteration.
 *
 * @param start - Starting index
 * @param count - how many to return
 * @param data - output parameter: implementations append up to
 *               {@code count} ContainerData entries to this list.
 * @throws IOException
 */
void listContainer(long start, long count, List<ContainerData> data)
throws IOException;
/**
 * Get metadata about a specific container.
 *
 * @param containerName - Name of the container
 * @return ContainerData
 * @throws IOException
 */
ContainerData readContainer(String containerName) throws IOException;
}

View File

@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.container.helpers.ContainerData;
import org.apache.hadoop.ozone.container.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.helpers.Pipeline;
import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.interfaces.ContainerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Ozone Container dispatcher takes a call from the netty server and routes it
* to the right handler function.
*/
public class Dispatcher implements ContainerDispatcher {
static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
// Backend that performs the actual container operations.
private final ContainerManager containerManager;
/**
 * Constructs an OzoneContainer that receives calls from
 * XceiverServerHandler.
 *
 * @param containerManager - A class that manages containers.
 */
public Dispatcher(ContainerManager containerManager) {
Preconditions.checkNotNull(containerManager);
this.containerManager = containerManager;
}
/**
 * Routes an incoming command to the container handler. Only the four
 * container commands below are handled; everything else gets an
 * UNSUPPORTED_REQUEST response.
 *
 * @param msg - incoming command (must not be null)
 * @return response proto
 * @throws IOException
 */
@Override
public ContainerCommandResponseProto dispatch(
ContainerCommandRequestProto msg) throws IOException {
Preconditions.checkNotNull(msg);
Type cmdType = msg.getCmdType();
if ((cmdType == Type.CreateContainer) ||
(cmdType == Type.DeleteContainer) ||
(cmdType == Type.ReadContainer) ||
(cmdType == Type.ListContainer)) {
return containerProcessHandler(msg);
}
return ContainerUtils.unsupportedRequest(msg);
}
/**
 * Handles the all Container related functionality.
 *
 * @param msg - command
 * @return - response
 * @throws IOException
 */
private ContainerCommandResponseProto containerProcessHandler(
ContainerCommandRequestProto msg) throws IOException {
try {
// NOTE(review): the container data and pipeline are always extracted
// from the CreateContainer payload, even for Delete/Read/List commands.
// For those commands getCreateContainer() returns the default (empty)
// message, so cData and pipeline are presumably empty — confirm the
// intended request payloads and extract per command type.
ContainerData cData = ContainerData.getFromProtBuf(
msg.getCreateContainer().getContainerData());
Pipeline pipeline = Pipeline.getFromProtoBuf(
msg.getCreateContainer().getPipeline());
Preconditions.checkNotNull(pipeline);
switch (msg.getCmdType()) {
case CreateContainer:
return handleCreateContainer(msg, cData, pipeline);
case DeleteContainer:
return handleDeleteContainer(msg, cData, pipeline);
case ListContainer:
// TODO: list is accepted by dispatch() but not implemented yet.
return ContainerUtils.unsupportedRequest(msg);
case ReadContainer:
return handleReadContainer(msg, cData);
default:
return ContainerUtils.unsupportedRequest(msg);
}
} catch (IOException ex) {
// NOTE(review): same getCreateContainer() issue as above — the logged
// container name may be empty for non-create commands.
LOG.warn("Container operation failed. " +
"Container: {} Operation: {} trace ID: {} Error: {}",
msg.getCreateContainer().getContainerData().getName(),
msg.getCmdType().name(),
msg.getTraceID(),
ex.toString());
// TODO : Replace with finer error codes.
return ContainerUtils.getContainerResponse(msg,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
ex.toString()).build();
}
}
/**
 * Calls into container logic and returns appropriate response.
 *
 * @param msg - Request
 * @param cData - Container Data object
 * @return ContainerCommandResponseProto
 * @throws IOException
 */
private ContainerCommandResponseProto handleReadContainer(
ContainerCommandRequestProto msg, ContainerData cData)
throws IOException {
if (!msg.hasReadContainer()) {
LOG.debug("Malformed read container request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
ContainerData container = this.containerManager.readContainer(
cData.getContainerName());
return ContainerUtils.getReadContainerResponse(msg, container);
}
/**
 * Calls into container logic and returns appropriate response.
 *
 * @param msg - Request
 * @param cData - ContainerData
 * @param pipeline - Pipeline is the machines where this container lives.
 * @return Response.
 * @throws IOException
 */
private ContainerCommandResponseProto handleDeleteContainer(
ContainerCommandRequestProto msg, ContainerData cData,
Pipeline pipeline) throws IOException {
if (!msg.hasDeleteContainer()) {
LOG.debug("Malformed delete container request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
this.containerManager.deleteContainer(pipeline,
cData.getContainerName());
return ContainerUtils.getContainerResponse(msg);
}
/**
 * Calls into container logic and returns appropriate response.
 *
 * @param msg - Request
 * @param cData - ContainerData
 * @param pipeline - Pipeline is the machines where this container lives.
 * @return Response.
 * @throws IOException
 */
private ContainerCommandResponseProto handleCreateContainer(
ContainerCommandRequestProto msg, ContainerData cData,
Pipeline pipeline) throws IOException {
if (!msg.hasCreateContainer()) {
LOG.debug("Malformed create container request. trace ID: {}",
msg.getTraceID());
return ContainerUtils.malformedRequest(msg);
}
this.containerManager.createContainer(pipeline, cData);
return ContainerUtils.getContainerResponse(msg);
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ozoneimpl;
/**
This package contains the Ozone container implementation.
**/

View File

@ -90,6 +90,14 @@ enum Type {
}
// Outcome of a container command; carried in the required 'result'
// field of ContainerCommandResponseProto. Field numbers are part of the
// wire format and must never be changed or reused.
enum Result {
SUCCESS = 1;                  // command completed normally
UNSUPPORTED_REQUEST = 2;      // command type not implemented by the server
MALFORMED_REQUEST = 3;        // command type does not match its payload
CONTAINER_INTERNAL_ERROR = 4; // catch-all for server-side failures
}
message ContainerCommandRequestProto {
required Type cmdType = 1; // Type of the command
@ -140,6 +148,9 @@ message ContainerCommandResponseProto {
optional DeleteChunkResponseProto deleteChunk = 15;
optional ListChunkResponseProto listChunk = 16;
required Result result = 17;
optional string message = 18;
}
// A pipeline is composed of one or more datanodes that back a container.

View File

@ -96,6 +96,7 @@ public class ContainerTestHelper {
response.setCmdType(ContainerProtos.Type.CreateContainer);
response.setTraceID(request.getTraceID());
response.setCreateContainer(createResponse.build());
response.setResult(ContainerProtos.Result.SUCCESS);
return response.build();
}

View File

@ -19,21 +19,24 @@
package org.apache.hadoop.ozone.container.transport.server;
import io.netty.channel.embedded.EmbeddedChannel;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.helpers.Pipeline;
import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.ozoneimpl.Dispatcher;
import org.apache.hadoop.ozone.container.transport.client.XceiverClient;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import static org.mockito.Mockito.mock;
public class TestContainerServer {
@Test
@ -86,6 +89,39 @@ public class TestContainerServer {
}
}
@Test
public void testClientServerWithContainerDispatcher() throws Exception {
  XceiverServer server = null;
  XceiverClient client = null;

  try {
    Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
        pipeline.getLeader().getContainerPort());

    // Wire the real Dispatcher in front of a mocked ContainerManager so
    // only the dispatch path (not container storage) is under test.
    server = new XceiverServer(conf, new Dispatcher(
        mock(ContainerManager.class)));
    client = new XceiverClient(pipeline, conf);

    server.start();
    client.connect();

    ContainerCommandRequestProto request =
        ContainerTestHelper.getCreateContainerRequest();
    ContainerCommandResponseProto response = client.sendCommand(request);
    // JUnit's assertEquals takes (expected, actual); the original call had
    // the arguments swapped, inverting failure messages. assertEquals also
    // replaces assertTrue(a.equals(b)) so mismatches print both values.
    Assert.assertEquals(request.getTraceID(), response.getTraceID());
    Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult());
  } finally {
    if (client != null) {
      client.close();
    }
    if (server != null) {
      server.stop();
    }
  }
}
private class TestContainerDispatcher implements ContainerDispatcher {
/**
* Dispatches commands to container layer.