HDFS-10268. Ozone: end-to-end integration for create/get volumes, buckets and keys. Contributed by Chris Nauroth.

This commit is contained in:
Anu Engineer 2016-04-07 14:38:54 -07:00 committed by Owen O'Malley
parent 0addb1033e
commit e11e824c9b
20 changed files with 1683 additions and 107 deletions

View File

@ -17,36 +17,58 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_DEFAULT_PORT;
import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import com.sun.jersey.api.container.ContainerFactory; import com.sun.jersey.api.container.ContainerFactory;
import com.sun.jersey.api.core.ApplicationAdapter; import com.sun.jersey.api.core.ApplicationAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.web.handlers.ServiceFilter; import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.ObjectStoreApplication; import org.apache.hadoop.ozone.web.ObjectStoreApplication;
import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer; import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler; import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler;
import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler; import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler;
import org.apache.hadoop.security.UserGroupInformation;
/** /**
* Implements object store handling within the DataNode process. This class is * Implements object store handling within the DataNode process. This class is
* responsible for initializing and maintaining the RPC clients and servers and * responsible for initializing and maintaining the RPC clients and servers and
* the web application required for the object store implementation. * the web application required for the object store implementation.
*/ */
public final class ObjectStoreHandler { public final class ObjectStoreHandler implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(ObjectStoreJerseyContainer.class);
private final ObjectStoreJerseyContainer objectStoreJerseyContainer; private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
/** /**
* Creates a new ObjectStoreHandler. * Creates a new ObjectStoreHandler.
@ -57,14 +79,32 @@ public final class ObjectStoreHandler {
public ObjectStoreHandler(Configuration conf) throws IOException { public ObjectStoreHandler(Configuration conf) throws IOException {
String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY, String shType = conf.getTrimmed(DFS_STORAGE_HANDLER_TYPE_KEY,
DFS_STORAGE_HANDLER_TYPE_DEFAULT); DFS_STORAGE_HANDLER_TYPE_DEFAULT);
LOG.info("ObjectStoreHandler initializing with {}: {}",
DFS_STORAGE_HANDLER_TYPE_KEY, shType);
boolean ozoneTrace = conf.getBoolean(DFS_OBJECTSTORE_TRACE_ENABLED_KEY, boolean ozoneTrace = conf.getBoolean(DFS_OBJECTSTORE_TRACE_ENABLED_KEY,
DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT); DFS_OBJECTSTORE_TRACE_ENABLED_DEFAULT);
final StorageHandler storageHandler; final StorageHandler storageHandler;
// Initialize Jersey container for object store web application.
if ("distributed".equalsIgnoreCase(shType)) { if ("distributed".equalsIgnoreCase(shType)) {
storageHandler = new DistributedStorageHandler(); RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
long version =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
InetSocketAddress address = conf.getSocketAddr(
DFS_STORAGE_RPC_BIND_HOST_KEY, DFS_STORAGE_RPC_ADDRESS_KEY,
DFS_STORAGE_RPC_ADDRESS_DEFAULT, DFS_STORAGE_RPC_DEFAULT_PORT);
this.storageContainerLocationClient =
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
storageHandler = new DistributedStorageHandler(new OzoneConfiguration(),
this.storageContainerLocationClient);
} else { } else {
if ("local".equalsIgnoreCase(shType)) { if ("local".equalsIgnoreCase(shType)) {
storageHandler = new LocalStorageHandler(conf); storageHandler = new LocalStorageHandler(conf);
this.storageContainerLocationClient = null;
} else { } else {
throw new IllegalArgumentException( throw new IllegalArgumentException(
String.format("Unrecognized value for %s: %s", String.format("Unrecognized value for %s: %s",
@ -91,4 +131,12 @@ public final class ObjectStoreHandler {
public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() { public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() {
return this.objectStoreJerseyContainer; return this.objectStoreJerseyContainer;
} }
@Override
public void close() {
LOG.info("Closing ObjectStoreHandler.");
if (this.storageContainerLocationClient != null) {
this.storageContainerLocationClient.close();
}
}
} }

View File

@ -68,6 +68,8 @@ public final class OzoneConsts {
public static final String FILE_HASH = "SHA-256"; public static final String FILE_HASH = "SHA-256";
public final static String CHUNK_OVERWRITE = "OverWriteRequested"; public final static String CHUNK_OVERWRITE = "OverWriteRequested";
public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB
/** /**
* Supports Bucket Versioning. * Supports Bucket Versioning.
*/ */

View File

@ -169,7 +169,8 @@ public final class ChunkUtils {
StandardOpenOption.SPARSE, StandardOpenOption.SPARSE,
StandardOpenOption.SYNC); StandardOpenOption.SYNC);
lock = file.lock().get(); lock = file.lock().get();
if (!chunkInfo.getChecksum().isEmpty()) { if (chunkInfo.getChecksum() != null &&
!chunkInfo.getChecksum().isEmpty()) {
verifyChecksum(chunkInfo, data, log); verifyChecksum(chunkInfo, data, log);
} }
int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get(); int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();

View File

