HDFS-9960. OzoneHandler : Add localstorage support for keys. Contributed by Anu Engineer.

This commit is contained in:
Chris Nauroth 2016-03-19 13:36:18 -07:00
parent c73a32c21c
commit 2e517a64d8
9 changed files with 1004 additions and 38 deletions

View File

@ -20,8 +20,10 @@ package org.apache.hadoop.ozone.web;
import org.apache.hadoop.ozone.web.exceptions.OzoneExceptionMapper; import org.apache.hadoop.ozone.web.exceptions.OzoneExceptionMapper;
import org.apache.hadoop.ozone.web.handlers.BucketHandler; import org.apache.hadoop.ozone.web.handlers.BucketHandler;
import org.apache.hadoop.ozone.web.handlers.KeyHandler;
import org.apache.hadoop.ozone.web.handlers.ServiceFilter; import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
import org.apache.hadoop.ozone.web.handlers.VolumeHandler; import org.apache.hadoop.ozone.web.handlers.VolumeHandler;
import org.apache.hadoop.ozone.web.messages.LengthInputStreamMessageBodyWriter;
import org.apache.hadoop.ozone.web.messages.StringMessageBodyWriter; import org.apache.hadoop.ozone.web.messages.StringMessageBodyWriter;
import javax.ws.rs.core.Application; import javax.ws.rs.core.Application;
@ -41,7 +43,9 @@ public class ObjectStoreApplication extends Application {
HashSet<Class<?>> set = new HashSet<>(); HashSet<Class<?>> set = new HashSet<>();
set.add(BucketHandler.class); set.add(BucketHandler.class);
set.add(VolumeHandler.class); set.add(VolumeHandler.class);
set.add(KeyHandler.class);
set.add(OzoneExceptionMapper.class); set.add(OzoneExceptionMapper.class);
set.add(LengthInputStreamMessageBodyWriter.class);
set.add(StringMessageBodyWriter.class); set.add(StringMessageBodyWriter.class);
return set; return set;
} }

View File

