diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java index 6413ac0a9e7..b8c6a13b952 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java @@ -17,36 +17,58 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS; -import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_DEFAULT_PORT; +import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS; +import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE; +import java.io.Closeable; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import com.sun.jersey.api.container.ContainerFactory; import com.sun.jersey.api.core.ApplicationAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.ozone.web.handlers.ServiceFilter; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.ObjectStoreApplication; import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler; import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler; +import org.apache.hadoop.security.UserGroupInformation; /** * Implements object store handling within the DataNode process. This class is * responsible for initializing and maintaining the RPC clients and servers and * the web application required for the object store implementation. */ -public final class ObjectStoreHandler { +public final class ObjectStoreHandler implements Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(ObjectStoreJerseyContainer.class); private final ObjectStoreJerseyContainer objectStoreJerseyContainer; + private final StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; /** * Creates a new ObjectStoreHandler. @@ -57,14 +79,32 @@ public final class ObjectStoreHandler { public ObjectStoreHandler(Configuration conf) throws IOException { String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY, DFS_STORAGE_HANDLER_TYPE_DEFAULT); + LOG.info("ObjectStoreHandler initializing with {}: {}", + DFS_STORAGE_HANDLER_TYPE_KEY, shType); boolean ozoneTrace = conf.getBoolean(DFS_OBJECTSTORE_TRACE_ENABLED_KEY, DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT); final StorageHandler storageHandler; + + // Initialize Jersey container for object store web application. if ("distributed".equalsIgnoreCase(shType)) { - storageHandler = new DistributedStorageHandler(); + RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class, + ProtobufRpcEngine.class); + long version = + RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class); + InetSocketAddress address = conf.getSocketAddr( + DFS_STORAGE_RPC_BIND_HOST_KEY, DFS_STORAGE_RPC_ADDRESS_KEY, + DFS_STORAGE_RPC_ADDRESS_DEFAULT, DFS_STORAGE_RPC_DEFAULT_PORT); + this.storageContainerLocationClient = + new StorageContainerLocationProtocolClientSideTranslatorPB( + RPC.getProxy(StorageContainerLocationProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf))); + storageHandler = new DistributedStorageHandler(new OzoneConfiguration(), + this.storageContainerLocationClient); } else { if ("local".equalsIgnoreCase(shType)) { storageHandler = new LocalStorageHandler(conf); + this.storageContainerLocationClient = null; } else { throw new IllegalArgumentException( String.format("Unrecognized value for %s: %s", @@ -91,4 +131,12 @@ public ObjectStoreHandler(Configuration conf) throws IOException { public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() { return this.objectStoreJerseyContainer; } + + @Override + public void close() { + LOG.info("Closing ObjectStoreHandler."); + if (this.storageContainerLocationClient != null) { + this.storageContainerLocationClient.close(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index bebbb782065..1ffaa2feb3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -68,6 +68,8 @@ public final class OzoneConsts { public static final String FILE_HASH = "SHA-256"; public final static String CHUNK_OVERWRITE = "OverWriteRequested"; + public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB + /** * Supports Bucket Versioning. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java index 15e4524878b..b4c8aa6a720 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java @@ -169,7 +169,8 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo, StandardOpenOption.SPARSE, StandardOpenOption.SYNC); lock = file.lock().get(); - if (!chunkInfo.getChecksum().isEmpty()) { + if (chunkInfo.getChecksum() != null && + !chunkInfo.getChecksum().isEmpty()) { verifyChecksum(chunkInfo, data, log); } int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java index 66ff1baf2b5..bad1d23aecc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java @@ -127,7 +127,7 @@ private ContainerCommandResponseProto containerProcessHandler( msg.getCreateContainer().getContainerData().getName(), msg.getCmdType().name(), msg.getTraceID(), - ex.toString()); + ex.toString(), ex); // TODO : Replace with finer error codes. return ContainerUtils.getContainerResponse(msg, @@ -169,7 +169,7 @@ private ContainerCommandResponseProto keyProcessHandler( msg.getCreateContainer().getContainerData().getName(), msg.getCmdType().name(), msg.getTraceID(), - ex.toString()); + ex.toString(), ex); // TODO : Replace with finer error codes. return ContainerUtils.getContainerResponse(msg, @@ -210,7 +210,7 @@ private ContainerCommandResponseProto chunkProcessHandler( msg.getCreateContainer().getContainerData().getName(), msg.getCmdType().name(), msg.getTraceID(), - ex.toString()); + ex.toString(), ex); // TODO : Replace with finer error codes. return ContainerUtils.getContainerResponse(msg, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java index 3b9ba8da72c..e6d914a56bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java @@ -34,12 +34,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; /** * A Client for the storageContainer protocol. */ -public class XceiverClient { +public class XceiverClient implements Closeable { static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); private final Pipeline pipeline; private final Configuration config; @@ -92,6 +93,7 @@ public void connect() throws Exception { /** * Close the client. */ + @Override public void close() { if(group != null) { group.shutdownGracefully(); @@ -102,6 +104,16 @@ public void close() { } } + /** + * Returns the pipeline of machines that host the container used by this + * client. + * + * @return pipeline of machines that host the container + */ + public Pipeline getPipeline() { + return pipeline; + } + /** * Sends a given command to server and gets the reply back. * @param request Request diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java index a219e4e3824..c9a3ad33ec0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java @@ -94,7 +94,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ContainerProtos.ContainerCommandResponseProto response; channel.writeAndFlush(request); boolean interrupted = false; - for (; ; ) { + for (;;) { try { response = responses.take(); break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java new file mode 100644 index 00000000000..8123ae9cea2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientManager.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.client; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; + +/** + * XceiverClientManager is responsible for the lifecycle of XceiverClient + * instances. Callers use this class to acquire an XceiverClient instance + * connected to the desired container pipeline. When done, the caller also uses + * this class to release the previously acquired XceiverClient instance. + * + * This class may evolve to implement efficient lifecycle management policies by + * caching container location information and pooling connected client instances + * for reuse without needing to reestablish a socket connection. The current + * implementation simply allocates and closes a new instance every time. + */ +public class XceiverClientManager { + + private final OzoneConfiguration conf; + + /** + * Creates a new XceiverClientManager. + * + * @param conf configuration + */ + public XceiverClientManager(OzoneConfiguration conf) { + Preconditions.checkNotNull(conf); + this.conf = conf; + } + + /** + * Acquires a XceiverClient connected to a container capable of storing the + * specified key. + * + * @param pipeline the container pipeline for the client connection + * @return XceiverClient connected to a container + * @throws IOException if an XceiverClient cannot be acquired + */ + public XceiverClient acquireClient(Pipeline pipeline) throws IOException { + Preconditions.checkNotNull(pipeline); + Preconditions.checkArgument(pipeline.getMachines() != null); + Preconditions.checkArgument(!pipeline.getMachines().isEmpty()); + XceiverClient xceiverClient = new XceiverClient(pipeline, conf); + try { + xceiverClient.connect(); + } catch (Exception e) { + throw new IOException("Exception connecting XceiverClient.", e); + } + return xceiverClient; + } + + /** + * Releases an XceiverClient after use. + * + * @param xceiverClient client to release + */ + public void releaseClient(XceiverClient xceiverClient) { + Preconditions.checkNotNull(xceiverClient); + xceiverClient.close(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java new file mode 100644 index 00000000000..d3c02785936 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.client; + +/** + * This package contains classes for the client of the storage container + * protocol. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java new file mode 100644 index 00000000000..59c96f13496 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.common.transport.server; + +/** + * This package contains classes for the server of the storage container + * protocol. + */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java index 90e200a604d..16863c93a9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/storage/StorageContainerManager.java @@ -48,10 +48,15 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -81,6 +86,10 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; import org.apache.hadoop.ozone.protocol.LocatedContainer; import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; @@ -94,11 +103,14 @@ * * The current implementation is a stub suitable to begin end-to-end testing of * Ozone service interactions. DataNodes report to StorageContainerManager - * using the existing heartbeat messages. StorageContainerManager tells clients - * container locations by reporting that all registered nodes are a viable - * location. This will evolve from a stub to a full-fledged implementation - * capable of partitioning the keyspace across multiple containers, with - * appropriate distribution across nodes. + * using the existing heartbeat messages. StorageContainerManager lazily + * initializes a single storage container to be served by those DataNodes. + * All subsequent requests for container locations will reply with that single + * pipeline, using all registered nodes. + * + * This will evolve from a stub to a full-fledged implementation capable of + * partitioning the keyspace across multiple containers, with appropriate + * distribution across nodes. */ @InterfaceAudience.Private public class StorageContainerManager @@ -109,6 +121,8 @@ public class StorageContainerManager private final StorageContainerNameService ns; private final BlockManager blockManager; + private final XceiverClientManager xceiverClientManager; + private Pipeline singlePipeline; /** The RPC server that listens to requests from DataNodes. */ private final RPC.Server serviceRpcServer; @@ -128,11 +142,12 @@ public class StorageContainerManager * * @param conf configuration */ - public StorageContainerManager(Configuration conf) + public StorageContainerManager(OzoneConfiguration conf) throws IOException { ns = new StorageContainerNameService(); boolean haEnabled = false; blockManager = new BlockManager(ns, haEnabled, conf); + xceiverClientManager = new XceiverClientManager(conf); RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, ProtobufRpcEngine.class); @@ -193,20 +208,20 @@ public StorageContainerManager(Configuration conf) public Set getStorageContainerLocations(Set keys) throws IOException { LOG.trace("getStorageContainerLocations keys = {}", keys); + Pipeline pipeline = initSingleContainerPipeline(); List liveNodes = new ArrayList(); blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); if (liveNodes.isEmpty()) { throw new IOException("Storage container locations not found."); } - String containerName = UUID.randomUUID().toString(); Set locations = Sets.newLinkedHashSet(liveNodes); DatanodeInfo leader = liveNodes.get(0); Set locatedContainers = Sets.newLinkedHashSetWithExpectedSize(keys.size()); for (String key: keys) { - locatedContainers.add(new LocatedContainer(key, key, containerName, - locations, leader)); + locatedContainers.add(new LocatedContainer(key, key, + pipeline.getContainerName(), locations, leader)); } LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}", keys, locatedContainers); @@ -415,6 +430,56 @@ public void join() { } } + /** + * Lazily initializes a single container pipeline using all registered + * DataNodes via a synchronous call to the container protocol. This single + * container pipeline will be reused for container requests for the lifetime + * of this StorageContainerManager. + * + * @throws IOException if there is an I/O error + */ + private synchronized Pipeline initSingleContainerPipeline() + throws IOException { + if (singlePipeline == null) { + List liveNodes = new ArrayList(); + blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); + if (liveNodes.isEmpty()) { + throw new IOException("Storage container locations not found."); + } + Pipeline newPipeline = newPipelineFromNodes(liveNodes, + UUID.randomUUID().toString()); + XceiverClient xceiverClient = + xceiverClientManager.acquireClient(newPipeline); + try { + ContainerData containerData = ContainerData + .newBuilder() + .setName(newPipeline.getContainerName()) + .build(); + CreateContainerRequestProto createContainerRequest = + CreateContainerRequestProto.newBuilder() + .setPipeline(newPipeline.getProtobufMessage()) + .setContainerData(containerData) + .build(); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.CreateContainer) + .setCreateContainer(createContainerRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand( + request); + Result result = response.getResult(); + if (result != Result.SUCCESS) { + throw new IOException( + "Failed to initialize container due to result code: " + result); + } + singlePipeline = newPipeline; + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } + } + return singlePipeline; + } + /** * Builds a message for logging startup information about an RPC server. * @@ -429,6 +494,25 @@ private static String buildRpcServerStartMessage(String description, String.format("%s not started", description); } + /** + * Translates a list of nodes, ordered such that the first is the leader, into + * a corresponding {@link Pipeline} object. + * + * @param nodes list of nodes + * @param containerName container name + * @return pipeline corresponding to nodes + */ + private static Pipeline newPipelineFromNodes(List nodes, + String containerName) { + String leaderId = nodes.get(0).getDatanodeUuid(); + Pipeline pipeline = new Pipeline(leaderId); + for (DatanodeDescriptor node : nodes) { + pipeline.addMember(node); + } + pipeline.setContainerName(containerName); + return pipeline; + } + /** * Starts an RPC server, if configured. * @@ -443,7 +527,7 @@ private static String buildRpcServerStartMessage(String description, * @return RPC server, or null if addr is null * @throws IOException if there is an I/O error while creating RPC server */ - private static RPC.Server startRpcServer(Configuration conf, + private static RPC.Server startRpcServer(OzoneConfiguration conf, InetSocketAddress addr, Class protocol, BlockingService instance, String bindHostKey, String handlerCountKey, int handlerCountDefault) throws IOException { @@ -480,7 +564,7 @@ private static RPC.Server startRpcServer(Configuration conf, * @param rpcServer started RPC server. If null, then the server was not * started, and this method is a no-op. */ - private static InetSocketAddress updateListenAddress(Configuration conf, + private static InetSocketAddress updateListenAddress(OzoneConfiguration conf, String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { if (rpcServer == null) { return null; @@ -502,7 +586,7 @@ public static void main(String[] argv) throws IOException { StringUtils.startupShutdownMessage( StorageContainerManager.class, argv, LOG); StorageContainerManager scm = new StorageContainerManager( - new Configuration()); + new OzoneConfiguration()); scm.start(); scm.join(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java index 4df303a1688..3441bf9f9be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java @@ -174,9 +174,12 @@ public void putKey(String keyName, String data) throws OzoneException { InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING)); putRequest.setEntity(new InputStreamEntity(is, data.length())); - putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is)); - putRequest - .setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(data.length())); + is.mark(data.length()); + try { + putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is)); + } finally { + is.reset(); + } executePutKey(putRequest, httpClient); } catch (IOException | URISyntaxException ex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java index 2bd4a69fb45..c2e64daa282 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/exceptions/OzoneException.java @@ -92,7 +92,7 @@ public OzoneException(long httpCode, String shortMessage) { */ public OzoneException(long httpCode, String shortMessage, String message) { this.shortMessage = shortMessage; - this.resource = message; + this.message = message; this.httpCode = httpCode; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java index ca6ddeb48f3..501b2391dfb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/request/OzoneQuota.java @@ -77,6 +77,16 @@ public OzoneQuota(int size, Units unit) { this.unit = unit; } + /** + * Formats a quota as a string. + * + * @param quota the quota to format + * @return string representation of quota + */ + public static String formatQuota(OzoneQuota quota) { + return String.valueOf(quota.size) + quota.unit; + } + /** * Parses a user provided string and returns the * Quota Object. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java new file mode 100644 index 00000000000..166e71c1ec4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkInputStream.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.handlers.UserArgs; + +/** + * An {@link InputStream} used by the REST service in combination with the + * {@link DistributedStorageHandler} to read the value of a key from a sequence + * of container chunks. All bytes of the key value are stored in container + * chunks. Each chunk may contain multiple underlying {@link ByteBuffer} + * instances. This class encapsulates all state management for iterating + * through the sequence of chunks and the sequence of buffers within each chunk. + */ +class ChunkInputStream extends InputStream { + + private static final int EOF = -1; + + private final String key; + private final UserArgs args; + private XceiverClientManager xceiverClientManager; + private XceiverClient xceiverClient; + private List chunks; + private int chunkOffset; + private List buffers; + private int bufferOffset; + + /** + * Creates a new ChunkInputStream. + * + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param chunks list of chunks to read + * @param args container protocol call args + */ + public ChunkInputStream(String key, XceiverClientManager xceiverClientManager, + XceiverClient xceiverClient, List chunks, UserArgs args) { + this.key = key; + this.args = args; + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.chunks = chunks; + this.chunkOffset = 0; + this.buffers = null; + this.bufferOffset = 0; + } + + @Override + public synchronized int read() + throws IOException { + checkOpen(); + int available = prepareRead(1); + return available == EOF ? EOF : buffers.get(bufferOffset).get(); + } + + @Override + public synchronized int read(byte[] b, int off, int len) throws IOException { + // According to the JavaDocs for InputStream, it is recommended that + // subclasses provide an override of bulk read if possible for performance + // reasons. In addition to performance, we need to do it for correctness + // reasons. The Ozone REST service uses PipedInputStream and + // PipedOutputStream to relay HTTP response data between a Jersey thread and + // a Netty thread. It turns out that PipedInputStream/PipedOutputStream + // have a subtle dependency (bug?) on the wrapped stream providing separate + // implementations of single-byte read and bulk read. Without this, get key + // responses might close the connection before writing all of the bytes + // advertised in the Content-Length. + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (len == 0) { + return 0; + } + checkOpen(); + int available = prepareRead(len); + if (available == EOF) { + return EOF; + } + buffers.get(bufferOffset).get(b, off, available); + return available; + } + + @Override + public synchronized void close() { + if (xceiverClientManager != null && xceiverClient != null) { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + } + } + + /** + * Checks if the stream is open. If not, throws an exception. + * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkInputStream has been closed."); + } + } + + /** + * Prepares to read by advancing through chunks and buffers as needed until it + * finds data to return or encounters EOF. + * + * @param len desired length of data to read + * @return length of data available to read, possibly less than desired length + */ + private synchronized int prepareRead(int len) throws IOException { + for (;;) { + if (chunks == null || chunks.isEmpty()) { + // This must be an empty key. + return EOF; + } else if (buffers == null) { + // The first read triggers fetching the first chunk. + readChunkFromContainer(0); + } else if (!buffers.isEmpty() && + buffers.get(bufferOffset).hasRemaining()) { + // Data is available from the current buffer. + ByteBuffer bb = buffers.get(bufferOffset); + return len > bb.remaining() ? bb.remaining() : len; + } else if (!buffers.isEmpty() && + !buffers.get(bufferOffset).hasRemaining() && + bufferOffset < buffers.size() - 1) { + // There are additional buffers available. + ++bufferOffset; + } else if (chunkOffset < chunks.size() - 1) { + // There are additional chunks available. + readChunkFromContainer(chunkOffset + 1); + } else { + // All available input has been consumed. + return EOF; + } + } + } + + /** + * Attempts to read the chunk at the specified offset in the chunk list. If + * successful, then the data of the read chunk is saved so that its bytes can + * be returned from subsequent read calls. + * + * @param readChunkOffset offset in the chunk list of which chunk to read + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void readChunkFromContainer(int readChunkOffset) + throws IOException { + final ReadChunkResponseProto readChunkResponse; + try { + readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset), + key, args); + } catch (OzoneException e) { + throw new IOException("Unexpected OzoneException", e); + } + chunkOffset = readChunkOffset; + ByteString byteString = readChunkResponse.getData(); + buffers = byteString.asReadOnlyByteBufferList(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java new file mode 100644 index 00000000000..d4e639fd6fd --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkOutputStream.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE; +import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; +import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.UUID; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.handlers.UserArgs; +import org.apache.hadoop.ozone.web.response.KeyInfo; + +/** + * An {@link OutputStream} used by the REST service in combination with the + * {@link DistributedStorageHandler} to write the value of a key to a sequence + * of container chunks. Writes are buffered locally and periodically written to + * the container as a new chunk. In order to preserve the semantics that + * replacement of a pre-existing key is atomic, each instance of the stream has + * an internal unique identifier. This unique identifier and a monotonically + * increasing chunk index form a composite key that is used as the chunk name. + * After all data is written, a putKey call creates or updates the corresponding + * container key, and this call includes the full list of chunks that make up + * the key data. The list of chunks is updated all at once. Therefore, a + * concurrent reader never can see an intermediate state in which different + * chunks of data from different versions of the key data are interleaved. + * This class encapsulates all state management for buffering and writing + * through to the container. + */ +class ChunkOutputStream extends OutputStream { + + private final String containerKey; + private final KeyInfo key; + private final UserArgs args; + private final KeyData.Builder containerKeyData; + private XceiverClientManager xceiverClientManager; + private XceiverClient xceiverClient; + private ByteBuffer buffer; + private final String streamId; + private int chunkIndex; + + /** + * Creates a new ChunkOutputStream. + * + * @param containerKey container key + * @param key chunk key + * @param xceiverClientManager client manager that controls client + * @param xceiverClient client to perform container calls + * @param args container protocol call args + */ + public ChunkOutputStream(String containerKey, KeyInfo key, + XceiverClientManager xceiverClientManager, XceiverClient xceiverClient, + UserArgs args) { + this.containerKey = containerKey; + this.key = key; + this.args = args; + this.containerKeyData = fromKeyToContainerKeyDataBuilder( + xceiverClient.getPipeline().getContainerName(), containerKey, key); + this.xceiverClientManager = xceiverClientManager; + this.xceiverClient = xceiverClient; + this.buffer = ByteBuffer.allocate(CHUNK_SIZE); + this.streamId = UUID.randomUUID().toString(); + this.chunkIndex = 0; + } + + @Override + public synchronized void write(int b) throws IOException { + checkOpen(); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put((byte)b); + if (buffer.position() == CHUNK_SIZE) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public synchronized void flush() throws IOException { + checkOpen(); + if (buffer.position() > 0) { + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + flushBufferToChunk(rollbackPosition, rollbackLimit); + } + } + + @Override + public synchronized void close() throws IOException { + if (xceiverClientManager != null && xceiverClient != null && + buffer != null) { + try { + if (buffer.position() > 0) { + writeChunkToContainer(); + } + putKey(xceiverClient, containerKeyData.build(), args); + } catch (OzoneException e) { + throw new IOException("Unexpected OzoneException", e); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + xceiverClientManager = null; + xceiverClient = null; + buffer = null; + } + } + + } + + /** + * Checks if the stream is open. If not, throws an exception. + * + * @throws IOException if stream is closed + */ + private synchronized void checkOpen() throws IOException { + if (xceiverClient == null) { + throw new IOException("ChunkOutputStream has been closed."); + } + } + + /** + * Attempts to flush buffered writes by writing a new chunk to the container. + * If successful, then clears the buffer to prepare to receive writes for a + * new chunk. + * + * @param rollbackPosition position to restore in buffer if write fails + * @param rollbackLimit limit to restore in buffer if write fails + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void flushBufferToChunk(int rollbackPosition, + int rollbackLimit) throws IOException { + boolean success = false; + try { + writeChunkToContainer(); + success = true; + } finally { + if (success) { + buffer.clear(); + } else { + buffer.position(rollbackPosition); + buffer.limit(rollbackLimit); + } + } + } + + /** + * Writes buffered data as a new chunk to the container and saves chunk + * information to be used later in putKey call. + * + * @throws IOException if there is an I/O error while performing the call + */ + private synchronized void writeChunkToContainer() throws IOException { + buffer.flip(); + ByteString data = ByteString.copyFrom(buffer); + ChunkInfo chunk = ChunkInfo + .newBuilder() + .setChunkName( + key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex) + .setOffset(0) + .setLen(data.size()) + .build(); + try { + writeChunk(xceiverClient, chunk, key.getKeyName(), data, args); + } catch (OzoneException e) { + throw new IOException("Unexpected OzoneException", e); + } + containerKeyData.addChunks(chunk); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java new file mode 100644 index 00000000000..4cb3ab955b3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ContainerProtocolCalls.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; + +import java.io.IOException; + +import com.google.protobuf.ByteString; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.web.exceptions.ErrorTable; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.handlers.UserArgs; + +/** + * Implementation of all container protocol calls performed by + * {@link DistributedStorageHandler}. + */ +final class ContainerProtocolCalls { + + /** + * Calls the container protocol to get a container key. + * + * @param xceiverClient client to perform call + * @param containerKeyData key data to identify container + * @param args container protocol call args + * @returns container protocol get key response + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static GetKeyResponseProto getKey(XceiverClient xceiverClient, + KeyData containerKeyData, UserArgs args) throws IOException, + OzoneException { + GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyData(containerKeyData); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.GetKey) + .setTraceID(args.getRequestID()) + .setGetKey(readKeyRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + return response.getGetKey(); + } + + /** + * Calls the container protocol to put a container key. + * + * @param xceiverClient client to perform call + * @param containerKeyData key data to identify container + * @param args container protocol call args + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static void putKey(XceiverClient xceiverClient, + KeyData containerKeyData, UserArgs args) throws IOException, + OzoneException { + PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyData(containerKeyData); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.PutKey) + .setTraceID(args.getRequestID()) + .setPutKey(createKeyRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + } + + /** + * Calls the container protocol to read a chunk. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to read + * @param key the key name + * @param args container protocol call args + * @returns container protocol read chunk response + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient, + ChunkInfo chunk, String key, UserArgs args) + throws IOException, OzoneException { + ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyName(key) + .setChunkData(chunk); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.ReadChunk) + .setTraceID(args.getRequestID()) + .setReadChunk(readChunkRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + return response.getReadChunk(); + } + + /** + * Calls the container protocol to write a chunk. + * + * @param xceiverClient client to perform call + * @param chunk information about chunk to write + * @param key the key name + * @param data the data of the chunk to write + * @param args container protocol call args + * @throws IOException if there is an I/O error while performing the call + * @throws OzoneException if the container protocol call failed + */ + public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk, + String key, ByteString data, UserArgs args) + throws IOException, OzoneException { + WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto + .newBuilder() + .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) + .setKeyName(key) + .setChunkData(chunk) + .setData(data); + ContainerCommandRequestProto request = ContainerCommandRequestProto + .newBuilder() + .setCmdType(Type.WriteChunk) + .setTraceID(args.getRequestID()) + .setWriteChunk(writeChunkRequest) + .build(); + ContainerCommandResponseProto response = xceiverClient.sendCommand(request); + validateContainerResponse(response, args); + } + + /** + * Validates a response from a container protocol call. Any non-successful + * return code is mapped to a corresponding exception and thrown. + * + * @param response container protocol call response + * @param args container protocol call args + * @throws OzoneException if the container protocol call failed + */ + private static void validateContainerResponse( + ContainerCommandResponseProto response, UserArgs args) + throws OzoneException { + switch (response.getResult()) { + case SUCCESS: + break; + case MALFORMED_REQUEST: + throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST, + "badRequest", "Bad container request."), args); + case UNSUPPORTED_REQUEST: + throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, + "internalServerError", "Unsupported container request."), args); + case CONTAINER_INTERNAL_ERROR: + throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, + "internalServerError", "Container internal error."), args); + default: + throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR, + "internalServerError", "Unrecognized container response."), args); + } + } + + /** + * There is no need to instantiate this class. + */ + private ContainerProtocolCalls() { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 9cb8430c481..8d4868ce297 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -18,7 +18,32 @@ package org.apache.hadoop.ozone.web.storage; +import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*; +import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*; + +import java.io.IOException; +import java.io.OutputStream; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.TimeZone; + +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient; +import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager; +import org.apache.hadoop.ozone.protocol.LocatedContainer; +import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.handlers.BucketArgs; import org.apache.hadoop.ozone.web.handlers.KeyArgs; @@ -27,13 +52,13 @@ import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.response.BucketInfo; +import org.apache.hadoop.ozone.web.response.KeyInfo; import org.apache.hadoop.ozone.web.response.ListBuckets; import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.ozone.web.response.VolumeInfo; - -import java.io.IOException; -import java.io.OutputStream; +import org.apache.hadoop.ozone.web.response.VolumeOwner; +import org.apache.hadoop.util.StringUtils; /** * A {@link StorageHandler} implementation that distributes object storage @@ -41,156 +66,283 @@ */ public final class DistributedStorageHandler implements StorageHandler { - @Override - public void createVolume(VolumeArgs args) throws - IOException, OzoneException { + private final StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocation; + private final XceiverClientManager xceiverClientManager; + /** + * Creates a new DistributedStorageHandler. + * + * @param conf configuration + * @param storageContainerLocation StorageContainerLocationProtocol proxy + */ + public DistributedStorageHandler(OzoneConfiguration conf, + StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocation) { + this.storageContainerLocation = storageContainerLocation; + this.xceiverClientManager = new XceiverClientManager(conf); + } + + @Override + public void createVolume(VolumeArgs args) throws IOException, OzoneException { + String containerKey = buildContainerKey(args.getVolumeName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + VolumeInfo volume = new VolumeInfo(); + volume.setVolumeName(args.getVolumeName()); + volume.setQuota(args.getQuota()); + volume.setOwner(new VolumeOwner(args.getUserName())); + volume.setCreatedOn(dateToString(new Date())); + volume.setCreatedBy(args.getAdminName()); + KeyData containerKeyData = fromVolumeToContainerKeyData( + xceiverClient.getPipeline().getContainerName(), containerKey, volume); + putKey(xceiverClient, containerKeyData, args); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } @Override public void setVolumeOwner(VolumeArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("setVolumeOwner not implemented"); } @Override public void setVolumeQuota(VolumeArgs args, boolean remove) throws IOException, OzoneException { - + throw new UnsupportedOperationException("setVolumeQuota not implemented"); } @Override public boolean checkVolumeAccess(VolumeArgs args) throws IOException, OzoneException { - return false; + throw new UnsupportedOperationException("checkVolumeAccessnot implemented"); } @Override public ListVolumes listVolumes(UserArgs args) throws IOException, OzoneException { - return null; + throw new UnsupportedOperationException("listVolumes not implemented"); } @Override public void deleteVolume(VolumeArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("deleteVolume not implemented"); } @Override public VolumeInfo getVolumeInfo(VolumeArgs args) throws IOException, OzoneException { - return null; + String containerKey = buildContainerKey(args.getVolumeName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + KeyData containerKeyData = containerKeyDataForRead( + xceiverClient.getPipeline().getContainerName(), containerKey); + GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, + args); + return fromContainerKeyValueListToVolume( + response.getKeyData().getMetadataList()); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } @Override - public void createBucket(BucketArgs args) + public void createBucket(final BucketArgs args) throws IOException, OzoneException { - + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + BucketInfo bucket = new BucketInfo(); + bucket.setVolumeName(args.getVolumeName()); + bucket.setBucketName(args.getBucketName()); + bucket.setAcls(args.getAddAcls()); + bucket.setVersioning(args.getVersioning()); + bucket.setStorageType(args.getStorageType()); + KeyData containerKeyData = fromBucketToContainerKeyData( + xceiverClient.getPipeline().getContainerName(), containerKey, bucket); + putKey(xceiverClient, containerKeyData, args); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } @Override public void setBucketAcls(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("setBucketAcls not implemented"); } @Override public void setBucketVersioning(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException( + "setBucketVersioning not implemented"); } @Override public void setBucketStorageClass(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException( + "setBucketStorageClass not implemented"); } @Override public void deleteBucket(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException("deleteBucket not implemented"); } @Override public void checkBucketAccess(BucketArgs args) throws IOException, OzoneException { - + throw new UnsupportedOperationException( + "checkBucketAccess not implemented"); } @Override public ListBuckets listBuckets(VolumeArgs args) throws IOException, OzoneException { - return null; + throw new UnsupportedOperationException("listBuckets not implemented"); } @Override public BucketInfo getBucketInfo(BucketArgs args) throws IOException, OzoneException { - return null; + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + try { + KeyData containerKeyData = containerKeyDataForRead( + xceiverClient.getPipeline().getContainerName(), containerKey); + GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, + args); + return fromContainerKeyValueListToBucket( + response.getKeyData().getMetadataList()); + } finally { + xceiverClientManager.releaseClient(xceiverClient); + } } - /** - * Writes a key in an existing bucket. - * - * @param args KeyArgs - * @return InputStream - * @throws OzoneException - */ @Override public OutputStream newKeyWriter(KeyArgs args) throws IOException, OzoneException { - return null; + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName(), args.getKeyName()); + KeyInfo key = new KeyInfo(); + key.setKeyName(args.getKeyName()); + key.setCreatedOn(dateToString(new Date())); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + return new ChunkOutputStream(containerKey, key, xceiverClientManager, + xceiverClient, args); } - /** - * Tells the file system that the object has been written out completely and - * it can do any house keeping operation that needs to be done. - * - * @param args Key Args - * @param stream - * @throws IOException - */ @Override public void commitKey(KeyArgs args, OutputStream stream) throws IOException, OzoneException { - + stream.close(); } - /** - * Reads a key from an existing bucket. - * - * @param args KeyArgs - * @return LengthInputStream - * @throws IOException - */ @Override public LengthInputStream newKeyReader(KeyArgs args) throws IOException, OzoneException { - return null; + String containerKey = buildContainerKey(args.getVolumeName(), + args.getBucketName(), args.getKeyName()); + XceiverClient xceiverClient = acquireXceiverClient(containerKey); + boolean success = false; + try { + KeyData containerKeyData = containerKeyDataForRead( + xceiverClient.getPipeline().getContainerName(), containerKey); + GetKeyResponseProto response = getKey(xceiverClient, containerKeyData, + args); + long length = 0; + List chunks = response.getKeyData().getChunksList(); + for (ChunkInfo chunk : chunks) { + length += chunk.getLen(); + } + success = true; + return new LengthInputStream(new ChunkInputStream( + containerKey, xceiverClientManager, xceiverClient, chunks, args), + length); + } finally { + if (!success) { + xceiverClientManager.releaseClient(xceiverClient); + } + } } - /** - * Deletes an existing key. - * - * @param args KeyArgs - * @throws OzoneException - */ @Override public void deleteKey(KeyArgs args) throws IOException, OzoneException { + throw new UnsupportedOperationException("deleteKey not implemented"); + } + @Override + public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { + throw new UnsupportedOperationException("listKeys not implemented"); } /** - * Returns a list of Key. + * Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes + * capable of serving container protocol operations. The container is + * selected based on the specified container key. * - * @param args KeyArgs - * @return BucketList - * @throws IOException + * @param containerKey container key + * @return XceiverClient connected to a container + * @throws IOException if an XceiverClient cannot be acquired */ - @Override - public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { - return null; + private XceiverClient acquireXceiverClient(String containerKey) + throws IOException { + Set locatedContainers = + storageContainerLocation.getStorageContainerLocations( + new HashSet<>(Arrays.asList(containerKey))); + Pipeline pipeline = newPipelineFromLocatedContainer( + locatedContainers.iterator().next()); + return xceiverClientManager.acquireClient(pipeline); + } + + /** + * Creates a container key from any number of components by combining all + * components with a delimiter. + * + * @param parts container key components + * @return container key + */ + private static String buildContainerKey(String... parts) { + return '/' + StringUtils.join('/', parts); + } + + /** + * Formats a date in the expected string format. + * + * @param date the date to format + * @return formatted string representation of date + */ + private static String dateToString(Date date) { + SimpleDateFormat sdf = + new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE)); + return sdf.format(date); + } + + /** + * Translates a set of container locations, ordered such that the first is the + * leader, into a corresponding {@link Pipeline} object. + * + * @param locatedContainer container location + * @return pipeline corresponding to container locations + */ + private static Pipeline newPipelineFromLocatedContainer( + LocatedContainer locatedContainer) { + Set locations = locatedContainer.getLocations(); + String leaderId = locations.iterator().next().getDatanodeUuid(); + Pipeline pipeline = new Pipeline(leaderId); + for (DatanodeInfo location : locations) { + pipeline.addMember(location); + } + pipeline.setContainerName(locatedContainer.getContainerName()); + return pipeline; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java new file mode 100644 index 00000000000..9333fe6d594 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/OzoneContainerTranslation.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web.storage; + +import java.util.List; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; +import org.apache.hadoop.ozone.OzoneConsts.Versioning; +import org.apache.hadoop.ozone.web.request.OzoneQuota; +import org.apache.hadoop.ozone.web.response.BucketInfo; +import org.apache.hadoop.ozone.web.response.KeyInfo; +import org.apache.hadoop.ozone.web.response.VolumeInfo; +import org.apache.hadoop.ozone.web.response.VolumeOwner; +import org.apache.hadoop.util.StringUtils; + +/** + * This class contains methods that define the translation between the Ozone + * domain model and the storage container domain model. + */ +final class OzoneContainerTranslation { + + private static final String ACLS = "ACLS"; + private static final String BUCKET = "BUCKET"; + private static final String BUCKET_NAME = "BUCKET_NAME"; + private static final String CREATED_BY = "CREATED_BY"; + private static final String CREATED_ON = "CREATED_ON"; + private static final String KEY = "KEY"; + private static final String OWNER = "OWNER"; + private static final String QUOTA = "QUOTA"; + private static final String STORAGE_TYPE = "STORAGE_TYPE"; + private static final String TYPE = "TYPE"; + private static final String VERSIONING = "VERSIONING"; + private static final String VOLUME = "VOLUME"; + private static final String VOLUME_NAME = "VOLUME_NAME"; + + /** + * Creates key data intended for reading a container key. + * + * @param containerName container name + * @param containerKey container key + * @return KeyData intended for reading the container key + */ + public static KeyData containerKeyDataForRead(String containerName, + String containerKey) { + return KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .build(); + } + + /** + * Translates a bucket to its container representation. + * + * @param containerName container name + * @param containerKey container key + * @param bucket the bucket to translate + * @return KeyData representation of bucket + */ + public static KeyData fromBucketToContainerKeyData( + String containerName, String containerKey, BucketInfo bucket) { + KeyData.Builder containerKeyData = KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, BUCKET)) + .addMetadata(newKeyValue(VOLUME_NAME, bucket.getVolumeName())) + .addMetadata(newKeyValue(BUCKET_NAME, bucket.getBucketName())); + + if (bucket.getAcls() != null) { + containerKeyData.addMetadata(newKeyValue(ACLS, + StringUtils.join(',', bucket.getAcls()))); + } + + if (bucket.getVersioning() != null && + bucket.getVersioning() != Versioning.NOT_DEFINED) { + containerKeyData.addMetadata(newKeyValue(VERSIONING, + bucket.getVersioning().name())); + } + + if (bucket.getStorageType() != StorageType.RAM_DISK) { + containerKeyData.addMetadata(newKeyValue(STORAGE_TYPE, + bucket.getStorageType().name())); + } + + return containerKeyData.build(); + } + + /** + * Translates a bucket from its container representation. + * + * @param metadata container metadata representing the bucket + * @return bucket translated from container representation + */ + public static BucketInfo fromContainerKeyValueListToBucket( + List metadata) { + BucketInfo bucket = new BucketInfo(); + for (KeyValue keyValue : metadata) { + switch (keyValue.getKey()) { + case VOLUME_NAME: + bucket.setVolumeName(keyValue.getValue()); + break; + case BUCKET_NAME: + bucket.setBucketName(keyValue.getValue()); + break; + case VERSIONING: + bucket.setVersioning( + Enum.valueOf(Versioning.class, keyValue.getValue())); + break; + case STORAGE_TYPE: + bucket.setStorageType( + Enum.valueOf(StorageType.class, keyValue.getValue())); + break; + default: + break; + } + } + return bucket; + } + + /** + * Translates a volume from its container representation. + * + * @param metadata container metadata representing the volume + * @return volume translated from container representation + */ + public static VolumeInfo fromContainerKeyValueListToVolume( + List metadata) { + VolumeInfo volume = new VolumeInfo(); + for (KeyValue keyValue : metadata) { + switch (keyValue.getKey()) { + case VOLUME_NAME: + volume.setVolumeName(keyValue.getValue()); + break; + case CREATED_BY: + volume.setCreatedBy(keyValue.getValue()); + break; + case CREATED_ON: + volume.setCreatedOn(keyValue.getValue()); + break; + case OWNER: + volume.setOwner(new VolumeOwner(keyValue.getValue())); + break; + case QUOTA: + volume.setQuota(OzoneQuota.parseQuota(keyValue.getValue())); + break; + default: + break; + } + } + return volume; + } + + /** + * Translates a key to its container representation. + * + * @param containerName container name + * @param containerKey container key + * @param keyInfo key information received from call + * @return KeyData intended for reading the container key + */ + public static KeyData fromKeyToContainerKeyData(String containerName, + String containerKey, KeyInfo key) { + return KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, KEY)) + .build(); + } + + /** + * Translates a key to its container representation. The return value is a + * builder that can be manipulated further before building the result. + * + * @param containerName container name + * @param containerKey container key + * @param keyInfo key information received from call + * @return KeyData builder + */ + public static KeyData.Builder fromKeyToContainerKeyDataBuilder( + String containerName, String containerKey, KeyInfo key) { + return KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, KEY)); + } + + /** + * Translates a volume to its container representation. + * + * @param containerName container name + * @param containerKey container key + * @param volume the volume to translate + * @return KeyData representation of volume + */ + public static KeyData fromVolumeToContainerKeyData( + String containerName, String containerKey, VolumeInfo volume) { + KeyData.Builder containerKeyData = KeyData + .newBuilder() + .setContainerName(containerName) + .setName(containerKey) + .addMetadata(newKeyValue(TYPE, VOLUME)) + .addMetadata(newKeyValue(VOLUME_NAME, volume.getVolumeName())) + .addMetadata(newKeyValue(CREATED_ON, volume.getCreatedOn())); + + if (volume.getQuota() != null && volume.getQuota().sizeInBytes() != -1L) { + containerKeyData.addMetadata(newKeyValue(QUOTA, + OzoneQuota.formatQuota(volume.getQuota()))); + } + + if (volume.getOwner() != null && volume.getOwner().getName() != null && + !volume.getOwner().getName().isEmpty()) { + containerKeyData.addMetadata(newKeyValue(OWNER, + volume.getOwner().getName())); + } + + if (volume.getCreatedBy() != null && !volume.getCreatedBy().isEmpty()) { + containerKeyData.addMetadata( + newKeyValue(CREATED_BY, volume.getCreatedBy())); + } + + return containerKeyData.build(); + } + + /** + * Translates a key-value pair to its container representation. + * + * @param key the key + * @param value the value + * @return container representation of key-value pair + */ + private static KeyValue newKeyValue(String key, Object value) { + return KeyValue.newBuilder().setKey(key).setValue(value.toString()).build(); + } + + /** + * There is no need to instantiate this class. + */ + private OzoneContainerTranslation() { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 218058c4ec9..da6d9c3ccb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -23,6 +23,10 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.util.Random; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +41,8 @@ import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.storage.StorageContainerManager; +import org.apache.hadoop.ozone.web.client.OzoneClient; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; @@ -53,6 +59,8 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(MiniOzoneCluster.class); + private static final String USER_AUTH = "hdfs"; + private final OzoneConfiguration conf; private final StorageContainerManager scm; @@ -125,6 +133,52 @@ public void shutdown() { scm.join(); } + /** + * Creates an {@link OzoneClient} connected to this cluster's REST service. + * Callers take ownership of the client and must close it when done. + * + * @return OzoneClient connected to this cluster's REST service + * @throws OzoneException if Ozone encounters an error creating the client + */ + public OzoneClient createOzoneClient() throws OzoneException { + Preconditions.checkState(!getDataNodes().isEmpty(), + "Cannot create OzoneClient if the cluster has no DataNodes."); + // An Ozone request may originate at any DataNode, so pick one at random. + int dnIndex = new Random().nextInt(getDataNodes().size()); + String uri = String.format("http://127.0.0.1:%d", + getDataNodes().get(dnIndex).getInfoPort()); + LOG.info("Creating Ozone client to DataNode {} with URI {} and user {}", + dnIndex, uri, USER_AUTH); + try { + return new OzoneClient(uri, USER_AUTH); + } catch (URISyntaxException e) { + // We control the REST service URI, so it should never be invalid. + throw new IllegalStateException("Unexpected URISyntaxException", e); + } + } + + /** + * Creates an RPC proxy connected to this cluster's StorageContainerManager + * for accessing container location information. Callers take ownership of + * the proxy and must close it when done. + * + * @return RPC proxy for accessing container location information + * @throws IOException if there is an I/O error + */ + public StorageContainerLocationProtocolClientSideTranslatorPB + createStorageContainerLocationClient() throws IOException { + long version = RPC.getProtocolVersion( + StorageContainerLocationProtocolPB.class); + InetSocketAddress address = scm.getStorageContainerLocationRpcAddress(); + LOG.info( + "Creating StorageContainerLocationProtocol RPC client with address {}", + address); + return new StorageContainerLocationProtocolClientSideTranslatorPB( + RPC.getProxy(StorageContainerLocationProtocolPB.class, version, + address, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf))); + } + /** * Waits for the Ozone cluster to be ready for processing requests. */ @@ -146,23 +200,4 @@ public void waitOzoneReady() { } } } - - /** - * Creates an RPC proxy connected to this cluster's StorageContainerManager - * for accessing container location information. Callers take ownership of - * the proxy and must close it when done. - * - * @return RPC proxy for accessing container location information - * @throws IOException if there is an I/O error - */ - protected StorageContainerLocationProtocolClientSideTranslatorPB - createStorageContainerLocationClient() throws IOException { - long version = RPC.getProtocolVersion( - StorageContainerLocationProtocolPB.class); - InetSocketAddress address = scm.getStorageContainerLocationRpcAddress(); - return new StorageContainerLocationProtocolClientSideTranslatorPB( - RPC.getProxy(StorageContainerLocationProtocolPB.class, version, - address, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf))); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java new file mode 100644 index 00000000000..315f59d21a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/TestOzoneRestWithMiniCluster.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.web; + +import static com.google.common.base.Charsets.UTF_8; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE; +import static org.junit.Assert.*; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.web.client.OzoneBucket; +import org.apache.hadoop.ozone.web.client.OzoneClient; +import org.apache.hadoop.ozone.web.client.OzoneVolume; +import org.apache.hadoop.ozone.web.request.OzoneQuota; + +/** + * End-to-end testing of Ozone REST operations. + */ +public class TestOzoneRestWithMiniCluster { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static int idSuffix; + private static OzoneClient ozoneClient; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true); + conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "distributed"); + conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true); + cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitOzoneReady(); + ozoneClient = cluster.createOzoneClient(); + } + + @AfterClass + public static void shutdown() throws InterruptedException { + IOUtils.cleanup(null, ozoneClient, cluster); + } + + @Test + public void testCreateAndGetVolume() throws Exception { + String volumeName = nextId("volume"); + OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB"); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + volume = ozoneClient.getVolume(volumeName); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + } + + @Test + public void testCreateAndGetBucket() throws Exception { + String volumeName = nextId("volume"); + String bucketName = nextId("bucket"); + OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB"); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + OzoneBucket bucket = volume.createBucket(bucketName); + assertNotNull(bucket); + assertEquals(bucketName, bucket.getBucketName()); + bucket = volume.getBucket(bucketName); + assertNotNull(bucket); + assertEquals(bucketName, bucket.getBucketName()); + } + + @Test + public void testPutAndGetKey() throws Exception { + String volumeName = nextId("volume"); + String bucketName = nextId("bucket"); + String keyName = nextId("key"); + String keyData = nextId("data"); + OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB"); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + OzoneBucket bucket = volume.createBucket(bucketName); + assertNotNull(bucket); + assertEquals(bucketName, bucket.getBucketName()); + bucket.putKey(keyName, keyData); + assertEquals(keyData, bucket.getKey(keyName)); + } + + @Test + public void testPutAndGetEmptyKey() throws Exception { + String volumeName = nextId("volume"); + String bucketName = nextId("bucket"); + String keyName = nextId("key"); + String keyData = ""; + OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB"); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + OzoneBucket bucket = volume.createBucket(bucketName); + assertNotNull(bucket); + assertEquals(bucketName, bucket.getBucketName()); + bucket.putKey(keyName, keyData); + assertEquals(keyData, bucket.getKey(keyName)); + } + + @Test + public void testPutAndGetMultiChunkKey() throws Exception { + String volumeName = nextId("volume"); + String bucketName = nextId("bucket"); + String keyName = nextId("key"); + int keyDataLen = 3 * CHUNK_SIZE; + String keyData = buildKeyData(keyDataLen); + OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB"); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + OzoneBucket bucket = volume.createBucket(bucketName); + assertNotNull(bucket); + assertEquals(bucketName, bucket.getBucketName()); + bucket.putKey(keyName, keyData); + assertEquals(keyData, bucket.getKey(keyName)); + } + + @Test + public void testPutAndGetMultiChunkKeyLastChunkPartial() throws Exception { + String volumeName = nextId("volume"); + String bucketName = nextId("bucket"); + String keyName = nextId("key"); + int keyDataLen = (int)(2.5 * CHUNK_SIZE); + String keyData = buildKeyData(keyDataLen); + OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB"); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + OzoneBucket bucket = volume.createBucket(bucketName); + assertNotNull(bucket); + assertEquals(bucketName, bucket.getBucketName()); + bucket.putKey(keyName, keyData); + assertEquals(keyData, bucket.getKey(keyName)); + } + + @Test + public void testReplaceKey() throws Exception { + String volumeName = nextId("volume"); + String bucketName = nextId("bucket"); + String keyName = nextId("key"); + int keyDataLen = (int)(2.5 * CHUNK_SIZE); + String keyData = buildKeyData(keyDataLen); + OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB"); + assertNotNull(volume); + assertEquals(volumeName, volume.getVolumeName()); + assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby()); + assertEquals("bilbo", volume.getOwnerName()); + assertNotNull(volume.getQuota()); + assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(), + volume.getQuota().sizeInBytes()); + OzoneBucket bucket = volume.createBucket(bucketName); + assertNotNull(bucket); + assertEquals(bucketName, bucket.getBucketName()); + bucket.putKey(keyName, keyData); + assertEquals(keyData, bucket.getKey(keyName)); + + // Replace key with data consisting of fewer chunks. + keyDataLen = (int)(1.5 * CHUNK_SIZE); + keyData = buildKeyData(keyDataLen); + bucket.putKey(keyName, keyData); + assertEquals(keyData, bucket.getKey(keyName)); + + // Replace key with data consisting of more chunks. + keyDataLen = (int)(3.5 * CHUNK_SIZE); + keyData = buildKeyData(keyDataLen); + bucket.putKey(keyName, keyData); + assertEquals(keyData, bucket.getKey(keyName)); + } + + /** + * Creates sample key data of the specified length. The data is a string of + * printable ASCII characters. This makes it easy to debug through visual + * inspection of the chunk files if a test fails. + * + * @param keyDataLen desired length of key data + * @return string of printable ASCII characters of the specified length + */ + private static String buildKeyData(int keyDataLen) { + return new String(dataset(keyDataLen, 33, 93), UTF_8); + } + + /** + * Generates identifiers unique enough for use in tests, so that individual + * tests don't collide on each others' data in the shared mini-cluster. + * + * @param idPrefix prefix to put in front of ID + * @return unique ID generated by appending a suffix to the given prefix + */ + private static String nextId(String idPrefix) { + return idPrefix + ++idSuffix; + } +}