@ -127,7 +127,7 @@ public class Dispatcher implements ContainerDispatcher {
msg.getCreateContainer().getContainerData().getName(), msg.getCreateContainer().getContainerData().getName(),
msg.getCmdType().name(), msg.getCmdType().name(),
msg.getTraceID(), msg.getTraceID(),
ex.toString()); ex.toString(), ex);
// TODO : Replace with finer error codes. // TODO : Replace with finer error codes.
return ContainerUtils.getContainerResponse(msg, return ContainerUtils.getContainerResponse(msg,
@ -169,7 +169,7 @@ public class Dispatcher implements ContainerDispatcher {
msg.getCreateContainer().getContainerData().getName(), msg.getCreateContainer().getContainerData().getName(),
msg.getCmdType().name(), msg.getCmdType().name(),
msg.getTraceID(), msg.getTraceID(),
ex.toString()); ex.toString(), ex);
// TODO : Replace with finer error codes. // TODO : Replace with finer error codes.
return ContainerUtils.getContainerResponse(msg, return ContainerUtils.getContainerResponse(msg,
@ -210,7 +210,7 @@ public class Dispatcher implements ContainerDispatcher {
msg.getCreateContainer().getContainerData().getName(), msg.getCreateContainer().getContainerData().getName(),
msg.getCmdType().name(), msg.getCmdType().name(),
msg.getTraceID(), msg.getTraceID(),
ex.toString()); ex.toString(), ex);
// TODO : Replace with finer error codes. // TODO : Replace with finer error codes.
return ContainerUtils.getContainerResponse(msg, return ContainerUtils.getContainerResponse(msg,

View File

@ -34,12 +34,13 @@ import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
/** /**
* A Client for the storageContainer protocol. * A Client for the storageContainer protocol.
*/ */
public class XceiverClient { public class XceiverClient implements Closeable {
static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class); static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
private final Pipeline pipeline; private final Pipeline pipeline;
private final Configuration config; private final Configuration config;
@ -92,6 +93,7 @@ public class XceiverClient {
/** /**
* Close the client. * Close the client.
*/ */
@Override
public void close() { public void close() {
if(group != null) { if(group != null) {
group.shutdownGracefully(); group.shutdownGracefully();
@ -102,6 +104,16 @@ public class XceiverClient {
} }
} }
/**
* Returns the pipeline of machines that host the container used by this
* client.
*
* @return pipeline of machines that host the container
*/
public Pipeline getPipeline() {
return pipeline;
}
/** /**
* Sends a given command to server and gets the reply back. * Sends a given command to server and gets the reply back.
* @param request Request * @param request Request

View File

@ -94,7 +94,7 @@ public class XceiverClientHandler extends
ContainerProtos.ContainerCommandResponseProto response; ContainerProtos.ContainerCommandResponseProto response;
channel.writeAndFlush(request); channel.writeAndFlush(request);
boolean interrupted = false; boolean interrupted = false;
for (; ; ) { for (;;) {
try { try {
response = responses.take(); response = responses.take();
break; break;

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.client;
import java.io.IOException;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
* instances. Callers use this class to acquire an XceiverClient instance
* connected to the desired container pipeline. When done, the caller also uses
* this class to release the previously acquired XceiverClient instance.
*
* This class may evolve to implement efficient lifecycle management policies by
* caching container location information and pooling connected client instances
* for reuse without needing to reestablish a socket connection. The current
* implementation simply allocates and closes a new instance every time.
*/
public class XceiverClientManager {
private final OzoneConfiguration conf;
/**
* Creates a new XceiverClientManager.
*
* @param conf configuration
*/
public XceiverClientManager(OzoneConfiguration conf) {
Preconditions.checkNotNull(conf);
this.conf = conf;
}
/**
* Acquires a XceiverClient connected to a container capable of storing the
* specified key.
*
* @param pipeline the container pipeline for the client connection
* @return XceiverClient connected to a container
* @throws IOException if an XceiverClient cannot be acquired
*/
public XceiverClient acquireClient(Pipeline pipeline) throws IOException {
Preconditions.checkNotNull(pipeline);
Preconditions.checkArgument(pipeline.getMachines() != null);
Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
XceiverClient xceiverClient = new XceiverClient(pipeline, conf);
try {
xceiverClient.connect();
} catch (Exception e) {
throw new IOException("Exception connecting XceiverClient.", e);
}
return xceiverClient;
}
/**
* Releases an XceiverClient after use.
*
* @param xceiverClient client to release
*/
public void releaseClient(XceiverClient xceiverClient) {
Preconditions.checkNotNull(xceiverClient);
xceiverClient.close();
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.client;
/**
* This package contains classes for the client of the storage container
* protocol.
*/

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.transport.server;
/**
* This package contains classes for the server of the storage container
* protocol.
*/

View File

@ -48,10 +48,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.CreateContainerRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -81,6 +86,10 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
import org.apache.hadoop.ozone.protocol.LocatedContainer; import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.ozone.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
@ -94,11 +103,14 @@ import org.apache.hadoop.util.StringUtils;
* *
* The current implementation is a stub suitable to begin end-to-end testing of * The current implementation is a stub suitable to begin end-to-end testing of
* Ozone service interactions. DataNodes report to StorageContainerManager * Ozone service interactions. DataNodes report to StorageContainerManager
* using the existing heartbeat messages. StorageContainerManager tells clients * using the existing heartbeat messages. StorageContainerManager lazily
* container locations by reporting that all registered nodes are a viable * initializes a single storage container to be served by those DataNodes.
* location. This will evolve from a stub to a full-fledged implementation * All subsequent requests for container locations will reply with that single
* capable of partitioning the keyspace across multiple containers, with * pipeline, using all registered nodes.
* appropriate distribution across nodes. *
* This will evolve from a stub to a full-fledged implementation capable of
* partitioning the keyspace across multiple containers, with appropriate
* distribution across nodes.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class StorageContainerManager public class StorageContainerManager
@ -109,6 +121,8 @@ public class StorageContainerManager
private final StorageContainerNameService ns; private final StorageContainerNameService ns;
private final BlockManager blockManager; private final BlockManager blockManager;
private final XceiverClientManager xceiverClientManager;
private Pipeline singlePipeline;
/** The RPC server that listens to requests from DataNodes. */ /** The RPC server that listens to requests from DataNodes. */
private final RPC.Server serviceRpcServer; private final RPC.Server serviceRpcServer;
@ -128,11 +142,12 @@ public class StorageContainerManager
* *
* @param conf configuration * @param conf configuration
*/ */
public StorageContainerManager(Configuration conf) public StorageContainerManager(OzoneConfiguration conf)
throws IOException { throws IOException {
ns = new StorageContainerNameService(); ns = new StorageContainerNameService();
boolean haEnabled = false; boolean haEnabled = false;
blockManager = new BlockManager(ns, haEnabled, conf); blockManager = new BlockManager(ns, haEnabled, conf);
xceiverClientManager = new XceiverClientManager(conf);
RPC.setProtocolEngine(conf, DatanodeProtocolPB.class, RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
@ -193,20 +208,20 @@ public class StorageContainerManager
public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys) public Set<LocatedContainer> getStorageContainerLocations(Set<String> keys)
throws IOException { throws IOException {
LOG.trace("getStorageContainerLocations keys = {}", keys); LOG.trace("getStorageContainerLocations keys = {}", keys);
Pipeline pipeline = initSingleContainerPipeline();
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>(); List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false); blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
if (liveNodes.isEmpty()) { if (liveNodes.isEmpty()) {
throw new IOException("Storage container locations not found."); throw new IOException("Storage container locations not found.");
} }
String containerName = UUID.randomUUID().toString();
Set<DatanodeInfo> locations = Set<DatanodeInfo> locations =
Sets.<DatanodeInfo>newLinkedHashSet(liveNodes); Sets.<DatanodeInfo>newLinkedHashSet(liveNodes);
DatanodeInfo leader = liveNodes.get(0); DatanodeInfo leader = liveNodes.get(0);
Set<LocatedContainer> locatedContainers = Set<LocatedContainer> locatedContainers =
Sets.newLinkedHashSetWithExpectedSize(keys.size()); Sets.newLinkedHashSetWithExpectedSize(keys.size());
for (String key: keys) { for (String key: keys) {
locatedContainers.add(new LocatedContainer(key, key, containerName, locatedContainers.add(new LocatedContainer(key, key,
locations, leader)); pipeline.getContainerName(), locations, leader));
} }
LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}", LOG.trace("getStorageContainerLocations keys = {}, locatedContainers = {}",
keys, locatedContainers); keys, locatedContainers);
@ -415,6 +430,56 @@ public class StorageContainerManager
} }
} }
/**
* Lazily initializes a single container pipeline using all registered
* DataNodes via a synchronous call to the container protocol. This single
* container pipeline will be reused for container requests for the lifetime
* of this StorageContainerManager.
*
* @throws IOException if there is an I/O error
*/
private synchronized Pipeline initSingleContainerPipeline()
throws IOException {
if (singlePipeline == null) {
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(liveNodes, null, false);
if (liveNodes.isEmpty()) {
throw new IOException("Storage container locations not found.");
}
Pipeline newPipeline = newPipelineFromNodes(liveNodes,
UUID.randomUUID().toString());
XceiverClient xceiverClient =
xceiverClientManager.acquireClient(newPipeline);
try {
ContainerData containerData = ContainerData
.newBuilder()
.setName(newPipeline.getContainerName())
.build();
CreateContainerRequestProto createContainerRequest =
CreateContainerRequestProto.newBuilder()
.setPipeline(newPipeline.getProtobufMessage())
.setContainerData(containerData)
.build();
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.CreateContainer)
.setCreateContainer(createContainerRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(
request);
Result result = response.getResult();
if (result != Result.SUCCESS) {
throw new IOException(
"Failed to initialize container due to result code: " + result);
}
singlePipeline = newPipeline;
} finally {
xceiverClientManager.releaseClient(xceiverClient);
}
}
return singlePipeline;
}
/** /**
* Builds a message for logging startup information about an RPC server. * Builds a message for logging startup information about an RPC server.
* *
@ -429,6 +494,25 @@ public class StorageContainerManager
String.format("%s not started", description); String.format("%s not started", description);
} }
/**
* Translates a list of nodes, ordered such that the first is the leader, into
* a corresponding {@link Pipeline} object.
*
* @param nodes list of nodes
* @param containerName container name
* @return pipeline corresponding to nodes
*/
private static Pipeline newPipelineFromNodes(List<DatanodeDescriptor> nodes,
String containerName) {
String leaderId = nodes.get(0).getDatanodeUuid();
Pipeline pipeline = new Pipeline(leaderId);
for (DatanodeDescriptor node : nodes) {
pipeline.addMember(node);
}
pipeline.setContainerName(containerName);
return pipeline;
}
/** /**
* Starts an RPC server, if configured. * Starts an RPC server, if configured.
* *
@ -443,7 +527,7 @@ public class StorageContainerManager
* @return RPC server, or null if addr is null * @return RPC server, or null if addr is null
* @throws IOException if there is an I/O error while creating RPC server * @throws IOException if there is an I/O error while creating RPC server
*/ */
private static RPC.Server startRpcServer(Configuration conf, private static RPC.Server startRpcServer(OzoneConfiguration conf,
InetSocketAddress addr, Class<?> protocol, BlockingService instance, InetSocketAddress addr, Class<?> protocol, BlockingService instance,
String bindHostKey, String handlerCountKey, int handlerCountDefault) String bindHostKey, String handlerCountKey, int handlerCountDefault)
throws IOException { throws IOException {
@ -480,7 +564,7 @@ public class StorageContainerManager
* @param rpcServer started RPC server. If null, then the server was not * @param rpcServer started RPC server. If null, then the server was not
* started, and this method is a no-op. * started, and this method is a no-op.
*/ */
private static InetSocketAddress updateListenAddress(Configuration conf, private static InetSocketAddress updateListenAddress(OzoneConfiguration conf,
String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) { String rpcAddressKey, InetSocketAddress addr, RPC.Server rpcServer) {
if (rpcServer == null) { if (rpcServer == null) {
return null; return null;
@ -502,7 +586,7 @@ public class StorageContainerManager
StringUtils.startupShutdownMessage( StringUtils.startupShutdownMessage(
StorageContainerManager.class, argv, LOG); StorageContainerManager.class, argv, LOG);
StorageContainerManager scm = new StorageContainerManager( StorageContainerManager scm = new StorageContainerManager(
new Configuration()); new OzoneConfiguration());
scm.start(); scm.start();
scm.join(); scm.join();
} }

View File

@ -174,9 +174,12 @@ public class OzoneBucket {
InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING)); InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING));
putRequest.setEntity(new InputStreamEntity(is, data.length())); putRequest.setEntity(new InputStreamEntity(is, data.length()));
is.mark(data.length());
try {
putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is)); putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is));
putRequest } finally {
.setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(data.length())); is.reset();
}
executePutKey(putRequest, httpClient); executePutKey(putRequest, httpClient);
} catch (IOException | URISyntaxException ex) { } catch (IOException | URISyntaxException ex) {

View File

@ -92,7 +92,7 @@ public class OzoneException extends Exception {
*/ */
public OzoneException(long httpCode, String shortMessage, String message) { public OzoneException(long httpCode, String shortMessage, String message) {
this.shortMessage = shortMessage; this.shortMessage = shortMessage;
this.resource = message; this.message = message;
this.httpCode = httpCode; this.httpCode = httpCode;
} }

View File

@ -77,6 +77,16 @@ public class OzoneQuota {
this.unit = unit; this.unit = unit;
} }
/**
* Formats a quota as a string.
*
* @param quota the quota to format
* @return string representation of quota
*/
public static String formatQuota(OzoneQuota quota) {
return String.valueOf(quota.size) + quota.unit;
}
/** /**
* Parses a user provided string and returns the * Parses a user provided string and returns the
* Quota Object. * Quota Object.

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.storage;
import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
/**
* An {@link InputStream} used by the REST service in combination with the
* {@link DistributedStorageHandler} to read the value of a key from a sequence
* of container chunks. All bytes of the key value are stored in container
* chunks. Each chunk may contain multiple underlying {@link ByteBuffer}
* instances. This class encapsulates all state management for iterating
* through the sequence of chunks and the sequence of buffers within each chunk.
*/
class ChunkInputStream extends InputStream {
private static final int EOF = -1;
private final String key;
private final UserArgs args;
private XceiverClientManager xceiverClientManager;
private XceiverClient xceiverClient;
private List<ChunkInfo> chunks;
private int chunkOffset;
private List<ByteBuffer> buffers;
private int bufferOffset;
/**
* Creates a new ChunkInputStream.
*
* @param key chunk key
* @param xceiverClientManager client manager that controls client
* @param xceiverClient client to perform container calls
* @param chunks list of chunks to read
* @param args container protocol call args
*/
public ChunkInputStream(String key, XceiverClientManager xceiverClientManager,
XceiverClient xceiverClient, List<ChunkInfo> chunks, UserArgs args) {
this.key = key;
this.args = args;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.chunks = chunks;
this.chunkOffset = 0;
this.buffers = null;
this.bufferOffset = 0;
}
@Override
public synchronized int read()
throws IOException {
checkOpen();
int available = prepareRead(1);
return available == EOF ? EOF : buffers.get(bufferOffset).get();
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
// According to the JavaDocs for InputStream, it is recommended that
// subclasses provide an override of bulk read if possible for performance
// reasons. In addition to performance, we need to do it for correctness
// reasons. The Ozone REST service uses PipedInputStream and
// PipedOutputStream to relay HTTP response data between a Jersey thread and
// a Netty thread. It turns out that PipedInputStream/PipedOutputStream
// have a subtle dependency (bug?) on the wrapped stream providing separate
// implementations of single-byte read and bulk read. Without this, get key
// responses might close the connection before writing all of the bytes
// advertised in the Content-Length.
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return 0;
}
checkOpen();
int available = prepareRead(len);
if (available == EOF) {
return EOF;
}
buffers.get(bufferOffset).get(b, off, available);
return available;
}
@Override
public synchronized void close() {
if (xceiverClientManager != null && xceiverClient != null) {
xceiverClientManager.releaseClient(xceiverClient);
xceiverClientManager = null;
xceiverClient = null;
}
}
/**
* Checks if the stream is open. If not, throws an exception.
*
* @throws IOException if stream is closed
*/
private synchronized void checkOpen() throws IOException {
if (xceiverClient == null) {
throw new IOException("ChunkInputStream has been closed.");
}
}
/**
* Prepares to read by advancing through chunks and buffers as needed until it
* finds data to return or encounters EOF.
*
* @param len desired length of data to read
* @return length of data available to read, possibly less than desired length
*/
private synchronized int prepareRead(int len) throws IOException {
for (;;) {
if (chunks == null || chunks.isEmpty()) {
// This must be an empty key.
return EOF;
} else if (buffers == null) {
// The first read triggers fetching the first chunk.
readChunkFromContainer(0);
} else if (!buffers.isEmpty() &&
buffers.get(bufferOffset).hasRemaining()) {
// Data is available from the current buffer.
ByteBuffer bb = buffers.get(bufferOffset);
return len > bb.remaining() ? bb.remaining() : len;
} else if (!buffers.isEmpty() &&
!buffers.get(bufferOffset).hasRemaining() &&
bufferOffset < buffers.size() - 1) {
// There are additional buffers available.
++bufferOffset;
} else if (chunkOffset < chunks.size() - 1) {
// There are additional chunks available.
readChunkFromContainer(chunkOffset + 1);
} else {
// All available input has been consumed.
return EOF;
}
}
}
/**
* Attempts to read the chunk at the specified offset in the chunk list. If
* successful, then the data of the read chunk is saved so that its bytes can
* be returned from subsequent read calls.
*
* @param readChunkOffset offset in the chunk list of which chunk to read
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void readChunkFromContainer(int readChunkOffset)
throws IOException {
final ReadChunkResponseProto readChunkResponse;
try {
readChunkResponse = readChunk(xceiverClient, chunks.get(readChunkOffset),
key, args);
} catch (OzoneException e) {
throw new IOException("Unexpected OzoneException", e);
}
chunkOffset = readChunkOffset;
ByteString byteString = readChunkResponse.getData();
buffers = byteString.asReadOnlyByteBufferList();
}
}

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.storage;
import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.UUID;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.response.KeyInfo;
/**
* An {@link OutputStream} used by the REST service in combination with the
* {@link DistributedStorageHandler} to write the value of a key to a sequence
* of container chunks. Writes are buffered locally and periodically written to
* the container as a new chunk. In order to preserve the semantics that
* replacement of a pre-existing key is atomic, each instance of the stream has
* an internal unique identifier. This unique identifier and a monotonically
* increasing chunk index form a composite key that is used as the chunk name.
* After all data is written, a putKey call creates or updates the corresponding
* container key, and this call includes the full list of chunks that make up
* the key data. The list of chunks is updated all at once. Therefore, a
* concurrent reader never can see an intermediate state in which different
* chunks of data from different versions of the key data are interleaved.
* This class encapsulates all state management for buffering and writing
* through to the container.
*/
class ChunkOutputStream extends OutputStream {
private final String containerKey;
private final KeyInfo key;
private final UserArgs args;
private final KeyData.Builder containerKeyData;
private XceiverClientManager xceiverClientManager;
private XceiverClient xceiverClient;
private ByteBuffer buffer;
private final String streamId;
private int chunkIndex;
/**
* Creates a new ChunkOutputStream.
*
* @param containerKey container key
* @param key chunk key
* @param xceiverClientManager client manager that controls client
* @param xceiverClient client to perform container calls
* @param args container protocol call args
*/
public ChunkOutputStream(String containerKey, KeyInfo key,
XceiverClientManager xceiverClientManager, XceiverClient xceiverClient,
UserArgs args) {
this.containerKey = containerKey;
this.key = key;
this.args = args;
this.containerKeyData = fromKeyToContainerKeyDataBuilder(
xceiverClient.getPipeline().getContainerName(), containerKey, key);
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.buffer = ByteBuffer.allocate(CHUNK_SIZE);
this.streamId = UUID.randomUUID().toString();
this.chunkIndex = 0;
}
@Override
public synchronized void write(int b) throws IOException {
checkOpen();
int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit();
buffer.put((byte)b);
if (buffer.position() == CHUNK_SIZE) {
flushBufferToChunk(rollbackPosition, rollbackLimit);
}
}
@Override
public synchronized void flush() throws IOException {
checkOpen();
if (buffer.position() > 0) {
int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit();
flushBufferToChunk(rollbackPosition, rollbackLimit);
}
}
@Override
public synchronized void close() throws IOException {
if (xceiverClientManager != null && xceiverClient != null &&
buffer != null) {
try {
if (buffer.position() > 0) {
writeChunkToContainer();
}
putKey(xceiverClient, containerKeyData.build(), args);
} catch (OzoneException e) {
throw new IOException("Unexpected OzoneException", e);
} finally {
xceiverClientManager.releaseClient(xceiverClient);
xceiverClientManager = null;
xceiverClient = null;
buffer = null;
}
}
}
/**
* Checks if the stream is open. If not, throws an exception.
*
* @throws IOException if stream is closed
*/
private synchronized void checkOpen() throws IOException {
if (xceiverClient == null) {
throw new IOException("ChunkOutputStream has been closed.");
}
}
/**
* Attempts to flush buffered writes by writing a new chunk to the container.
* If successful, then clears the buffer to prepare to receive writes for a
* new chunk.
*
* @param rollbackPosition position to restore in buffer if write fails
* @param rollbackLimit limit to restore in buffer if write fails
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void flushBufferToChunk(int rollbackPosition,
int rollbackLimit) throws IOException {
boolean success = false;
try {
writeChunkToContainer();
success = true;
} finally {
if (success) {
buffer.clear();
} else {
buffer.position(rollbackPosition);
buffer.limit(rollbackLimit);
}
}
}
/**
* Writes buffered data as a new chunk to the container and saves chunk
* information to be used later in putKey call.
*
* @throws IOException if there is an I/O error while performing the call
*/
private synchronized void writeChunkToContainer() throws IOException {
buffer.flip();
ByteString data = ByteString.copyFrom(buffer);
ChunkInfo chunk = ChunkInfo
.newBuilder()
.setChunkName(
key.getKeyName() + "_stream_" + streamId + "_chunk_" + ++chunkIndex)
.setOffset(0)
.setLen(data.size())
.build();
try {
writeChunk(xceiverClient, chunk, key.getKeyName(), data, args);
} catch (OzoneException e) {
throw new IOException("Unexpected OzoneException", e);
}
containerKeyData.addChunks(chunk);
}
}

View File

@ -0,0 +1,198 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.storage;
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import java.io.IOException;
import com.google.protobuf.ByteString;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.PutKeyRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadChunkResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
/**
* Implementation of all container protocol calls performed by
* {@link DistributedStorageHandler}.
*/
final class ContainerProtocolCalls {
/**
* Calls the container protocol to get a container key.
*
* @param xceiverClient client to perform call
* @param containerKeyData key data to identify container
* @param args container protocol call args
* @returns container protocol get key response
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneException if the container protocol call failed
*/
public static GetKeyResponseProto getKey(XceiverClient xceiverClient,
KeyData containerKeyData, UserArgs args) throws IOException,
OzoneException {
GetKeyRequestProto.Builder readKeyRequest = GetKeyRequestProto
.newBuilder()
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.GetKey)
.setTraceID(args.getRequestID())
.setGetKey(readKeyRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response, args);
return response.getGetKey();
}
/**
* Calls the container protocol to put a container key.
*
* @param xceiverClient client to perform call
* @param containerKeyData key data to identify container
* @param args container protocol call args
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneException if the container protocol call failed
*/
public static void putKey(XceiverClient xceiverClient,
KeyData containerKeyData, UserArgs args) throws IOException,
OzoneException {
PutKeyRequestProto.Builder createKeyRequest = PutKeyRequestProto
.newBuilder()
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyData(containerKeyData);
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.PutKey)
.setTraceID(args.getRequestID())
.setPutKey(createKeyRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response, args);
}
/**
* Calls the container protocol to read a chunk.
*
* @param xceiverClient client to perform call
* @param chunk information about chunk to read
* @param key the key name
* @param args container protocol call args
* @returns container protocol read chunk response
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneException if the container protocol call failed
*/
public static ReadChunkResponseProto readChunk(XceiverClient xceiverClient,
ChunkInfo chunk, String key, UserArgs args)
throws IOException, OzoneException {
ReadChunkRequestProto.Builder readChunkRequest = ReadChunkRequestProto
.newBuilder()
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyName(key)
.setChunkData(chunk);
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.ReadChunk)
.setTraceID(args.getRequestID())
.setReadChunk(readChunkRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response, args);
return response.getReadChunk();
}
/**
* Calls the container protocol to write a chunk.
*
* @param xceiverClient client to perform call
* @param chunk information about chunk to write
* @param key the key name
* @param data the data of the chunk to write
* @param args container protocol call args
* @throws IOException if there is an I/O error while performing the call
* @throws OzoneException if the container protocol call failed
*/
public static void writeChunk(XceiverClient xceiverClient, ChunkInfo chunk,
String key, ByteString data, UserArgs args)
throws IOException, OzoneException {
WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
.newBuilder()
.setPipeline(xceiverClient.getPipeline().getProtobufMessage())
.setKeyName(key)
.setChunkData(chunk)
.setData(data);
ContainerCommandRequestProto request = ContainerCommandRequestProto
.newBuilder()
.setCmdType(Type.WriteChunk)
.setTraceID(args.getRequestID())
.setWriteChunk(writeChunkRequest)
.build();
ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
validateContainerResponse(response, args);
}
/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
*
* @param response container protocol call response
* @param args container protocol call args
* @throws OzoneException if the container protocol call failed
*/
private static void validateContainerResponse(
ContainerCommandResponseProto response, UserArgs args)
throws OzoneException {
switch (response.getResult()) {
case SUCCESS:
break;
case MALFORMED_REQUEST:
throw ErrorTable.newError(new OzoneException(HTTP_BAD_REQUEST,
"badRequest", "Bad container request."), args);
case UNSUPPORTED_REQUEST:
throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
"internalServerError", "Unsupported container request."), args);
case CONTAINER_INTERNAL_ERROR:
throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
"internalServerError", "Container internal error."), args);
default:
throw ErrorTable.newError(new OzoneException(HTTP_INTERNAL_ERROR,
"internalServerError", "Unrecognized container response."), args);
}
}
/**
* There is no need to instantiate this class.
*/
private ContainerProtocolCalls() {
}
}