@ -18,19 +18,50 @@
package org.apache.hadoop.ozone.web.client; package org.apache.hadoop.ozone.web.client;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.headers.Header;
import org.apache.hadoop.ozone.web.request.OzoneAcl; import org.apache.hadoop.ozone.web.request.OzoneAcl;
import org.apache.hadoop.ozone.web.response.BucketInfo; import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.ozone.web.utils.OzoneConsts; import org.apache.hadoop.ozone.web.utils.OzoneConsts;
import org.apache.http.HttpException; import org.apache.http.HttpEntity;
import org.apache.http.HttpRequest; import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor; import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import javax.ws.rs.core.HttpHeaders;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import static java.net.HttpURLConnection.HTTP_CREATED;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.ozone.web.utils.OzoneUtils.ENCODING;
import static org.apache.hadoop.ozone.web.utils.OzoneUtils.ENCODING_NAME;
/** /**
* A Bucket class the represents an Ozone Bucket. * A Bucket class the represents an Ozone Bucket.
*/ */
@ -109,19 +140,386 @@ public class OzoneBucket {
* *
* @return Storage Class Enum * @return Storage Class Enum
*/ */
public StorageType getStorageClass() { public StorageType getStorageType() {
return bucketInfo.getStorageType(); return bucketInfo.getStorageType();
} }
/**
* Puts an Object in Ozone bucket.
*
* @param keyName - Name of the key
* @param data - Data that you want to put
* @throws OzoneException
*/
public void putKey(String keyName, String data) throws OzoneException {
if ((keyName == null) || keyName.isEmpty()) {
throw new OzoneClientException("Invalid key Name.");
}
if (data == null) {
throw new OzoneClientException("Invalid data.");
}
try {
OzoneClient client = getVolume().getClient();
DefaultHttpClient httpClient = new DefaultHttpClient();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build();
HttpPut putRequest =
getVolume().getClient().getHttpPut(builder.toString());
InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING));
putRequest.setEntity(new InputStreamEntity(is, data.length()));
putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is));
putRequest
.setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(data.length()));
executePutKey(putRequest, httpClient);
} catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage());
}
}
/**
* Puts an Object in Ozone Bucket.
*
* @param dataFile - File from which you want the data to be put. Key Name
* will same as the file name, devoid of any path.
* @throws OzoneException
*/
public void putKey(File dataFile) throws OzoneException {
if (dataFile == null) {
throw new OzoneClientException("Invalid file object.");
}
String keyName = dataFile.getName();
putKey(keyName, dataFile);
}
/**
* Puts a Key in Ozone Bucket.
*
* @param keyName - Name of the Key
* @param file - Stream that gets read to be put into Ozone.
* @throws OzoneException
*/
public void putKey(String keyName, File file)
throws OzoneException {
if ((keyName == null) || keyName.isEmpty()) {
throw new OzoneClientException("Invalid key Name");
}
if (file == null) {
throw new OzoneClientException("Invalid data stream");
}
try {
OzoneClient client = getVolume().getClient();
DefaultHttpClient httpClient = new DefaultHttpClient();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build();
HttpPut putRequest =
getVolume().getClient().getHttpPut(builder.toString());
FileEntity fileEntity = new FileEntity(file, ContentType
.APPLICATION_OCTET_STREAM);
putRequest.setEntity(fileEntity);
FileInputStream fis = new FileInputStream(file);
putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(fis));
fis.close();
putRequest.setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(file
.length()));
httpClient.removeRequestInterceptorByClass(
org.apache.http.protocol.RequestContent.class);
executePutKey(putRequest, httpClient);
} catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage());
}
}
/**
* executePutKey executes the Put request against the Ozone Server.
*
* @param putRequest - Http Put Request
* @param httpClient - httpClient
* @throws OzoneException
* @throws IOException
*/
private void executePutKey(HttpPut putRequest, DefaultHttpClient httpClient)
throws OzoneException, IOException {
HttpEntity entity = null;
try {
HttpResponse response = httpClient.execute(putRequest);
int errorCode = response.getStatusLine().getStatusCode();
entity = response.getEntity();
if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
return;
}
if (entity == null) {
throw new OzoneClientException("Unexpected null in http payload");
}
throw OzoneException.parse(EntityUtils.toString(entity));
} finally {
if (entity != null) {
EntityUtils.consumeQuietly(entity);
}
}
}
/**
* Gets a key from the Ozone server and writes to the file pointed by the
* downloadTo PAth.
*
* @param keyName - Key Name in Ozone.
* @param downloadTo File Name to download the Key's Data to
*/
public void getKey(String keyName, Path downloadTo) throws OzoneException {
if ((keyName == null) || keyName.isEmpty()) {
throw new OzoneClientException("Invalid key Name");
}
if (downloadTo == null) {
throw new OzoneClientException("Invalid download path");
}
try {
OzoneClient client = getVolume().getClient();
DefaultHttpClient httpClient = new DefaultHttpClient();
FileOutputStream outPutFile = new FileOutputStream(downloadTo.toFile());
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build();
HttpGet getRequest =
getVolume().getClient().getHttpGet(builder.toString());
executeGetKey(getRequest, httpClient, outPutFile);
outPutFile.flush();
outPutFile.close();
} catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage());
}
}
/**
* Returns the data part of the key as a string.
*
* @param keyName - KeyName to get
* @return String - Data
* @throws OzoneException
*/
public String getKey(String keyName) throws OzoneException {
if ((keyName == null) || keyName.isEmpty()) {
throw new OzoneClientException("Invalid key Name");
}
try {
OzoneClient client = getVolume().getClient();
ByteArrayOutputStream outPutStream = new ByteArrayOutputStream();
DefaultHttpClient httpClient = new DefaultHttpClient();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build();
HttpGet getRequest =
getVolume().getClient().getHttpGet(builder.toString());
executeGetKey(getRequest, httpClient, outPutStream);
return outPutStream.toString(ENCODING_NAME);
} catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage());
}
}
/**
* Executes get key and returns the data.
*
* @param getRequest - http Get Request
* @param httpClient - Client
* @param stream - Stream to write data to.
* @throws IOException
* @throws OzoneException
*/
private void executeGetKey(HttpGet getRequest, DefaultHttpClient httpClient,
OutputStream stream)
throws IOException, OzoneException {
HttpEntity entity = null;
try {
HttpResponse response = httpClient.execute(getRequest);
int errorCode = response.getStatusLine().getStatusCode();
entity = response.getEntity();
if (errorCode == HTTP_OK) {
entity.writeTo(stream);
return;
}
if (entity == null) {
throw new OzoneClientException("Unexpected null in http payload");
}
throw OzoneException.parse(EntityUtils.toString(entity));
} finally {
if (entity != null) {
EntityUtils.consumeQuietly(entity);
}
}
}
/**
* Deletes a key in this bucket.
*
* @param keyName - Name of the Key
* @throws OzoneException
*/
public void deleteKey(String keyName) throws OzoneException {
if ((keyName == null) || keyName.isEmpty()) {
throw new OzoneClientException("Invalid key Name");
}
try {
OzoneClient client = getVolume().getClient();
DefaultHttpClient httpClient = new DefaultHttpClient();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()
+ "/" + keyName).build();
HttpDelete deleteRequest =
getVolume().getClient().getHttpDelete(builder.toString());
executeDeleteKey(deleteRequest, httpClient);
} catch (IOException | URISyntaxException ex) {
throw new OzoneClientException(ex.getMessage());
}
}
/**
* Executes deleteKey.
*
* @param deleteRequest - http Delete Request
* @param httpClient - Client
* @throws IOException
* @throws OzoneException
*/
private void executeDeleteKey(HttpDelete deleteRequest,
DefaultHttpClient httpClient)
throws IOException, OzoneException {
HttpEntity entity = null;
try {
HttpResponse response = httpClient.execute(deleteRequest);
int errorCode = response.getStatusLine().getStatusCode();
entity = response.getEntity();
if (errorCode == HTTP_OK) {
return;
}
if (entity == null) {
throw new OzoneClientException("Unexpected null in http payload");
}
throw OzoneException.parse(EntityUtils.toString(entity));
} finally {
if (entity != null) {
EntityUtils.consumeQuietly(entity);
}
}
}
/**
* List all keys in a bucket.
*
* @return List of OzoneKeys
*/
public List<OzoneKey> listKeys() throws OzoneException {
try {
OzoneClient client = getVolume().getClient();
DefaultHttpClient httpClient = new DefaultHttpClient();
URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI());
builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName())
.build();
HttpGet getRequest = client.getHttpGet(builder.toString());
return executeListKeys(getRequest, httpClient);
} catch (IOException | URISyntaxException e) {
throw new OzoneClientException(e.getMessage());
}
}
/**
* Execute list Key.
*
* @param getRequest - HttpGet
* @param httpClient - HttpClient
* @return List<OzoneKey>
* @throws IOException
* @throws OzoneException
*/
private List<OzoneKey> executeListKeys(HttpGet getRequest,
DefaultHttpClient httpClient)
throws IOException, OzoneException {
HttpEntity entity = null;
List<OzoneKey> ozoneKeyList = new LinkedList<OzoneKey>();
try {
HttpResponse response = httpClient.execute(getRequest);
int errorCode = response.getStatusLine().getStatusCode();
entity = response.getEntity();
if (entity == null) {
throw new OzoneClientException("Unexpected null in http payload");
}
if (errorCode == HTTP_OK) {
String temp = EntityUtils.toString(entity);
ListKeys keyList = ListKeys.parse(temp);
for (KeyInfo info : keyList.getKeyList()) {
ozoneKeyList.add(new OzoneKey(info));
}
return ozoneKeyList;
} else {
throw OzoneException.parse(EntityUtils.toString(entity));
}
} finally {
if (entity != null) {
EntityUtils.consumeQuietly(entity);
}
}
}
/**
* used to fix the content-length issue with protocol class.
*/
private static class ContentLengthHeaderRemover implements private static class ContentLengthHeaderRemover implements
HttpRequestInterceptor { HttpRequestInterceptor {
@Override @Override
public void process(HttpRequest request, HttpContext context) public void process(HttpRequest request, HttpContext context)
throws HttpException, IOException { throws IOException {
// fighting org.apache.http.protocol
// .RequestContent's ProtocolException("Content-Length header
// already present");
request.removeHeaders(HTTP.CONTENT_LEN); request.removeHeaders(HTTP.CONTENT_LEN);
} }
} }

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.client;
import org.apache.hadoop.ozone.web.response.KeyInfo;
/**
* Client side representation of an ozone Key.
*/
public class OzoneKey {
private KeyInfo keyInfo;
/**
* Constructor for Ozone Key.
* @param keyInfo - Key Info
*/
public OzoneKey(KeyInfo keyInfo) {
this.keyInfo = keyInfo;
}
/**
* Returns Key Info.
* @return Object Info
*/
public KeyInfo getObjectInfo() {
return keyInfo;
}
}

