From 2e517a64d82c2b73627b7f7b1d9b348d0b872fe5 Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Sat, 19 Mar 2016 13:36:18 -0700 Subject: [PATCH] HDFS-9960. OzoneHandler : Add localstorage support for keys. Contributed by Anu Engineer. --- .../ozone/web/ObjectStoreApplication.java | 4 + .../hadoop/ozone/web/client/OzoneBucket.java | 412 +++++++++++++++++- .../hadoop/ozone/web/client/OzoneKey.java | 44 ++ .../web/localstorage/LocalStorageHandler.java | 22 +- .../localstorage/OzoneMetadataManager.java | 284 +++++++++++- .../hadoop/ozone/web/response/ListKeys.java | 24 +- .../hadoop/ozone/web/utils/OzoneUtils.java | 14 +- .../hadoop/ozone/web/client/TestBuckets.java | 2 +- .../hadoop/ozone/web/client/TestKeys.java | 236 ++++++++++ 9 files changed, 1004 insertions(+), 38 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneKey.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java index 0c63c11c8b9..26aa45fbb82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/ObjectStoreApplication.java @@ -20,8 +20,10 @@ package org.apache.hadoop.ozone.web; import org.apache.hadoop.ozone.web.exceptions.OzoneExceptionMapper; import org.apache.hadoop.ozone.web.handlers.BucketHandler; +import org.apache.hadoop.ozone.web.handlers.KeyHandler; import org.apache.hadoop.ozone.web.handlers.ServiceFilter; import org.apache.hadoop.ozone.web.handlers.VolumeHandler; +import org.apache.hadoop.ozone.web.messages.LengthInputStreamMessageBodyWriter; import org.apache.hadoop.ozone.web.messages.StringMessageBodyWriter; import javax.ws.rs.core.Application; @@ -41,7 +43,9 @@ public class ObjectStoreApplication extends Application { HashSet> set = new HashSet<>(); set.add(BucketHandler.class); set.add(VolumeHandler.class); + set.add(KeyHandler.class); set.add(OzoneExceptionMapper.class); + set.add(LengthInputStreamMessageBodyWriter.class); set.add(StringMessageBodyWriter.class); return set; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java index 217e1c13420..248abab79ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneBucket.java @@ -18,19 +18,50 @@ package org.apache.hadoop.ozone.web.client; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.headers.Header; import org.apache.hadoop.ozone.web.request.OzoneAcl; import org.apache.hadoop.ozone.web.response.BucketInfo; +import org.apache.hadoop.ozone.web.response.KeyInfo; +import org.apache.hadoop.ozone.web.response.ListKeys; import org.apache.hadoop.ozone.web.utils.OzoneConsts; -import org.apache.http.HttpException; +import org.apache.http.HttpEntity; import org.apache.http.HttpRequest; import org.apache.http.HttpRequestInterceptor; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.FileEntity; +import org.apache.http.entity.InputStreamEntity; +import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; +import javax.ws.rs.core.HttpHeaders; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.util.LinkedList; import java.util.List; +import static java.net.HttpURLConnection.HTTP_CREATED; +import static java.net.HttpURLConnection.HTTP_OK; +import static org.apache.hadoop.ozone.web.utils.OzoneUtils.ENCODING; +import static org.apache.hadoop.ozone.web.utils.OzoneUtils.ENCODING_NAME; + /** * A Bucket class the represents an Ozone Bucket. */ @@ -109,19 +140,386 @@ public class OzoneBucket { * * @return Storage Class Enum */ - public StorageType getStorageClass() { + public StorageType getStorageType() { return bucketInfo.getStorageType(); } + /** + * Puts an Object in Ozone bucket. + * + * @param keyName - Name of the key + * @param data - Data that you want to put + * @throws OzoneException + */ + public void putKey(String keyName, String data) throws OzoneException { + if ((keyName == null) || keyName.isEmpty()) { + throw new OzoneClientException("Invalid key Name."); + } + + if (data == null) { + throw new OzoneClientException("Invalid data."); + } + + try { + OzoneClient client = getVolume().getClient(); + + DefaultHttpClient httpClient = new DefaultHttpClient(); + + URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); + builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() + + "/" + keyName).build(); + + HttpPut putRequest = + getVolume().getClient().getHttpPut(builder.toString()); + + InputStream is = new ByteArrayInputStream(data.getBytes(ENCODING)); + putRequest.setEntity(new InputStreamEntity(is, data.length())); + putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(is)); + putRequest + .setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(data.length())); + executePutKey(putRequest, httpClient); + + } catch (IOException | URISyntaxException ex) { + throw new OzoneClientException(ex.getMessage()); + } + } + + /** + * Puts an Object in Ozone Bucket. + * + * @param dataFile - File from which you want the data to be put. Key Name + * will same as the file name, devoid of any path. + * @throws OzoneException + */ + public void putKey(File dataFile) throws OzoneException { + if (dataFile == null) { + throw new OzoneClientException("Invalid file object."); + } + String keyName = dataFile.getName(); + putKey(keyName, dataFile); + } + + /** + * Puts a Key in Ozone Bucket. + * + * @param keyName - Name of the Key + * @param file - Stream that gets read to be put into Ozone. + * @throws OzoneException + */ + public void putKey(String keyName, File file) + throws OzoneException { + + if ((keyName == null) || keyName.isEmpty()) { + throw new OzoneClientException("Invalid key Name"); + } + + if (file == null) { + throw new OzoneClientException("Invalid data stream"); + } + + try { + OzoneClient client = getVolume().getClient(); + + DefaultHttpClient httpClient = new DefaultHttpClient(); + URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); + builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() + + "/" + keyName).build(); + + HttpPut putRequest = + getVolume().getClient().getHttpPut(builder.toString()); + + FileEntity fileEntity = new FileEntity(file, ContentType + .APPLICATION_OCTET_STREAM); + putRequest.setEntity(fileEntity); + + FileInputStream fis = new FileInputStream(file); + putRequest.setHeader(Header.CONTENT_MD5, DigestUtils.md5Hex(fis)); + fis.close(); + putRequest.setHeader(HttpHeaders.CONTENT_LENGTH, Long.toString(file + .length())); + httpClient.removeRequestInterceptorByClass( + org.apache.http.protocol.RequestContent.class); + executePutKey(putRequest, httpClient); + + } catch (IOException | URISyntaxException ex) { + throw new OzoneClientException(ex.getMessage()); + } + } + + /** + * executePutKey executes the Put request against the Ozone Server. + * + * @param putRequest - Http Put Request + * @param httpClient - httpClient + * @throws OzoneException + * @throws IOException + */ + private void executePutKey(HttpPut putRequest, DefaultHttpClient httpClient) + throws OzoneException, IOException { + HttpEntity entity = null; + try { + + HttpResponse response = httpClient.execute(putRequest); + int errorCode = response.getStatusLine().getStatusCode(); + entity = response.getEntity(); + + if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) { + return; + } + + if (entity == null) { + throw new OzoneClientException("Unexpected null in http payload"); + } + + throw OzoneException.parse(EntityUtils.toString(entity)); + } finally { + if (entity != null) { + EntityUtils.consumeQuietly(entity); + } + } + } + + /** + * Gets a key from the Ozone server and writes to the file pointed by the + * downloadTo PAth. + * + * @param keyName - Key Name in Ozone. + * @param downloadTo File Name to download the Key's Data to + */ + public void getKey(String keyName, Path downloadTo) throws OzoneException { + + if ((keyName == null) || keyName.isEmpty()) { + throw new OzoneClientException("Invalid key Name"); + } + + if (downloadTo == null) { + throw new OzoneClientException("Invalid download path"); + } + + try { + OzoneClient client = getVolume().getClient(); + + DefaultHttpClient httpClient = new DefaultHttpClient(); + FileOutputStream outPutFile = new FileOutputStream(downloadTo.toFile()); + + URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); + builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() + + "/" + keyName).build(); + + HttpGet getRequest = + getVolume().getClient().getHttpGet(builder.toString()); + executeGetKey(getRequest, httpClient, outPutFile); + outPutFile.flush(); + outPutFile.close(); + } catch (IOException | URISyntaxException ex) { + throw new OzoneClientException(ex.getMessage()); + } + } + + /** + * Returns the data part of the key as a string. + * + * @param keyName - KeyName to get + * @return String - Data + * @throws OzoneException + */ + public String getKey(String keyName) throws OzoneException { + + if ((keyName == null) || keyName.isEmpty()) { + throw new OzoneClientException("Invalid key Name"); + } + + try { + OzoneClient client = getVolume().getClient(); + ByteArrayOutputStream outPutStream = new ByteArrayOutputStream(); + + DefaultHttpClient httpClient = new DefaultHttpClient(); + URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); + + builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() + + "/" + keyName).build(); + + HttpGet getRequest = + getVolume().getClient().getHttpGet(builder.toString()); + executeGetKey(getRequest, httpClient, outPutStream); + return outPutStream.toString(ENCODING_NAME); + } catch (IOException | URISyntaxException ex) { + throw new OzoneClientException(ex.getMessage()); + } + + } + + /** + * Executes get key and returns the data. + * + * @param getRequest - http Get Request + * @param httpClient - Client + * @param stream - Stream to write data to. + * @throws IOException + * @throws OzoneException + */ + private void executeGetKey(HttpGet getRequest, DefaultHttpClient httpClient, + OutputStream stream) + throws IOException, OzoneException { + + HttpEntity entity = null; + try { + + HttpResponse response = httpClient.execute(getRequest); + int errorCode = response.getStatusLine().getStatusCode(); + entity = response.getEntity(); + + if (errorCode == HTTP_OK) { + entity.writeTo(stream); + return; + } + + if (entity == null) { + throw new OzoneClientException("Unexpected null in http payload"); + } + + throw OzoneException.parse(EntityUtils.toString(entity)); + } finally { + if (entity != null) { + EntityUtils.consumeQuietly(entity); + } + } + } + + /** + * Deletes a key in this bucket. + * + * @param keyName - Name of the Key + * @throws OzoneException + */ + public void deleteKey(String keyName) throws OzoneException { + + if ((keyName == null) || keyName.isEmpty()) { + throw new OzoneClientException("Invalid key Name"); + } + + try { + OzoneClient client = getVolume().getClient(); + DefaultHttpClient httpClient = new DefaultHttpClient(); + + URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); + builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName() + + "/" + keyName).build(); + + HttpDelete deleteRequest = + getVolume().getClient().getHttpDelete(builder.toString()); + executeDeleteKey(deleteRequest, httpClient); + } catch (IOException | URISyntaxException ex) { + throw new OzoneClientException(ex.getMessage()); + } + } + + /** + * Executes deleteKey. + * + * @param deleteRequest - http Delete Request + * @param httpClient - Client + * @throws IOException + * @throws OzoneException + */ + private void executeDeleteKey(HttpDelete deleteRequest, + DefaultHttpClient httpClient) + throws IOException, OzoneException { + + HttpEntity entity = null; + try { + + HttpResponse response = httpClient.execute(deleteRequest); + int errorCode = response.getStatusLine().getStatusCode(); + entity = response.getEntity(); + + if (errorCode == HTTP_OK) { + return; + } + + if (entity == null) { + throw new OzoneClientException("Unexpected null in http payload"); + } + + throw OzoneException.parse(EntityUtils.toString(entity)); + } finally { + if (entity != null) { + EntityUtils.consumeQuietly(entity); + } + } + } + + /** + * List all keys in a bucket. + * + * @return List of OzoneKeys + */ + public List listKeys() throws OzoneException { + try { + OzoneClient client = getVolume().getClient(); + DefaultHttpClient httpClient = new DefaultHttpClient(); + URIBuilder builder = new URIBuilder(volume.getClient().getEndPointURI()); + builder.setPath("/" + getVolume().getVolumeName() + "/" + getBucketName()) + .build(); + + HttpGet getRequest = client.getHttpGet(builder.toString()); + return executeListKeys(getRequest, httpClient); + + } catch (IOException | URISyntaxException e) { + throw new OzoneClientException(e.getMessage()); + } + } + + /** + * Execute list Key. + * + * @param getRequest - HttpGet + * @param httpClient - HttpClient + * @return List + * @throws IOException + * @throws OzoneException + */ + private List executeListKeys(HttpGet getRequest, + DefaultHttpClient httpClient) + throws IOException, OzoneException { + HttpEntity entity = null; + List ozoneKeyList = new LinkedList(); + try { + HttpResponse response = httpClient.execute(getRequest); + int errorCode = response.getStatusLine().getStatusCode(); + + entity = response.getEntity(); + + if (entity == null) { + throw new OzoneClientException("Unexpected null in http payload"); + } + if (errorCode == HTTP_OK) { + String temp = EntityUtils.toString(entity); + ListKeys keyList = ListKeys.parse(temp); + + for (KeyInfo info : keyList.getKeyList()) { + ozoneKeyList.add(new OzoneKey(info)); + } + return ozoneKeyList; + + } else { + throw OzoneException.parse(EntityUtils.toString(entity)); + } + } finally { + if (entity != null) { + EntityUtils.consumeQuietly(entity); + } + } + } + + /** + * used to fix the content-length issue with protocol class. + */ private static class ContentLengthHeaderRemover implements HttpRequestInterceptor { @Override public void process(HttpRequest request, HttpContext context) - throws HttpException, IOException { - - // fighting org.apache.http.protocol - // .RequestContent's ProtocolException("Content-Length header - // already present"); + throws IOException { request.removeHeaders(HTTP.CONTENT_LEN); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneKey.java new file mode 100644 index 00000000000..5a3a0c4e788 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/client/OzoneKey.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.web.client; + +import org.apache.hadoop.ozone.web.response.KeyInfo; + +/** + * Client side representation of an ozone Key. + */ +public class OzoneKey { + private KeyInfo keyInfo; + + /** + * Constructor for Ozone Key. + * @param keyInfo - Key Info + */ + public OzoneKey(KeyInfo keyInfo) { + this.keyInfo = keyInfo; + } + + /** + * Returns Key Info. + * @return Object Info + */ + public KeyInfo getObjectInfo() { + return keyInfo; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java index edba6881ef4..d9a06d1434e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/LocalStorageHandler.java @@ -49,6 +49,8 @@ public class LocalStorageHandler implements StorageHandler { /** * Constructs LocalStorageHandler. + * + * @param conf ozone conf. */ public LocalStorageHandler(Configuration conf) { this.conf = conf; @@ -285,7 +287,9 @@ public class LocalStorageHandler implements StorageHandler { @Override public OutputStream newKeyWriter(KeyArgs args) throws IOException, OzoneException { - return null; + OzoneMetadataManager oz = + OzoneMetadataManager.getOzoneMetadataManager(conf); + return oz.createKey(args); } /** @@ -299,6 +303,9 @@ public class LocalStorageHandler implements StorageHandler { @Override public void commitKey(KeyArgs args, OutputStream stream) throws IOException, OzoneException { + OzoneMetadataManager oz = + OzoneMetadataManager.getOzoneMetadataManager(conf); + oz.commitKey(args, stream); } @@ -312,7 +319,9 @@ public class LocalStorageHandler implements StorageHandler { @Override public LengthInputStream newKeyReader(KeyArgs args) throws IOException, OzoneException { - return null; + OzoneMetadataManager oz = + OzoneMetadataManager.getOzoneMetadataManager(conf); + return oz.newKeyReader(args); } /** @@ -323,7 +332,9 @@ public class LocalStorageHandler implements StorageHandler { */ @Override public void deleteKey(KeyArgs args) throws IOException, OzoneException { - + OzoneMetadataManager oz = + OzoneMetadataManager.getOzoneMetadataManager(conf); + oz.deleteKey(args); } /** @@ -335,7 +346,10 @@ public class LocalStorageHandler implements StorageHandler { */ @Override public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { - return null; + OzoneMetadataManager oz = + OzoneMetadataManager.getOzoneMetadataManager(conf); + return oz.listKeys(args); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java index db9903c98db..8ab679aa158 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/localstorage/OzoneMetadataManager.java @@ -17,13 +17,18 @@ */ package org.apache.hadoop.ozone.web.localstorage; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.ozone.web.handlers.ListArgs; +import org.apache.hadoop.ozone.web.response.KeyInfo; +import org.apache.hadoop.ozone.web.response.ListKeys; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.web.exceptions.ErrorTable; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.handlers.BucketArgs; +import org.apache.hadoop.ozone.web.handlers.KeyArgs; import org.apache.hadoop.ozone.web.handlers.UserArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.request.OzoneAcl; @@ -34,9 +39,13 @@ import org.apache.hadoop.ozone.web.response.VolumeInfo; import org.apache.hadoop.ozone.web.response.VolumeOwner; import org.apache.hadoop.ozone.web.utils.OzoneConsts; import org.iq80.leveldb.DBException; +import org.apache.commons.codec.digest.DigestUtils; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.io.OutputStream; +import java.io.FileOutputStream; import java.nio.charset.Charset; import java.text.SimpleDateFormat; import java.util.Date; @@ -44,6 +53,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Locale; import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -111,7 +121,7 @@ public final class OzoneMetadataManager { // // stand-alone tests for the protocol and client code. */ - static final Log LOG = LogFactory.getLog(OzoneMetadataManager.class); + static final Logger LOG = LoggerFactory.getLogger(OzoneMetadataManager.class); private static final String USER_DB = "/user.db"; private static final String META_DB = "/metadata.db"; private static OzoneMetadataManager bm = null; @@ -119,28 +129,37 @@ public final class OzoneMetadataManager { private OzoneLevelDBStore metadataDB; private ReadWriteLock lock; private Charset encoding = Charset.forName("UTF-8"); + private String storageRoot; + private static final String OBJECT_DIR = "/_objects/"; + + // This table keeps a pointer to objects whose operations + // are in progress but not yet committed to persistent store + private ConcurrentHashMap inProgressObjects; /** * Constructs OzoneMetadataManager. */ - private OzoneMetadataManager(Configuration conf) { + private OzoneMetadataManager(Configuration conf) throws IOException { lock = new ReentrantReadWriteLock(); - String storageRoot = + storageRoot = conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT); - File file = new File(storageRoot); + File file = new File(storageRoot + OBJECT_DIR); if (!file.exists() && !file.mkdirs()) { - LOG.fatal("Creation of Ozone root failed. " + file.toString()); + LOG.error("Creation of Ozone root failed. " + file.toString()); + throw new IOException("Creation of Ozone root failed."); } try { userDB = new OzoneLevelDBStore(new File(storageRoot + USER_DB), true); metadataDB = new OzoneLevelDBStore(new File(storageRoot + META_DB), true); + inProgressObjects = new ConcurrentHashMap<>(); } catch (IOException ex) { - LOG.fatal("Cannot open db :" + ex.getMessage()); + LOG.error("Cannot open db :" + ex.getMessage()); + throw ex; } } @@ -150,7 +169,7 @@ public final class OzoneMetadataManager { * @return OzoneMetadataManager */ public static synchronized OzoneMetadataManager - getOzoneMetadataManager(Configuration conf) { + getOzoneMetadataManager(Configuration conf) throws IOException { if (bm == null) { bm = new OzoneMetadataManager(conf); } @@ -440,8 +459,8 @@ public final class OzoneMetadataManager { if (args.getRemoveAcls() != null) { OzoneException ex = ErrorTable.newError(ErrorTable.MALFORMED_ACL, args); - ex.setMessage("Remove ACLs specified in bucket create. Please remove " + - "them and retry."); + ex.setMessage("Remove ACLs specified in bucket create. Please remove " + + "them and retry."); throw ex; } @@ -680,6 +699,249 @@ public final class OzoneMetadataManager { } } + /** + * Creates a key and returns a stream to which this key can be written to. + * @param args KeyArgs + * @return - A stream into which key can be written to. + * @throws OzoneException + */ + public OutputStream createKey(KeyArgs args) throws OzoneException { + lock.writeLock().lock(); + try { + String fileNameHash = DigestUtils.sha256Hex(args.getResourceName()); + + // Please don't try trillion objects unless the physical file system + // is capable of doing that in a single directory. + + String fullPath = storageRoot + OBJECT_DIR + fileNameHash; + File f = new File(fullPath); + + // In real ozone it would not be this way, a file will be overwritten + // only if the upload is successful. + if (f.exists()) { + LOG.debug("we are overwriting a file. This is by design."); + if(!f.delete()) { + LOG.error("Unable to delete the file: {}", fullPath); + throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args); + } + } + + // f.createNewFile(); + FileOutputStream fsStream = new FileOutputStream(f); + inProgressObjects.put(fsStream, fullPath); + + return fsStream; + } catch (IOException e) { + throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e); + } finally { + lock.writeLock().unlock(); + } + } + + + /** + * commit keys moves an In progress object into the metadata store + * so that key is visible in the metadata operations from that point + * onwards. + * + * @param args Object args + * + * @throws OzoneException + */ + public void commitKey(KeyArgs args, OutputStream stream) + throws OzoneException { + SimpleDateFormat format = + new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US); + lock.writeLock().lock(); + + try { + byte[] bucketInfo = metadataDB.get(args.getParentName() + .getBytes(encoding)); + if (bucketInfo == null) { + throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); + } + BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding)); + bInfo.setKeyCount(bInfo.getKeyCount() + 1); + + String fileNameHash = inProgressObjects.get(stream); + inProgressObjects.remove(stream); + if (fileNameHash == null) { + throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args); + } + + ListKeys keyList; + byte[] bucketListBytes = userDB.get(args.getParentName() + .getBytes(encoding)); + if (bucketListBytes == null) { + keyList = new ListKeys(); + } else { + keyList = ListKeys.parse(new String(bucketListBytes, encoding)); + } + + KeyInfo keyInfo; + + byte[] objectBytes = metadataDB.get(args.getResourceName() + .getBytes(encoding)); + + if (objectBytes != null) { + // we are overwriting an existing object. + // TODO : Emit info for Accounting + keyInfo = KeyInfo.parse(new String(objectBytes, encoding)); + keyList.getKeyList().remove(keyInfo); + } else { + keyInfo = new KeyInfo(); + } + + keyInfo.setCreatedOn(format.format(new Date(System.currentTimeMillis()))); + + // TODO : support version, we need to check if versioning + // is switched on the bucket and make appropriate calls. + keyInfo.setVersion(0); + + keyInfo.setDataFileName(fileNameHash); + keyInfo.setKeyName(args.getKeyName()); + keyInfo.setMd5hash(args.getHash()); + keyInfo.setSize(args.getSize()); + + keyList.getKeyList().add(keyInfo); + + // if the key exists, we overwrite happily :). since the + // earlier call - createObject - has overwritten the data. + + metadataDB.put(args.getResourceName().getBytes(encoding), + keyInfo.toDBString().getBytes(encoding)); + + metadataDB.put(args.getParentName().getBytes(encoding), + bInfo.toDBString().getBytes(encoding)); + + userDB.put(args.getParentName().getBytes(encoding), + keyList.toDBString().getBytes(encoding)); + + } catch (IOException e) { + throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * deletes an key from a given bucket. + * + * @param args - ObjectArgs + * + * @throws OzoneException + */ + public void deleteKey(KeyArgs args) throws OzoneException { + lock.writeLock().lock(); + try { + byte[] bucketInfo = metadataDB.get(args.getParentName() + .getBytes(encoding)); + if (bucketInfo == null) { + throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); + } + BucketInfo bInfo = BucketInfo.parse(new String(bucketInfo, encoding)); + bInfo.setKeyCount(bInfo.getKeyCount() - 1); + + + byte[] bucketListBytes = userDB.get(args.getParentName() + .getBytes(encoding)); + if (bucketListBytes == null) { + throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); + } + ListKeys keyList = ListKeys.parse(new String(bucketListBytes, encoding)); + + + byte[] objectBytes = metadataDB.get(args.getResourceName() + .getBytes(encoding)); + if (objectBytes == null) { + throw ErrorTable.newError(ErrorTable.INVALID_KEY, args); + } + + KeyInfo oInfo = KeyInfo.parse(new String(objectBytes, encoding)); + keyList.getKeyList().remove(oInfo); + + String fileNameHash = DigestUtils.sha256Hex(args.getResourceName()); + + String fullPath = storageRoot + OBJECT_DIR + fileNameHash; + File f = new File(fullPath); + + if (f.exists()) { + if(!f.delete()) { + throw ErrorTable.newError(ErrorTable.KEY_OPERATION_CONFLICT, args); + } + } else { + throw ErrorTable.newError(ErrorTable.INVALID_KEY, args); + } + + + metadataDB.delete(args.getResourceName().getBytes(encoding)); + metadataDB.put(args.getParentName().getBytes(encoding), + bInfo.toDBString().getBytes(encoding)); + userDB.put(args.getParentName().getBytes(encoding), + keyList.toDBString().getBytes(encoding)); + } catch (IOException e) { + throw ErrorTable.newError(ErrorTable.SERVER_ERROR, args, e); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Returns a Stream for the file. + * + * @param args - Object args + * + * @return Stream + * + * @throws IOException + * @throws OzoneException + */ + public LengthInputStream newKeyReader(KeyArgs args) + throws IOException, OzoneException { + lock.readLock().lock(); + try { + String fileNameHash = DigestUtils.sha256Hex(args.getResourceName()); + String fullPath = storageRoot + OBJECT_DIR + fileNameHash; + File f = new File(fullPath); + if (!f.exists()) { + throw ErrorTable.newError(ErrorTable.INVALID_KEY, args); + } + long size = f.length(); + + FileInputStream fileStream = new FileInputStream(f); + return new LengthInputStream(fileStream, size); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Returns keys in a bucket. + * @param args + * @return List of keys. + * @throws IOException + * @throws OzoneException + */ + public ListKeys listKeys(ListArgs args) throws IOException, OzoneException { + lock.readLock().lock(); + try { + byte[] bucketInfo = metadataDB.get(args.getResourceName() + .getBytes(encoding)); + if (bucketInfo == null) { + throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); + } + + byte[] bucketListBytes = userDB.get(args.getResourceName() + .getBytes(encoding)); + if (bucketListBytes == null) { + throw ErrorTable.newError(ErrorTable.INVALID_BUCKET_NAME, args); + } + return ListKeys.parse(new String(bucketListBytes, encoding)); + } finally { + lock.readLock().unlock(); + } + } + /** * This is used in updates to volume metadata. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/response/ListKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/response/ListKeys.java index 8b0d9ccaaa8..d484e25c54e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/response/ListKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/response/ListKeys.java @@ -41,13 +41,13 @@ public class ListKeys { private String prefix; private long maxKeys; private boolean truncated; - private List objectList; + private List keyList; /** * Default constructor needed for json serialization. */ public ListKeys() { - this.objectList = new LinkedList<>(); + this.keyList = new LinkedList<>(); } /** @@ -65,9 +65,9 @@ public class ListKeys { /** * Converts a Json string to POJO. - * @param jsonString + * @param jsonString - json string. * @return ListObject - * @throws IOException + * @throws IOException - Json conversion error. */ public static ListKeys parse(String jsonString) throws IOException { ObjectMapper mapper = new ObjectMapper(); @@ -79,17 +79,17 @@ public class ListKeys { * * @return List of KeyInfo Objects. */ - public List getObjectList() { - return objectList; + public List getKeyList() { + return keyList; } /** * Sets the list of Objects. * - * @param objectList + * @param objectList - List of Keys */ - public void setObjectList(List objectList) { - this.objectList = objectList; + public void setKeyList(List objectList) { + this.keyList = objectList; } /** @@ -142,6 +142,7 @@ public class ListKeys { * keyCount. * * @return String + * @throws IOException - On json Errors. */ public String toJsonString() throws IOException { String[] ignorableFieldNames = {"dataFileName"}; @@ -159,6 +160,9 @@ public class ListKeys { /** * Returns the Object as a Json String. + * + * @return String + * @throws IOException - on json errors. */ public String toDBString() throws IOException { ObjectMapper mapper = new ObjectMapper(); @@ -170,7 +174,7 @@ public class ListKeys { * list of keys. */ public void sort() { - Collections.sort(objectList); + Collections.sort(keyList); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java index a8063901f62..8036770416d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/utils/OzoneUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.web.utils; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.ozone.web.exceptions.ErrorTable; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.handlers.UserArgs; @@ -27,7 +28,7 @@ import org.apache.hadoop.ozone.web.headers.Header; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Request; import javax.ws.rs.core.Response; -import java.io.InputStream; +import javax.ws.rs.core.MediaType; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.charset.Charset; @@ -45,7 +46,8 @@ import java.util.UUID; @InterfaceAudience.Private public final class OzoneUtils { - public static final Charset ENCODING = Charset.forName("UTF-8"); + public static final String ENCODING_NAME = "UTF-8"; + public static final Charset ENCODING = Charset.forName(ENCODING_NAME); private OzoneUtils() { // Never constructed @@ -256,15 +258,17 @@ public final class OzoneUtils { * @return JAX-RS Response */ public static Response getResponse(UserArgs args, int statusCode, - InputStream stream) { + LengthInputStream stream) { SimpleDateFormat format = new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US); format.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE)); String date = format.format(new Date(System.currentTimeMillis())); - return Response.ok(stream) + return Response.ok(stream, MediaType.APPLICATION_OCTET_STREAM) .header(Header.OZONE_SERVER_NAME, args.getHostName()) .header(Header.OZONE_REQUEST_ID, args.getRequestID()) .header(HttpHeaders.DATE, date).status(statusCode) - .header(HttpHeaders.CONTENT_TYPE, "application/octet-stream").build(); + .header(HttpHeaders.CONTENT_LENGTH, stream.getLength()) + .build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestBuckets.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestBuckets.java index 3cab268702c..796f11f3913 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestBuckets.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestBuckets.java @@ -59,7 +59,7 @@ public class TestBuckets { OzoneConfiguration conf = new OzoneConfiguration(); URL p = conf.getClass().getResource(""); - String path = p.getPath(); + String path = p.getPath().concat(TestBuckets.class.getSimpleName()); path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java new file mode 100644 index 00000000000..d025e1cf5a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.web.client; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.web.exceptions.ErrorTable; +import org.apache.hadoop.ozone.web.exceptions.OzoneException; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class TestKeys { + static MiniDFSCluster cluster = null; + static int port = 0; + static private String path; + private static OzoneClient client = null; + + + /** + * Create a MiniDFSCluster for testing. + * + * Ozone is made active by setting DFS_OBJECTSTORE_ENABLED_KEY = true and + * DFS_STORAGE_HANDLER_TYPE_KEY = "local" , which uses a local + * directory to emulate Ozone backend. + * + * @throws IOException + */ + @BeforeClass + public static void init() + throws IOException, OzoneException, URISyntaxException { + OzoneConfiguration conf = new OzoneConfiguration(); + + URL p = conf.getClass().getResource(""); + path = p.getPath().concat(TestKeys.class.getSimpleName()); + path += conf.getTrimmed(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, + OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT_DEFAULT); + + conf.set(OzoneConfigKeys.DFS_STORAGE_LOCAL_ROOT, path); + conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_ENABLED_KEY, true); + conf.set(OzoneConfigKeys.DFS_STORAGE_HANDLER_TYPE_KEY, "local"); + + conf.setBoolean(OzoneConfigKeys.DFS_OBJECTSTORE_TRACE_ENABLED_KEY, true); + Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG); + + + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DataNode dataNode = cluster.getDataNodes().get(0); + port = dataNode.getInfoPort(); + client = new OzoneClient(String.format("http://localhost:%d", port)); + } + + /** + * shutdown MiniDFSCluster + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Creates a file with Random Data + * + * @return File. + */ + private File createRandomDataFile(String fileName, long size) { + File tmpDir = new File(path); + tmpDir.mkdirs(); + File tmpFile = new File(path + "/" + fileName); + try { + FileOutputStream randFile = new FileOutputStream(tmpFile); + Random r = new Random(); + for (int x = 0; x < size; x++) { + char c = (char) (r.nextInt(26) + 'a'); + randFile.write(c); + } + + } catch (IOException e) { + fail(e.getMessage()); + } + return tmpFile; + } + + + private class PutHelper { + OzoneVolume vol; + OzoneBucket bucket; + File file; + + public OzoneVolume getVol() { + return vol; + } + + public OzoneBucket getBucket() { + return bucket; + } + + public File getFile() { + return file; + } + /** + * This function is reused in all other tests. + * + * @return Returns the name of the new key that was created. + * @throws OzoneException + */ + private String putKey() throws + OzoneException { + String volumeName = OzoneUtils.getRequestID().toLowerCase(); + client.setUserAuth("hdfs"); + + vol = client.createVolume(volumeName, "bilbo", "100TB"); + String[] acls = {"user:frodo:rw", "user:samwise:rw"}; + + String bucketName = OzoneUtils.getRequestID().toLowerCase(); + bucket = vol.createBucket(bucketName, acls, StorageType.DEFAULT); + + String keyName = OzoneUtils.getRequestID().toLowerCase(); + file = createRandomDataFile(keyName, 1024); + + bucket.putKey(keyName, file); + return keyName; + } + + } + + @Test + public void testPutKey() throws OzoneException { + PutHelper helper = new PutHelper(); + helper.putKey(); + assertNotNull(helper.getBucket()); + assertNotNull(helper.getFile()); + } + + + @Test + public void testPutAndGetKey() throws OzoneException, IOException { + + PutHelper helper = new PutHelper(); + String keyName = helper.putKey(); + assertNotNull(helper.getBucket()); + assertNotNull(helper.getFile()); + + String newFileName = path + "/" +OzoneUtils.getRequestID().toLowerCase(); + Path newPath = Paths.get(newFileName); + helper.getBucket().getKey(keyName, newPath); + + FileInputStream original = new FileInputStream(helper.getFile()); + FileInputStream downloaded = new FileInputStream(newPath.toFile()); + + + String originalHash = DigestUtils.sha256Hex(original); + String downloadedHash = DigestUtils.sha256Hex(downloaded); + + assertEquals( + "Sha256 does not match between original file and downloaded file.", + originalHash, downloadedHash); + + } + + @Test + public void testPutAndDeleteKey() throws OzoneException, IOException { + + PutHelper helper = new PutHelper(); + String keyName = helper.putKey(); + assertNotNull(helper.getBucket()); + assertNotNull(helper.getFile()); + helper.getBucket().deleteKey(keyName); + + try { + helper.getBucket().getKey(keyName); + fail("Get Key on a deleted key should have thrown"); + } catch (OzoneException ex) { + assertEquals(ex.getShortMessage(), + ErrorTable.INVALID_KEY.getShortMessage()); + } + } + + + @Test + public void testPutAndListKey() throws OzoneException, IOException { + PutHelper helper = new PutHelper(); + helper.putKey(); + assertNotNull(helper.getBucket()); + assertNotNull(helper.getFile()); + + for (int x = 0; x < 10; x++) { + String newkeyName = OzoneUtils.getRequestID().toLowerCase(); + helper.getBucket().putKey(newkeyName, helper.getFile()); + } + + List keyList = helper.getBucket().listKeys(); + Assert.assertEquals(keyList.size(), 11); + } +}