View File

@ -18,7 +18,32 @@
package org.apache.hadoop.ozone.web.storage; package org.apache.hadoop.ozone.web.storage;
import static org.apache.hadoop.ozone.web.storage.ContainerProtocolCalls.*;
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.TimeZone;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClient;
import org.apache.hadoop.ozone.container.common.transport.client.XceiverClientManager;
import org.apache.hadoop.ozone.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs; import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs; import org.apache.hadoop.ozone.web.handlers.KeyArgs;
@ -27,13 +52,13 @@ import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.response.BucketInfo; import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListBuckets; import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.response.ListVolumes; import org.apache.hadoop.ozone.web.response.ListVolumes;
import org.apache.hadoop.ozone.web.response.VolumeInfo; import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner;
import java.io.IOException; import org.apache.hadoop.util.StringUtils;
import java.io.OutputStream;
/** /**
* A {@link StorageHandler} implementation that distributes object storage * A {@link StorageHandler} implementation that distributes object storage
@ -41,156 +66,283 @@ import java.io.OutputStream;
*/ */
public final class DistributedStorageHandler implements StorageHandler { public final class DistributedStorageHandler implements StorageHandler {
@Override private final StorageContainerLocationProtocolClientSideTranslatorPB
public void createVolume(VolumeArgs args) throws storageContainerLocation;
IOException, OzoneException { private final XceiverClientManager xceiverClientManager;
/**
* Creates a new DistributedStorageHandler.
*
* @param conf configuration
* @param storageContainerLocation StorageContainerLocationProtocol proxy
*/
public DistributedStorageHandler(OzoneConfiguration conf,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocation) {
this.storageContainerLocation = storageContainerLocation;
this.xceiverClientManager = new XceiverClientManager(conf);
}
@Override
public void createVolume(VolumeArgs args) throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
try {
VolumeInfo volume = new VolumeInfo();
volume.setVolumeName(args.getVolumeName());
volume.setQuota(args.getQuota());
volume.setOwner(new VolumeOwner(args.getUserName()));
volume.setCreatedOn(dateToString(new Date()));
volume.setCreatedBy(args.getAdminName());
KeyData containerKeyData = fromVolumeToContainerKeyData(
xceiverClient.getPipeline().getContainerName(), containerKey, volume);
putKey(xceiverClient, containerKeyData, args);
} finally {
xceiverClientManager.releaseClient(xceiverClient);
}
} }
@Override @Override
public void setVolumeOwner(VolumeArgs args) throws public void setVolumeOwner(VolumeArgs args) throws
IOException, OzoneException { IOException, OzoneException {
throw new UnsupportedOperationException("setVolumeOwner not implemented");
} }
@Override @Override
public void setVolumeQuota(VolumeArgs args, boolean remove) public void setVolumeQuota(VolumeArgs args, boolean remove)
throws IOException, OzoneException { throws IOException, OzoneException {
throw new UnsupportedOperationException("setVolumeQuota not implemented");
} }
@Override @Override
public boolean checkVolumeAccess(VolumeArgs args) public boolean checkVolumeAccess(VolumeArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
return false; throw new UnsupportedOperationException("checkVolumeAccessnot implemented");
} }
@Override @Override
public ListVolumes listVolumes(UserArgs args) public ListVolumes listVolumes(UserArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
return null; throw new UnsupportedOperationException("listVolumes not implemented");
} }
@Override @Override
public void deleteVolume(VolumeArgs args) public void deleteVolume(VolumeArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
throw new UnsupportedOperationException("deleteVolume not implemented");
} }
@Override @Override
public VolumeInfo getVolumeInfo(VolumeArgs args) public VolumeInfo getVolumeInfo(VolumeArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
return null; String containerKey = buildContainerKey(args.getVolumeName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
try {
KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
args);
return fromContainerKeyValueListToVolume(
response.getKeyData().getMetadataList());
} finally {
xceiverClientManager.releaseClient(xceiverClient);
}
} }
@Override @Override
public void createBucket(BucketArgs args) public void createBucket(final BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
try {
BucketInfo bucket = new BucketInfo();
bucket.setVolumeName(args.getVolumeName());
bucket.setBucketName(args.getBucketName());
bucket.setAcls(args.getAddAcls());
bucket.setVersioning(args.getVersioning());
bucket.setStorageType(args.getStorageType());
KeyData containerKeyData = fromBucketToContainerKeyData(
xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
putKey(xceiverClient, containerKeyData, args);
} finally {
xceiverClientManager.releaseClient(xceiverClient);
}
} }
@Override @Override
public void setBucketAcls(BucketArgs args) public void setBucketAcls(BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
throw new UnsupportedOperationException("setBucketAcls not implemented");
} }
@Override @Override
public void setBucketVersioning(BucketArgs args) public void setBucketVersioning(BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
throw new UnsupportedOperationException(
"setBucketVersioning not implemented");
} }
@Override @Override
public void setBucketStorageClass(BucketArgs args) public void setBucketStorageClass(BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
throw new UnsupportedOperationException(
"setBucketStorageClass not implemented");
} }
@Override @Override
public void deleteBucket(BucketArgs args) public void deleteBucket(BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
throw new UnsupportedOperationException("deleteBucket not implemented");
} }
@Override @Override
public void checkBucketAccess(BucketArgs args) public void checkBucketAccess(BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
throw new UnsupportedOperationException(
"checkBucketAccess not implemented");
} }
@Override @Override
public ListBuckets listBuckets(VolumeArgs args) public ListBuckets listBuckets(VolumeArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
return null; throw new UnsupportedOperationException("listBuckets not implemented");
} }
@Override @Override
public BucketInfo getBucketInfo(BucketArgs args) public BucketInfo getBucketInfo(BucketArgs args)
throws IOException, OzoneException { throws IOException, OzoneException {
return null; String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
try {
KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
args);
return fromContainerKeyValueListToBucket(
response.getKeyData().getMetadataList());
} finally {
xceiverClientManager.releaseClient(xceiverClient);
}
} }
/**
* Writes a key in an existing bucket.
*
* @param args KeyArgs
* @return InputStream
* @throws OzoneException
*/
@Override @Override
public OutputStream newKeyWriter(KeyArgs args) throws IOException, public OutputStream newKeyWriter(KeyArgs args) throws IOException,
OzoneException { OzoneException {
return null; String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName(), args.getKeyName());
KeyInfo key = new KeyInfo();
key.setKeyName(args.getKeyName());
key.setCreatedOn(dateToString(new Date()));
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
return new ChunkOutputStream(containerKey, key, xceiverClientManager,
xceiverClient, args);
} }
/**
* Tells the file system that the object has been written out completely and
* it can do any house keeping operation that needs to be done.
*
* @param args Key Args
* @param stream
* @throws IOException
*/
@Override @Override
public void commitKey(KeyArgs args, OutputStream stream) throws public void commitKey(KeyArgs args, OutputStream stream) throws
IOException, OzoneException { IOException, OzoneException {
stream.close();
} }
/**
* Reads a key from an existing bucket.
*
* @param args KeyArgs
* @return LengthInputStream
* @throws IOException
*/
@Override @Override
public LengthInputStream newKeyReader(KeyArgs args) throws IOException, public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
OzoneException { OzoneException {
return null; String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName(), args.getKeyName());
XceiverClient xceiverClient = acquireXceiverClient(containerKey);
boolean success = false;
try {
KeyData containerKeyData = containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
GetKeyResponseProto response = getKey(xceiverClient, containerKeyData,
args);
long length = 0;
List<ChunkInfo> chunks = response.getKeyData().getChunksList();
for (ChunkInfo chunk : chunks) {
length += chunk.getLen();
}
success = true;
return new LengthInputStream(new ChunkInputStream(
containerKey, xceiverClientManager, xceiverClient, chunks, args),
length);
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient);
}
}
} }
/**
* Deletes an existing key.
*
* @param args KeyArgs
* @throws OzoneException
*/
@Override @Override
public void deleteKey(KeyArgs args) throws IOException, OzoneException { public void deleteKey(KeyArgs args) throws IOException, OzoneException {
throw new UnsupportedOperationException("deleteKey not implemented");
}
@Override
public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
throw new UnsupportedOperationException("listKeys not implemented");
} }
/** /**
* Returns a list of Key. * Acquires an {@link XceiverClient} connected to a {@link Pipeline} of nodes
* capable of serving container protocol operations. The container is
* selected based on the specified container key.
* *
* @param args KeyArgs * @param containerKey container key
* @return BucketList * @return XceiverClient connected to a container
* @throws IOException * @throws IOException if an XceiverClient cannot be acquired
*/ */
@Override private XceiverClient acquireXceiverClient(String containerKey)
public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { throws IOException {
return null; Set<LocatedContainer> locatedContainers =
storageContainerLocation.getStorageContainerLocations(
new HashSet<>(Arrays.asList(containerKey)));
Pipeline pipeline = newPipelineFromLocatedContainer(
locatedContainers.iterator().next());
return xceiverClientManager.acquireClient(pipeline);
}
/**
* Creates a container key from any number of components by combining all
* components with a delimiter.
*
* @param parts container key components
* @return container key
*/
private static String buildContainerKey(String... parts) {
return '/' + StringUtils.join('/', parts);
}
/**
* Formats a date in the expected string format.
*
* @param date the date to format
* @return formatted string representation of date
*/
private static String dateToString(Date date) {
SimpleDateFormat sdf =
new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
return sdf.format(date);
}
/**
* Translates a set of container locations, ordered such that the first is the
* leader, into a corresponding {@link Pipeline} object.
*
* @param locatedContainer container location
* @return pipeline corresponding to container locations
*/
private static Pipeline newPipelineFromLocatedContainer(
LocatedContainer locatedContainer) {
Set<DatanodeInfo> locations = locatedContainer.getLocations();
String leaderId = locations.iterator().next().getDatanodeUuid();
Pipeline pipeline = new Pipeline(leaderId);
for (DatanodeInfo location : locations) {
pipeline.addMember(location);
}
pipeline.setContainerName(locatedContainer.getContainerName());
return pipeline;
} }
} }

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.storage;
import java.util.List;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner;
import org.apache.hadoop.util.StringUtils;
/**
* This class contains methods that define the translation between the Ozone
* domain model and the storage container domain model.
*/
final class OzoneContainerTranslation {
private static final String ACLS = "ACLS";
private static final String BUCKET = "BUCKET";
private static final String BUCKET_NAME = "BUCKET_NAME";
private static final String CREATED_BY = "CREATED_BY";
private static final String CREATED_ON = "CREATED_ON";
private static final String KEY = "KEY";
private static final String OWNER = "OWNER";
private static final String QUOTA = "QUOTA";
private static final String STORAGE_TYPE = "STORAGE_TYPE";
private static final String TYPE = "TYPE";
private static final String VERSIONING = "VERSIONING";
private static final String VOLUME = "VOLUME";
private static final String VOLUME_NAME = "VOLUME_NAME";
/**
* Creates key data intended for reading a container key.
*
* @param containerName container name
* @param containerKey container key
* @return KeyData intended for reading the container key
*/
public static KeyData containerKeyDataForRead(String containerName,
String containerKey) {
return KeyData
.newBuilder()
.setContainerName(containerName)
.setName(containerKey)
.build();
}
/**
* Translates a bucket to its container representation.
*
* @param containerName container name
* @param containerKey container key
* @param bucket the bucket to translate
* @return KeyData representation of bucket
*/
public static KeyData fromBucketToContainerKeyData(
String containerName, String containerKey, BucketInfo bucket) {
KeyData.Builder containerKeyData = KeyData
.newBuilder()
.setContainerName(containerName)
.setName(containerKey)
.addMetadata(newKeyValue(TYPE, BUCKET))
.addMetadata(newKeyValue(VOLUME_NAME, bucket.getVolumeName()))
.addMetadata(newKeyValue(BUCKET_NAME, bucket.getBucketName()));
if (bucket.getAcls() != null) {
containerKeyData.addMetadata(newKeyValue(ACLS,
StringUtils.join(',', bucket.getAcls())));
}
if (bucket.getVersioning() != null &&
bucket.getVersioning() != Versioning.NOT_DEFINED) {
containerKeyData.addMetadata(newKeyValue(VERSIONING,
bucket.getVersioning().name()));
}
if (bucket.getStorageType() != StorageType.RAM_DISK) {
containerKeyData.addMetadata(newKeyValue(STORAGE_TYPE,
bucket.getStorageType().name()));
}
return containerKeyData.build();
}
/**
* Translates a bucket from its container representation.
*
* @param metadata container metadata representing the bucket
* @return bucket translated from container representation
*/
public static BucketInfo fromContainerKeyValueListToBucket(
List<KeyValue> metadata) {
BucketInfo bucket = new BucketInfo();
for (KeyValue keyValue : metadata) {
switch (keyValue.getKey()) {
case VOLUME_NAME:
bucket.setVolumeName(keyValue.getValue());
break;
case BUCKET_NAME:
bucket.setBucketName(keyValue.getValue());
break;
case VERSIONING:
bucket.setVersioning(
Enum.valueOf(Versioning.class, keyValue.getValue()));
break;
case STORAGE_TYPE:
bucket.setStorageType(
Enum.valueOf(StorageType.class, keyValue.getValue()));
break;
default:
break;
}
}
return bucket;
}
/**
* Translates a volume from its container representation.
*
* @param metadata container metadata representing the volume
* @return volume translated from container representation
*/
public static VolumeInfo fromContainerKeyValueListToVolume(
List<KeyValue> metadata) {
VolumeInfo volume = new VolumeInfo();
for (KeyValue keyValue : metadata) {
switch (keyValue.getKey()) {
case VOLUME_NAME:
volume.setVolumeName(keyValue.getValue());
break;
case CREATED_BY:
volume.setCreatedBy(keyValue.getValue());
break;
case CREATED_ON:
volume.setCreatedOn(keyValue.getValue());
break;
case OWNER:
volume.setOwner(new VolumeOwner(keyValue.getValue()));
break;
case QUOTA:
volume.setQuota(OzoneQuota.parseQuota(keyValue.getValue()));
break;
default:
break;
}
}
return volume;
}
/**
* Translates a key to its container representation.
*
* @param containerName container name
* @param containerKey container key
* @param keyInfo key information received from call
* @return KeyData intended for reading the container key
*/
public static KeyData fromKeyToContainerKeyData(String containerName,
String containerKey, KeyInfo key) {
return KeyData
.newBuilder()
.setContainerName(containerName)
.setName(containerKey)
.addMetadata(newKeyValue(TYPE, KEY))
.build();
}
/**
* Translates a key to its container representation. The return value is a
* builder that can be manipulated further before building the result.
*
* @param containerName container name
* @param containerKey container key
* @param keyInfo key information received from call
* @return KeyData builder
*/
public static KeyData.Builder fromKeyToContainerKeyDataBuilder(
String containerName, String containerKey, KeyInfo key) {
return KeyData
.newBuilder()
.setContainerName(containerName)
.setName(containerKey)
.addMetadata(newKeyValue(TYPE, KEY));
}
/**
* Translates a volume to its container representation.
*
* @param containerName container name
* @param containerKey container key
* @param volume the volume to translate
* @return KeyData representation of volume
*/
public static KeyData fromVolumeToContainerKeyData(
String containerName, String containerKey, VolumeInfo volume) {
KeyData.Builder containerKeyData = KeyData
.newBuilder()
.setContainerName(containerName)
.setName(containerKey)
.addMetadata(newKeyValue(TYPE, VOLUME))
.addMetadata(newKeyValue(VOLUME_NAME, volume.getVolumeName()))
.addMetadata(newKeyValue(CREATED_ON, volume.getCreatedOn()));
if (volume.getQuota() != null && volume.getQuota().sizeInBytes() != -1L) {
containerKeyData.addMetadata(newKeyValue(QUOTA,
OzoneQuota.formatQuota(volume.getQuota())));
}
if (volume.getOwner() != null && volume.getOwner().getName() != null &&
!volume.getOwner().getName().isEmpty()) {
containerKeyData.addMetadata(newKeyValue(OWNER,
volume.getOwner().getName()));
}
if (volume.getCreatedBy() != null && !volume.getCreatedBy().isEmpty()) {
containerKeyData.addMetadata(
newKeyValue(CREATED_BY, volume.getCreatedBy()));
}
return containerKeyData.build();
}
/**
* Translates a key-value pair to its container representation.
*
* @param key the key
* @param value the value
* @return container representation of key-value pair
*/
private static KeyValue newKeyValue(String key, Object value) {
return KeyValue.newBuilder().setKey(key).setValue(value.toString()).build();
}
/**
* There is no need to instantiate this class.
*/
private OzoneContainerTranslation() {
}
}