View File

@ -49,6 +49,8 @@ public class LocalStorageHandler implements StorageHandler {
/** /**
* Constructs LocalStorageHandler. * Constructs LocalStorageHandler.
*
* @param conf ozone conf.
*/ */
public LocalStorageHandler(Configuration conf) { public LocalStorageHandler(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -285,7 +287,9 @@ public class LocalStorageHandler implements StorageHandler {
@Override @Override
public OutputStream newKeyWriter(KeyArgs args) throws IOException, public OutputStream newKeyWriter(KeyArgs args) throws IOException,
OzoneException { OzoneException {
return null; OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);
return oz.createKey(args);
} }
/** /**
@ -299,6 +303,9 @@ public class LocalStorageHandler implements StorageHandler {
@Override @Override
public void commitKey(KeyArgs args, OutputStream stream) throws public void commitKey(KeyArgs args, OutputStream stream) throws
IOException, OzoneException { IOException, OzoneException {
OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);
oz.commitKey(args, stream);
} }
@ -312,7 +319,9 @@ public class LocalStorageHandler implements StorageHandler {
@Override @Override
public LengthInputStream newKeyReader(KeyArgs args) throws IOException, public LengthInputStream newKeyReader(KeyArgs args) throws IOException,
OzoneException { OzoneException {
return null; OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);
return oz.newKeyReader(args);
} }
/** /**
@ -323,7 +332,9 @@ public class LocalStorageHandler implements StorageHandler {
*/ */
@Override @Override
public void deleteKey(KeyArgs args) throws IOException, OzoneException { public void deleteKey(KeyArgs args) throws IOException, OzoneException {
OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);
oz.deleteKey(args);
} }
/** /**
@ -335,7 +346,10 @@ public class LocalStorageHandler implements StorageHandler {
*/ */
@Override @Override
public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
return null; OzoneMetadataManager oz =
OzoneMetadataManager.getOzoneMetadataManager(conf);
return oz.listKeys(args);
} }
} }

View File

@ -17,13 +17,18 @@
*/ */
package org.apache.hadoop.ozone.web.localstorage; package org.apache.hadoop.ozone.web.localstorage;
import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.commons.logging.LogFactory; import org.apache.hadoop.ozone.web.handlers.ListArgs;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable; import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.BucketArgs; import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs; import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.request.OzoneAcl; import org.apache.hadoop.ozone.web.request.OzoneAcl;
@ -34,9 +39,13 @@ import org.apache.hadoop.ozone.web.response.VolumeInfo;
import org.apache.hadoop.ozone.web.response.VolumeOwner; import org.apache.hadoop.ozone.web.response.VolumeOwner;
import org.apache.hadoop.ozone.web.utils.OzoneConsts; import org.apache.hadoop.ozone.web.utils.OzoneConsts;
import org.iq80.leveldb.DBException; import org.iq80.leveldb.DBException;
import org.apache.commons.codec.digest.DigestUtils;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.io.FileOutputStream;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
@ -44,6 +53,7 @@ import java.util.List;
import java.util.ListIterator; import java.util.ListIterator;
import java.util.Locale; import java.util.Locale;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -111,7 +121,7 @@ public final class OzoneMetadataManager {
// // stand-alone tests for the protocol and client code. // // stand-alone tests for the protocol and client code.
*/ */
static final Log LOG = LogFactory.getLog(OzoneMetadataManager.class); static final Logger LOG = LoggerFactory.getLogger(OzoneMetadataManager.class);
private static final String USER_DB = "/user.db"; private static final String USER_DB = "/user.db";
private static final String META_DB = "/metadata.db"; private static final String META_DB = "/metadata.db";
private static OzoneMetadataManager bm = null; private static OzoneMetadataManager bm = null;
@ -119,28 +129,37 @@ public final class OzoneMetadataManager {
private OzoneLevelDBStore metadataDB; private OzoneLevelDBStore metadataDB;
private ReadWriteLock lock; private ReadWriteLock lock;
private Charset encoding = Charset.forName("UTF-8"); private Charset encoding = Charset.forName("UTF-8");
private String storageRoot;
private static final String OBJECT_DIR = "/_objects/";
// This table keeps a pointer to objects whose operations
// are in progress but not yet committed to persistent store
private ConcurrentHashMap<OutputStream, String> inProgressObjects;
/** /**
* Constructs OzoneMetadataManager. * Constructs OzoneMetadataManager.
*/ */
private OzoneMetadataManager(Configuration conf) { private OzoneMetadataManager(Configuration conf) throws IOException {
lock = new ReentrantReadWriteLock(); lock = new ReentrantReadWriteLock();
String storageRoot = storageRoot =
conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT); OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
File file = new File(storageRoot); File file = new File(storageRoot + OBJECT_DIR);
if (!file.exists() && !file.mkdirs()) { if (!file.exists() && !file.mkdirs()) {
LOG.fatal("Creation of Ozone root failed. " + file.toString()); LOG.error("Creation of Ozone root failed. " + file.toString());
throw new IOException("Creation of Ozone root failed.");
} }
try { try {
userDB = new OzoneLevelDBStore(new File(storageRoot + USER_DB), true); userDB = new OzoneLevelDBStore(new File(storageRoot + USER_DB), true);
metadataDB = new OzoneLevelDBStore(new File(storageRoot + META_DB), true); metadataDB = new OzoneLevelDBStore(new File(storageRoot + META_DB), true);
inProgressObjects = new ConcurrentHashMap<>();
} catch (IOException ex) { } catch (IOException ex) {
LOG.fatal("Cannot open db :" + ex.getMessage()); LOG.error("Cannot open db :" + ex.getMessage());
throw ex;
} }
} }
@ -150,7 +169,7 @@ public final class OzoneMetadataManager {
* @return OzoneMetadataManager * @return OzoneMetadataManager
*/ */
public static synchronized OzoneMetadataManager public static synchronized OzoneMetadataManager
getOzoneMetadataManager(Configuration conf) { getOzoneMetadataManager(Configuration conf) throws IOException {
if (bm == null) { if (bm == null) {
bm = new OzoneMetadataManager(conf); bm = new OzoneMetadataManager(conf);
} }
@ -440,8 +459,8 @@ public final class OzoneMetadataManager {
if (args.getRemoveAcls() != null) { if (args.getRemoveAcls() != null) {
OzoneException ex = ErrorTable.newError(ErrorTable.MALFORMED_ACL, args); OzoneException ex = ErrorTable.newError(ErrorTable.MALFORMED_ACL, args);
ex.setMessage("Remove ACLs specified in bucket create. Please remove " + ex.setMessage("Remove ACLs specified in bucket create. Please remove "
"them and retry."); + "them and retry.");
throw ex; throw ex;
} }
@ -680,6 +699,249 @@ public final class OzoneMetadataManager {
} }
} }
/**
* Creates a key and returns a stream to which this key can be written to.
* @param args KeyArgs
* @return - A stream into which key can be written to.
* @throws OzoneException
*/
public OutputStream createKey(KeyArgs args) throws OzoneException {
lock.writeLock().lock();
try {
String fileNameHash = DigestUtils.sha256Hex(args.getResourceName());
// Please don't try trillion objects unless the physical file system
// is capable of doing that in a single directory.
String fullPath = storageRoot + OBJECT_DIR + fileNameHash;
File f = new File(fullPath);
// In real ozone it would not be this way, a file will be overwritten
// only if the upload is successful.
if (f.exists()) {
LOG.debug("we are overwriting a file. This is by design.");
if(!f.delete()) {
LOG.error("Unable to delete the file: {}", fullPath);
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args);
}
}
// f.createNewFile();
FileOutputStream fsStream = new FileOutputStream(f);
inProgressObjects.put(fsStream, fullPath);
return fsStream;
} catch (IOException e) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e);
} finally {
lock.writeLock().unlock();
}
}
/**
* commit keys moves an In progress object into the metadata store
* so that key is visible in the metadata operations from that point
* onwards.
*
* @param args Object args
*
* @throws OzoneException
*/
public void commitKey(KeyArgs args, OutputStream stream)
throws OzoneException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
lock.writeLock().lock();
try {
byte[] bucketInfo = metadataDB.get(args.getParentName()
.getBytes(encoding));
if (bucketInfo == null) {
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
}
BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding));
bInfo.setKeyCount(bInfo.getKeyCount() + 1);
String fileNameHash = inProgressObjects.get(stream);
inProgressObjects.remove(stream);
if (fileNameHash == null) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args);
}
ListKeys keyList;
byte[] bucketListBytes = userDB.get(args.getParentName()
.getBytes(encoding));
if (bucketListBytes == null) {
keyList = new ListKeys();
} else {
keyList = ListKeys.parse(new String(bucketListBytes, encoding));
}
KeyInfo keyInfo;
byte[] objectBytes = metadataDB.get(args.getResourceName()
.getBytes(encoding));
if (objectBytes != null) {
// we are overwriting an existing object.
// TODO : Emit info for Accounting
keyInfo = KeyInfo.parse(new String(objectBytes, encoding));
keyList.getKeyList().remove(keyInfo);
} else {
keyInfo = new KeyInfo();
}
keyInfo.setCreatedOn(format.format(new Date(System.currentTimeMillis())));
// TODO : support version, we need to check if versioning
// is switched on the bucket and make appropriate calls.
keyInfo.setVersion(0);
keyInfo.setDataFileName(fileNameHash);
keyInfo.setKeyName(args.getKeyName());
keyInfo.setMd5hash(args.getHash());
keyInfo.setSize(args.getSize());
keyList.getKeyList().add(keyInfo);
// if the key exists, we overwrite happily :). since the
// earlier call - createObject - has overwritten the data.
metadataDB.put(args.getResourceName().getBytes(encoding),
keyInfo.toDBString().getBytes(encoding));
metadataDB.put(args.getParentName().getBytes(encoding),
bInfo.toDBString().getBytes(encoding));
userDB.put(args.getParentName().getBytes(encoding),
keyList.toDBString().getBytes(encoding));
} catch (IOException e) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e);
} finally {
lock.writeLock().unlock();
}
}
/**
* deletes an key from a given bucket.
*
* @param args - ObjectArgs
*
* @throws OzoneException
*/
public void deleteKey(KeyArgs args) throws OzoneException {
lock.writeLock().lock();
try {
byte[] bucketInfo = metadataDB.get(args.getParentName()
.getBytes(encoding));
if (bucketInfo == null) {
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
}
BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding));
bInfo.setKeyCount(bInfo.getKeyCount() - 1);
byte[] bucketListBytes = userDB.get(args.getParentName()
.getBytes(encoding));
if (bucketListBytes == null) {
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
}
ListKeys keyList = ListKeys.parse(new String(bucketListBytes, encoding));
byte[] objectBytes = metadataDB.get(args.getResourceName()
.getBytes(encoding));
if (objectBytes == null) {
throw ErrorTable.newError(ErrorTable.INVALID_KEY, args);
}
KeyInfo oInfo = KeyInfo.parse(new String(objectBytes, encoding));
keyList.getKeyList().remove(oInfo);
String fileNameHash = DigestUtils.sha256Hex(args.getResourceName());
String fullPath = storageRoot + OBJECT_DIR + fileNameHash;
File f = new File(fullPath);
if (f.exists()) {
if(!f.delete()) {
throw ErrorTable.newError(ErrorTable.KEY_OPERATION_CONFLICT, args);
}
} else {
throw ErrorTable.newError(ErrorTable.INVALID_KEY, args);
}
metadataDB.delete(args.getResourceName().getBytes(encoding));
metadataDB.put(args.getParentName().getBytes(encoding),
bInfo.toDBString().getBytes(encoding));
userDB.put(args.getParentName().getBytes(encoding),
keyList.toDBString().getBytes(encoding));
} catch (IOException e) {
throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e);
} finally {
lock.writeLock().unlock();
}
}
/**
* Returns a Stream for the file.
*
* @param args - Object args
*
* @return Stream
*
* @throws IOException
* @throws OzoneException
*/
public LengthInputStream newKeyReader(KeyArgs args)
throws IOException, OzoneException {
lock.readLock().lock();
try {
String fileNameHash = DigestUtils.sha256Hex(args.getResourceName());
String fullPath = storageRoot + OBJECT_DIR + fileNameHash;
File f = new File(fullPath);
if (!f.exists()) {
throw ErrorTable.newError(ErrorTable.INVALID_KEY, args);
}
long size = f.length();
FileInputStream fileStream = new FileInputStream(f);
return new LengthInputStream(fileStream, size);
} finally {
lock.readLock().unlock();
}
}
/**
* Returns keys in a bucket.
* @param args
* @return List of keys.
* @throws IOException
* @throws OzoneException
*/
public ListKeys listKeys(ListArgs args) throws IOException, OzoneException {
lock.readLock().lock();
try {
byte[] bucketInfo = metadataDB.get(args.getResourceName()
.getBytes(encoding));
if (bucketInfo == null) {
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
}
byte[] bucketListBytes = userDB.get(args.getResourceName()
.getBytes(encoding));
if (bucketListBytes == null) {
throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args);
}
return ListKeys.parse(new String(bucketListBytes, encoding));
} finally {
lock.readLock().unlock();
}
}
/** /**
* This is used in updates to volume metadata. * This is used in updates to volume metadata.
*/ */