View File

@ -23,6 +23,10 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_STORAGE_RPC_ADDRESS_KE
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.Random;
import com.google.common.base.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -37,6 +41,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.storage.StorageContainerManager; import org.apache.hadoop.ozone.storage.StorageContainerManager;
import org.apache.hadoop.ozone.web.client.OzoneClient;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -53,6 +59,8 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(MiniOzoneCluster.class); LoggerFactory.getLogger(MiniOzoneCluster.class);
private static final String USER_AUTH = "hdfs";
private final OzoneConfiguration conf; private final OzoneConfiguration conf;
private final StorageContainerManager scm; private final StorageContainerManager scm;
@ -125,6 +133,52 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
scm.join(); scm.join();
} }
/**
* Creates an {@link OzoneClient} connected to this cluster's REST service.
* Callers take ownership of the client and must close it when done.
*
* @return OzoneClient connected to this cluster's REST service
* @throws OzoneException if Ozone encounters an error creating the client
*/
public OzoneClient createOzoneClient() throws OzoneException {
Preconditions.checkState(!getDataNodes().isEmpty(),
"Cannot create OzoneClient if the cluster has no DataNodes.");
// An Ozone request may originate at any DataNode, so pick one at random.
int dnIndex = new Random().nextInt(getDataNodes().size());
String uri = String.format("http://127.0.0.1:%d",
getDataNodes().get(dnIndex).getInfoPort());
LOG.info("Creating Ozone client to DataNode {} with URI {} and user {}",
dnIndex, uri, USER_AUTH);
try {
return new OzoneClient(uri, USER_AUTH);
} catch (URISyntaxException e) {
// We control the REST service URI, so it should never be invalid.
throw new IllegalStateException("Unexpected URISyntaxException", e);
}
}
/**
* Creates an RPC proxy connected to this cluster's StorageContainerManager
* for accessing container location information. Callers take ownership of
* the proxy and must close it when done.
*
* @return RPC proxy for accessing container location information
* @throws IOException if there is an I/O error
*/
public StorageContainerLocationProtocolClientSideTranslatorPB
createStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getStorageContainerLocationRpcAddress();
LOG.info(
"Creating StorageContainerLocationProtocol RPC client with address {}",
address);
return new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
}
/** /**
* Waits for the Ozone cluster to be ready for processing requests. * Waits for the Ozone cluster to be ready for processing requests.
*/ */
@ -146,23 +200,4 @@ public class MiniOzoneCluster extends MiniDFSCluster implements Closeable {
} }
} }
} }
/**
* Creates an RPC proxy connected to this cluster's StorageContainerManager
* for accessing container location information. Callers take ownership of
* the proxy and must close it when done.
*
* @return RPC proxy for accessing container location information
* @throws IOException if there is an I/O error
*/
protected StorageContainerLocationProtocolClientSideTranslatorPB
createStorageContainerLocationClient() throws IOException {
long version = RPC.getProtocolVersion(
StorageContainerLocationProtocolPB.class);
InetSocketAddress address = scm.getStorageContainerLocationRpcAddress();
return new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
}
} }