View File

@ -41,13 +41,13 @@ public class ListKeys {
private String prefix; private String prefix;
private long maxKeys; private long maxKeys;
private boolean truncated; private boolean truncated;
private List<KeyInfo> objectList; private List<KeyInfo> keyList;
/** /**
* Default constructor needed for json serialization. * Default constructor needed for json serialization.
*/ */
public ListKeys() { public ListKeys() {
this.objectList = new LinkedList<>(); this.keyList = new LinkedList<>();
} }
/** /**
@ -65,9 +65,9 @@ public class ListKeys {
/** /**
* Converts a Json string to POJO. * Converts a Json string to POJO.
* @param jsonString * @param jsonString - json string.
* @return ListObject * @return ListObject
* @throws IOException * @throws IOException - Json conversion error.
*/ */
public static ListKeys parse(String jsonString) throws IOException { public static ListKeys parse(String jsonString) throws IOException {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
@ -79,17 +79,17 @@ public class ListKeys {
* *
* @return List of KeyInfo Objects. * @return List of KeyInfo Objects.
*/ */
public List<KeyInfo> getObjectList() { public List<KeyInfo> getKeyList() {
return objectList; return keyList;
} }
/** /**
* Sets the list of Objects. * Sets the list of Objects.
* *
* @param objectList * @param objectList - List of Keys
*/ */
public void setObjectList(List<KeyInfo> objectList) { public void setKeyList(List<KeyInfo> objectList) {
this.objectList = objectList; this.keyList = objectList;
} }
/** /**
@ -142,6 +142,7 @@ public class ListKeys {
* keyCount. * keyCount.
* *
* @return String * @return String
* @throws IOException - On json Errors.
*/ */
public String toJsonString() throws IOException { public String toJsonString() throws IOException {
String[] ignorableFieldNames = {"dataFileName"}; String[] ignorableFieldNames = {"dataFileName"};
@ -159,6 +160,9 @@ public class ListKeys {
/** /**
* Returns the Object as a Json String. * Returns the Object as a Json String.
*
* @return String
* @throws IOException - on json errors.
*/ */
public String toDBString() throws IOException { public String toDBString() throws IOException {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
@ -170,7 +174,7 @@ public class ListKeys {
* list of keys. * list of keys.
*/ */
public void sort() { public void sort() {
Collections.sort(objectList); Collections.sort(keyList);
} }
/** /**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.web.utils; package org.apache.hadoop.ozone.web.utils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable; import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.handlers.UserArgs; import org.apache.hadoop.ozone.web.handlers.UserArgs;
@ -27,7 +28,7 @@ import org.apache.hadoop.ozone.web.headers.Header;
import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Request; import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.InputStream; import javax.ws.rs.core.MediaType;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -45,7 +46,8 @@ import java.util.UUID;
@InterfaceAudience.Private @InterfaceAudience.Private
public final class OzoneUtils { public final class OzoneUtils {
public static final Charset ENCODING = Charset.forName("UTF-8"); public static final String ENCODING_NAME = "UTF-8";
public static final Charset ENCODING = Charset.forName(ENCODING_NAME);
private OzoneUtils() { private OzoneUtils() {
// Never constructed // Never constructed
@ -256,15 +258,17 @@ public final class OzoneUtils {
* @return JAX-RS Response * @return JAX-RS Response
*/ */
public static Response getResponse(UserArgs args, int statusCode, public static Response getResponse(UserArgs args, int statusCode,
InputStream stream) { LengthInputStream stream) {
SimpleDateFormat format = SimpleDateFormat format =
new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US); new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
format.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE)); format.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
String date = format.format(new Date(System.currentTimeMillis())); String date = format.format(new Date(System.currentTimeMillis()));
return Response.ok(stream) return Response.ok(stream, MediaType.APPLICATION_OCTET_STREAM)
.header(Header.OZONE_SERVER_NAME, args.getHostName()) .header(Header.OZONE_SERVER_NAME, args.getHostName())
.header(Header.OZONE_REQUEST_ID, args.getRequestID()) .header(Header.OZONE_REQUEST_ID, args.getRequestID())
.header(HttpHeaders.DATE, date).status(statusCode) .header(HttpHeaders.DATE, date).status(statusCode)
.header(HttpHeaders.CONTENT_TYPE, "application/octet-stream").build(); .header(HttpHeaders.CONTENT_LENGTH, stream.getLength())
.build();
} }
} }

View File

@ -59,7 +59,7 @@ public class TestBuckets {
OzoneConfiguration conf = new OzoneConfiguration(); OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource(""); URL p = conf.getClass().getResource("");
String path = p.getPath(); String path = p.getPath().concat(TestBuckets.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT); OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);

View File

@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.client;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Random;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
public class TestKeys {
static MiniDFSCluster cluster = null;
static int port = 0;
static private String path;
private static OzoneClient client = null;
/**
* Create a MiniDFSCluster for testing.
*
* Ozone is made active by setting DFS_OBJECTSTORE_ENABLED_KEY = true and
* DFS_STORAGE_HANDLER_TYPE_KEY = "local" , which uses a local
* directory to emulate Ozone backend.
*
* @throws IOException
*/
@BeforeClass
public static void init()
throws IOException, OzoneException, URISyntaxException {
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
path = p.getPath().concat(TestKeys.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT,
OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path);
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true);
conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local");
conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true);
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();
client = new OzoneClient(String.format("http://localhost:%d", port));
}
/**
* shutdown MiniDFSCluster
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Creates a file with Random Data
*
* @return File.
*/
private File createRandomDataFile(String fileName, long size) {
File tmpDir = new File(path);
tmpDir.mkdirs();
File tmpFile = new File(path + "/" + fileName);
try {
FileOutputStream randFile = new FileOutputStream(tmpFile);
Random r = new Random();
for (int x = 0; x < size; x++) {
char c = (char) (r.nextInt(26) + 'a');
randFile.write(c);
}
} catch (IOException e) {
fail(e.getMessage());
}
return tmpFile;
}
private class PutHelper {
OzoneVolume vol;
OzoneBucket bucket;
File file;
public OzoneVolume getVol() {
return vol;
}
public OzoneBucket getBucket() {
return bucket;
}
public File getFile() {
return file;
}
/**
* This function is reused in all other tests.
*
* @return Returns the name of the new key that was created.
* @throws OzoneException
*/
private String putKey() throws
OzoneException {
String volumeName = OzoneUtils.getRequestID().toLowerCase();
client.setUserAuth("hdfs");
vol = client.createVolume(volumeName, "bilbo", "100TB");
String[] acls = {"user:frodo:rw", "user:samwise:rw"};
String bucketName = OzoneUtils.getRequestID().toLowerCase();
bucket = vol.createBucket(bucketName, acls, StorageType.DEFAULT);
String keyName = OzoneUtils.getRequestID().toLowerCase();
file = createRandomDataFile(keyName, 1024);
bucket.putKey(keyName, file);
return keyName;
}
}
@Test
public void testPutKey() throws OzoneException {
PutHelper helper = new PutHelper();
helper.putKey();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
}
@Test
public void testPutAndGetKey() throws OzoneException, IOException {
PutHelper helper = new PutHelper();
String keyName = helper.putKey();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
String newFileName = path + "/" +OzoneUtils.getRequestID().toLowerCase();
Path newPath = Paths.get(newFileName);
helper.getBucket().getKey(keyName, newPath);
FileInputStream original = new FileInputStream(helper.getFile());
FileInputStream downloaded = new FileInputStream(newPath.toFile());
String originalHash = DigestUtils.sha256Hex(original);
String downloadedHash = DigestUtils.sha256Hex(downloaded);
assertEquals(
"Sha256 does not match between original file and downloaded file.",
originalHash, downloadedHash);
}
@Test
public void testPutAndDeleteKey() throws OzoneException, IOException {
PutHelper helper = new PutHelper();
String keyName = helper.putKey();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
helper.getBucket().deleteKey(keyName);
try {
helper.getBucket().getKey(keyName);
fail("Get Key on a deleted key should have thrown");
} catch (OzoneException ex) {
assertEquals(ex.getShortMessage(),
ErrorTable.INVALID_KEY.getShortMessage());
}
}
@Test
public void testPutAndListKey() throws OzoneException, IOException {
PutHelper helper = new PutHelper();
helper.putKey();
assertNotNull(helper.getBucket());
assertNotNull(helper.getFile());
for (int x = 0; x < 10; x++) {
String newkeyName = OzoneUtils.getRequestID().toLowerCase();
helper.getBucket().putKey(newkeyName, helper.getFile());
}
List<OzoneKey> keyList = helper.getBucket().listKeys();
Assert.assertEquals(keyList.size(), 11);
}
}