View File

@ -0,0 +1,253 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web;
import static com.google.common.base.Charsets.UTF_8;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.ozone.OzoneConsts.CHUNK_SIZE;
import static org.junit.Assert.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.web.client.OzoneBucket;
import org.apache.hadoop.ozone.web.client.OzoneClient;
import org.apache.hadoop.ozone.web.client.OzoneVolume;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
/**
* End-to-end testing of Ozone REST operations.
*/
public class TestOzoneRestWithMiniCluster {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
private static int idSuffix;
private static OzoneClient ozoneClient;
@Rule
public ExpectedException exception = ExpectedException.none();
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "distributed");
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true);
cluster = new MiniOzoneCluster.Builder(conf).numDataNodes(3).build();
cluster.waitOzoneReady();
ozoneClient = cluster.createOzoneClient();
}
@AfterClass
public static void shutdown() throws InterruptedException {
IOUtils.cleanup(null, ozoneClient, cluster);
}
@Test
public void testCreateAndGetVolume() throws Exception {
String volumeName = nextId("volume");
OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
volume = ozoneClient.getVolume(volumeName);
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
}
@Test
public void testCreateAndGetBucket() throws Exception {
String volumeName = nextId("volume");
String bucketName = nextId("bucket");
OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
OzoneBucket bucket = volume.createBucket(bucketName);
assertNotNull(bucket);
assertEquals(bucketName, bucket.getBucketName());
bucket = volume.getBucket(bucketName);
assertNotNull(bucket);
assertEquals(bucketName, bucket.getBucketName());
}
@Test
public void testPutAndGetKey() throws Exception {
String volumeName = nextId("volume");
String bucketName = nextId("bucket");
String keyName = nextId("key");
String keyData = nextId("data");
OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
OzoneBucket bucket = volume.createBucket(bucketName);
assertNotNull(bucket);
assertEquals(bucketName, bucket.getBucketName());
bucket.putKey(keyName, keyData);
assertEquals(keyData, bucket.getKey(keyName));
}
@Test
public void testPutAndGetEmptyKey() throws Exception {
String volumeName = nextId("volume");
String bucketName = nextId("bucket");
String keyName = nextId("key");
String keyData = "";
OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
OzoneBucket bucket = volume.createBucket(bucketName);
assertNotNull(bucket);
assertEquals(bucketName, bucket.getBucketName());
bucket.putKey(keyName, keyData);
assertEquals(keyData, bucket.getKey(keyName));
}
@Test
public void testPutAndGetMultiChunkKey() throws Exception {
String volumeName = nextId("volume");
String bucketName = nextId("bucket");
String keyName = nextId("key");
int keyDataLen = 3 * CHUNK_SIZE;
String keyData = buildKeyData(keyDataLen);
OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
OzoneBucket bucket = volume.createBucket(bucketName);
assertNotNull(bucket);
assertEquals(bucketName, bucket.getBucketName());
bucket.putKey(keyName, keyData);
assertEquals(keyData, bucket.getKey(keyName));
}
@Test
public void testPutAndGetMultiChunkKeyLastChunkPartial() throws Exception {
String volumeName = nextId("volume");
String bucketName = nextId("bucket");
String keyName = nextId("key");
int keyDataLen = (int)(2.5 * CHUNK_SIZE);
String keyData = buildKeyData(keyDataLen);
OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
OzoneBucket bucket = volume.createBucket(bucketName);
assertNotNull(bucket);
assertEquals(bucketName, bucket.getBucketName());
bucket.putKey(keyName, keyData);
assertEquals(keyData, bucket.getKey(keyName));
}
@Test
public void testReplaceKey() throws Exception {
String volumeName = nextId("volume");
String bucketName = nextId("bucket");
String keyName = nextId("key");
int keyDataLen = (int)(2.5 * CHUNK_SIZE);
String keyData = buildKeyData(keyDataLen);
OzoneVolume volume = ozoneClient.createVolume(volumeName, "bilbo", "100TB");
assertNotNull(volume);
assertEquals(volumeName, volume.getVolumeName());
assertEquals(ozoneClient.getUserAuth(), volume.getCreatedby());
assertEquals("bilbo", volume.getOwnerName());
assertNotNull(volume.getQuota());
assertEquals(OzoneQuota.parseQuota("100TB").sizeInBytes(),
volume.getQuota().sizeInBytes());
OzoneBucket bucket = volume.createBucket(bucketName);
assertNotNull(bucket);
assertEquals(bucketName, bucket.getBucketName());
bucket.putKey(keyName, keyData);
assertEquals(keyData, bucket.getKey(keyName));
// Replace key with data consisting of fewer chunks.
keyDataLen = (int)(1.5 * CHUNK_SIZE);
keyData = buildKeyData(keyDataLen);
bucket.putKey(keyName, keyData);
assertEquals(keyData, bucket.getKey(keyName));
// Replace key with data consisting of more chunks.
keyDataLen = (int)(3.5 * CHUNK_SIZE);
keyData = buildKeyData(keyDataLen);
bucket.putKey(keyName, keyData);
assertEquals(keyData, bucket.getKey(keyName));
}
/**
* Creates sample key data of the specified length. The data is a string of
* printable ASCII characters. This makes it easy to debug through visual
* inspection of the chunk files if a test fails.
*
* @param keyDataLen desired length of key data
* @return string of printable ASCII characters of the specified length
*/
private static String buildKeyData(int keyDataLen) {
return new String(dataset(keyDataLen, 33, 93), UTF_8);
}
/**
* Generates identifiers unique enough for use in tests, so that individual
* tests don't collide on each others' data in the shared mini-cluster.
*
* @param idPrefix prefix to put in front of ID
* @return unique ID generated by appending a suffix to the given prefix
*/
private static String nextId(String idPrefix) {
return idPrefix + ++idSuffix;
}
}