diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyArgs.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyArgs.java new file mode 100644 index 00000000000..a034ed3b53a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyArgs.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ksm.helpers; + +/** + * Args for key. Client use this to specify key's attributes on key creation + * (putKey()). + */ +public final class KsmKeyArgs { + private final String volumeName; + private final String bucketName; + private final String keyName; + + private final long dataSize; + + private KsmKeyArgs(String volumeName, String bucketName, String keyName, + long dataSize) { + this.volumeName = volumeName; + this.bucketName = bucketName; + this.keyName = keyName; + this.dataSize = dataSize; + } + + public String getVolumeName() { + return volumeName; + } + + public String getBucketName() { + return bucketName; + } + + public String getKeyName() { + return keyName; + } + + public long getDataSize() { + return dataSize; + } + + /** + * Builder class of KsmKeyArgs. + */ + public static class Builder { + private String volumeName; + private String bucketName; + private String keyName; + private long dataSize; + + public Builder setVolumeName(String volume) { + this.volumeName = volume; + return this; + } + + public Builder setBucketName(String bucket) { + this.bucketName = bucket; + return this; + } + + public Builder setKeyName(String key) { + this.keyName = key; + return this; + } + + public Builder setDataSize(long size) { + this.dataSize = size; + return this; + } + + public KsmKeyArgs build() { + return new KsmKeyArgs(volumeName, bucketName, keyName, dataSize); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java new file mode 100644 index 00000000000..76d22bed6fa --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ksm.helpers; + + +import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo; + +/** + * Args for key block. The block instance for the key requested in putKey. + * This is returned from KSM to client, and client use class to talk to + * datanode. Also, this is the metadata written to ksm.db on server side. + */ +public final class KsmKeyInfo { + private final String volumeName; + private final String bucketName; + // name of key client specified + private final String keyName; + private final String containerName; + // name of the block id SCM assigned for the key + private final String blockID; + private final long dataSize; + private final boolean shouldCreateContainer; + + private KsmKeyInfo(String volumeName, String bucketName, String keyName, + long dataSize, String blockID, String containerName, + boolean shouldCreateContainer) { + this.volumeName = volumeName; + this.bucketName = bucketName; + this.keyName = keyName; + this.containerName = containerName; + this.blockID = blockID; + this.dataSize = dataSize; + this.shouldCreateContainer = shouldCreateContainer; + } + + public String getVolumeName() { + return volumeName; + } + + public String getBucketName() { + return bucketName; + } + + public String getKeyName() { + return keyName; + } + + public String getBlockID() { + return blockID; + } + + public String getContainerName() { + return containerName; + } + + public long getDataSize() { + return dataSize; + } + + public boolean getShouldCreateContainer() { + return shouldCreateContainer; + } + + /** + * Builder of KsmKeyInfo. + */ + public static class Builder { + private String volumeName; + private String bucketName; + private String keyName; + private String containerName; + private String blockID; + private long dataSize; + private boolean shouldCreateContainer; + + public Builder setVolumeName(String volume) { + this.volumeName = volume; + return this; + } + + public Builder setBucketName(String bucket) { + this.bucketName = bucket; + return this; + } + + public Builder setKeyName(String key) { + this.keyName = key; + return this; + } + + public Builder setBlockID(String block) { + this.blockID = block; + return this; + } + + public Builder setContainerName(String container) { + this.containerName = container; + return this; + } + + public Builder setDataSize(long size) { + this.dataSize = size; + return this; + } + + public Builder setShouldCreateContainer(boolean create) { + this.shouldCreateContainer = create; + return this; + } + + public KsmKeyInfo build() { + return new KsmKeyInfo( + volumeName, bucketName, keyName, dataSize, blockID, containerName, + shouldCreateContainer); + } + } + + public KeyInfo getProtobuf() { + return KeyInfo.newBuilder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setDataSize(dataSize) + .setBlockKey(blockID) + .setContainerName(containerName) + .setShouldCreateContainer(shouldCreateContainer) + .build(); + } + + public static KsmKeyInfo getFromProtobuf(KeyInfo keyInfo) { + return new KsmKeyInfo( + keyInfo.getVolumeName(), + keyInfo.getBucketName(), + keyInfo.getKeyName(), + keyInfo.getDataSize(), + keyInfo.getBlockKey(), + keyInfo.getContainerName(), + keyInfo.getShouldCreateContainer()); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java index 8d52c01e2c7..4a759da6e9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocol/KeySpaceManagerProtocol.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ksm.protocol; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ksm.helpers.KsmVolumeArgs; import java.io.IOException; import java.util.List; @@ -113,4 +115,9 @@ public interface KeySpaceManagerProtocol { KsmBucketInfo getBucketInfo(String volumeName, String bucketName) throws IOException; + /** + * Allocate a block to a container, the block is returned to the client. + */ + KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java index d5bbd37b4f2..5fb28444866 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/protocolPB/KeySpaceManagerProtocolClientSideTranslatorPB.java @@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ksm.helpers.KsmVolumeArgs; import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol; import org.apache.hadoop.ozone.protocol.proto @@ -39,6 +41,12 @@ import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.CreateVolumeRequest; import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.CreateVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto + .KeySpaceManagerProtocolProtos.CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto + .KeySpaceManagerProtocolProtos.CreateKeyResponse; +import org.apache.hadoop.ozone.protocol.proto + .KeySpaceManagerProtocolProtos.KeyArgs; import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest; import org.apache.hadoop.ozone.protocol.proto @@ -313,6 +321,36 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB } + /** + * Allocate a block for a key, then use the returned meta info to talk to data + * node to actually write the key. + * @param args the args for the key to be allocated + * @return a handler to the key, returned client + * @throws IOException + */ + @Override + public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException { + CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder(); + KeyArgs keyArgs = KeyArgs.newBuilder() + .setVolumeName(args.getVolumeName()) + .setBucketName(args.getBucketName()) + .setKeyName(args.getKeyName()) + .setDataSize(args.getDataSize()).build(); + req.setKeyArgs(keyArgs); + + final CreateKeyResponse resp; + try { + resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build()); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + if (resp.getStatus() != Status.OK) { + throw new IOException("Get key block failed, error:" + + resp.getStatus()); + } + return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo()); + } + /** * Return the proxy object underlying this protocol translator. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index d76565b428a..9553171be8b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -165,7 +165,7 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE = "ozone.scm.container.provision_batch_size"; - public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 10; + public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto index a54ad40b7d3..a5d09e21d95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto @@ -210,6 +210,33 @@ message InfoBucketResponse { } + +message KeyArgs { + required string volumeName = 1; + required string bucketName = 2; + required string keyName = 3; + required uint64 dataSize = 4; +} + +message KeyInfo { + required string volumeName = 1; + required string bucketName = 2; + required string keyName = 3; + required uint64 dataSize = 4; + required string blockKey = 5; + required string containerName = 6; + required bool shouldCreateContainer = 7; +} + +message CreateKeyRequest { + required KeyArgs keyArgs = 1; +} + +message CreateKeyResponse { + required Status status = 1; + optional KeyInfo keyInfo = 2; +} + /** The KSM service that takes care of Ozone namespace. */ @@ -261,4 +288,10 @@ service KeySpaceManagerService { */ rpc infoBucket(InfoBucketRequest) returns(InfoBucketResponse); + + /** + Get key block. + */ + rpc createKey(CreateKeyRequest) + returns(CreateKeyResponse); } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java index 7979b7f973e..37cbb64f0b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetrics.java @@ -33,6 +33,7 @@ public class KSMMetrics { private @Metric MutableCounterLong numVolumeInfos; private @Metric MutableCounterLong numBucketCreates; private @Metric MutableCounterLong numBucketInfos; + private @Metric MutableCounterLong numKeyBlockAllocate; // Failure Metrics private @Metric MutableCounterLong numVolumeCreateFails; @@ -40,6 +41,7 @@ public class KSMMetrics { private @Metric MutableCounterLong numVolumeInfoFails; private @Metric MutableCounterLong numBucketCreateFails; private @Metric MutableCounterLong numBucketInfoFails; + private @Metric MutableCounterLong numKeyBlockAllocateFails; public KSMMetrics() { } @@ -91,6 +93,14 @@ public class KSMMetrics { numBucketInfoFails.incr(); } + public void incNumKeyBlockAllocates() { + numKeyBlockAllocate.incr(); + } + + public void incNumKeyBlockAllocateFails() { + numKeyBlockAllocateFails.incr(); + } + @VisibleForTesting public long getNumVolumeCreates() { return numVolumeCreates.value(); @@ -141,4 +151,13 @@ public class KSMMetrics { return numBucketInfoFails.value(); } + @VisibleForTesting + public long getNumKeyBlockAllocates() { + return numKeyBlockAllocate.value(); + } + + @VisibleForTesting + public long getNumKeyBlockAllocateFailes() { + return numKeyBlockAllocateFails.value(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java new file mode 100644 index 00000000000..466de275156 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.ksm; + +import org.apache.hadoop.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ksm.helpers.KsmKeyInfo; + +import java.io.IOException; + +/** + * Handles key level commands. + */ +public interface KeyManager { + /** + * Given the args of a key to put, return a pipeline for the key. Writes + * the key to pipeline mapping to meta data. + * + * Note that this call only allocate a block for key, and adds the + * corresponding entry to metadata. The block will be returned to client side + * handler DistributedStorageHandler. Which will make another call to + * datanode to create container (if needed) and writes the key. + * + * In case that the container creation or key write failed on + * DistributedStorageHandler, this key's metadata will still stay in KSM. + * + * @param args the args of the key provided by client. + * @return a KsmKeyInfo instance client uses to talk to container. + * @throws Exception + */ + KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java new file mode 100644 index 00000000000..ee0674590cb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.ksm; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ksm.helpers.KsmKeyInfo; +import org.apache.hadoop.ozone.ksm.exceptions.KSMException; +import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock; +import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; +import org.iq80.leveldb.DBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Implementation of keyManager. + */ +public class KeyManagerImpl implements KeyManager { + private static final Logger LOG = + LoggerFactory.getLogger(KeyManagerImpl.class); + + /** + * A SCM block client, used to talk to SCM to allocate block during putKey. + */ + private final ScmBlockLocationProtocol scmBlockClient; + private final MetadataManager metadataManager; + + public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient, + MetadataManager metadataManager) { + this.scmBlockClient = scmBlockClient; + this.metadataManager = metadataManager; + } + + @Override + public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException { + Preconditions.checkNotNull(args); + metadataManager.writeLock().lock(); + String volumeName = args.getVolumeName(); + String bucketName = args.getBucketName(); + String keyName = args.getKeyName(); + try { + byte[] volumeKey = metadataManager.getVolumeKey(volumeName); + byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName); + byte[] keyKey = metadataManager.getDBKeyForKey( + volumeName, bucketName, keyName); + + //Check if the volume exists + if(metadataManager.get(volumeKey) == null) { + LOG.error("volume not found: {}", volumeName); + throw new KSMException("Volume not found", + KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND); + } + //Check if bucket already exists + if(metadataManager.get(bucketKey) == null) { + LOG.error("bucket not found: {}/{} ", volumeName, bucketName); + throw new KSMException("Bucket not found", + KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND); + } + // TODO throw exception if key exists, may change to support key + // overwrite in the future + //Check if key already exists. + if(metadataManager.get(keyKey) != null) { + LOG.error("key already exist: {}/{}/{} ", volumeName, bucketName, + keyName); + throw new KSMException("Key already exist", + KSMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS); + } + + AllocatedBlock allocatedBlock = + scmBlockClient.allocateBlock(args.getDataSize()); + KsmKeyInfo keyBlock = new KsmKeyInfo.Builder() + .setVolumeName(args.getVolumeName()) + .setBucketName(args.getBucketName()) + .setKeyName(args.getKeyName()) + .setDataSize(args.getDataSize()) + .setBlockID(allocatedBlock.getKey()) + .setContainerName(allocatedBlock.getPipeline().getContainerName()) + .setShouldCreateContainer(allocatedBlock.getCreateContainer()) + .build(); + metadataManager.put(keyKey, keyBlock.getProtobuf().toByteArray()); + LOG.debug("Key {} allocated in volume {} bucket {}", + keyName, volumeName, bucketName); + return keyBlock; + } catch (DBException ex) { + LOG.error("Key allocation failed for volume:{} bucket:{} key:{}", + volumeName, bucketName, keyName, ex); + throw new KSMException(ex.getMessage(), + KSMException.ResultCodes.FAILED_INTERNAL_ERROR); + } finally { + metadataManager.writeLock().unlock(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java index ba3d2c3ef83..6682301e1eb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java @@ -20,17 +20,25 @@ package org.apache.hadoop.ozone.ksm; import com.google.protobuf.BlockingService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ksm.helpers.KsmVolumeArgs; import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol; import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ozone.OzoneClientUtils; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.protocolPB .KeySpaceManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.scm.StorageContainerManager; +import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +71,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol { private final MetadataManager metadataManager; private final VolumeManager volumeManager; private final BucketManager bucketManager; + private final KeyManager keyManager; private final KSMMetrics metrics; public KeySpaceManager(OzoneConfiguration conf) throws IOException { @@ -85,6 +94,31 @@ public class KeySpaceManager implements KeySpaceManagerProtocol { volumeManager = new VolumeManagerImpl(metadataManager, conf); bucketManager = new BucketManagerImpl(metadataManager); metrics = KSMMetrics.create(); + keyManager = new KeyManagerImpl(getScmBlockClient(conf), metadataManager); + } + + /** + * Create a scm block client, used by putKey() and getKey(). + * + * @param conf + * @return + * @throws IOException + */ + private ScmBlockLocationProtocol getScmBlockClient(OzoneConfiguration conf) + throws IOException { + RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, + ProtobufRpcEngine.class); + long scmVersion = + RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class); + InetSocketAddress scmBlockAddress = + OzoneClientUtils.getScmAddressForBlockClients(conf); + ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = + new ScmBlockLocationProtocolClientSideTranslatorPB( + RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion, + scmBlockAddress, UserGroupInformation.getCurrentUser(), conf, + NetUtils.getDefaultSocketFactory(conf), + Client.getRpcTimeout(conf))); + return scmBlockLocationClient; } /** @@ -373,4 +407,20 @@ public class KeySpaceManager implements KeySpaceManagerProtocol { } } + /** + * Allocate a key block. + * @param args - attributes of the key. + * @return + * @throws IOException + */ + @Override + public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException { + try { + metrics.incNumKeyBlockAllocates(); + return keyManager.allocateKey(args); + } catch (Exception ex) { + metrics.incNumKeyBlockAllocateFails(); + throw ex; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java index 407d46ab973..78d0193fe4d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManager.java @@ -95,4 +95,13 @@ public interface MetadataManager { * @param bucket - Bucket name */ byte[] getBucketKey(String volume, String bucket); + + /** + * Given a volume, bucket and a key, return the corresponding DB key. + * @param volume - volume name + * @param bucket - bucket name + * @param key - key name + * @return bytes of DB key. + */ + byte[] getDBKeyForKey(String volume, String bucket, String key); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java index 0a91bc0b65b..fdd035a2230 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/MetadataManagerImpl.java @@ -105,6 +105,14 @@ public class MetadataManagerImpl implements MetadataManager { return DFSUtil.string2Bytes(bucketKeyString); } + @Override + public byte[] getDBKeyForKey(String volume, String bucket, String key) { + String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume + + OzoneConsts.KSM_BUCKET_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX + + key; + return DFSUtil.string2Bytes(keyKeyString); + } + /** * Returns the read lock used on Metadata DB. * @return readLock diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java index 63c42e807a3..0deb7d6ad2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java @@ -102,6 +102,7 @@ public class KSMException extends IOException { FAILED_USER_NOT_FOUND, FAILED_BUCKET_ALREADY_EXISTS, FAILED_BUCKET_NOT_FOUND, + FAILED_KEY_ALREADY_EXISTS, FAILED_INTERNAL_ERROR } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java index 33c4af4df61..634245c856c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/KeySpaceManagerProtocolServerSideTranslatorPB.java @@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ksm.helpers.KsmVolumeArgs; import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol; import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB; @@ -35,6 +37,12 @@ import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.CreateVolumeRequest; import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.CreateVolumeResponse; +import org.apache.hadoop.ozone.protocol.proto + .KeySpaceManagerProtocolProtos.CreateKeyRequest; +import org.apache.hadoop.ozone.protocol.proto + .KeySpaceManagerProtocolProtos.CreateKeyResponse; +import org.apache.hadoop.ozone.protocol.proto + .KeySpaceManagerProtocolProtos.KeyArgs; import org.apache.hadoop.ozone.protocol.proto .KeySpaceManagerProtocolProtos.SetVolumePropertyRequest; import org.apache.hadoop.ozone.protocol.proto @@ -211,4 +219,27 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements } return resp.build(); } + + @Override + public CreateKeyResponse createKey( + RpcController controller, CreateKeyRequest request + ) throws ServiceException { + CreateKeyResponse.Builder resp = + CreateKeyResponse.newBuilder(); + try { + KeyArgs keyArgs = request.getKeyArgs(); + KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder() + .setVolumeName(keyArgs.getVolumeName()) + .setBucketName(keyArgs.getBucketName()) + .setKeyName(keyArgs.getKeyName()) + .setDataSize(keyArgs.getDataSize()) + .build(); + KsmKeyInfo keyInfo = impl.allocateKey(ksmKeyArgs); + resp.setKeyInfo(keyInfo.getProtobuf()); + resp.setStatus(Status.OK); + } catch (IOException e) { + resp.setStatus(exceptionToResponseStatus(e)); + } + return resp.build(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java index 3e36593006d..80027db44a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java @@ -202,7 +202,7 @@ public class BlockManagerImpl implements BlockManager { */ @Override public AllocatedBlock allocateBlock(final long size) throws IOException { - boolean createContainer = false; + boolean createContainer; Pipeline pipeline; if (size < 0 || size > containerSize) { throw new SCMException("Unsupported block size", @@ -223,11 +223,13 @@ public class BlockManagerImpl implements BlockManager { throw new SCMException("Unable to allocate container for the block", FAILED_TO_ALLOCATE_CONTAINER); } + createContainer = true; } else { candidates = openContainers.entrySet().parallelStream() .filter(e -> (e.getValue() + size < containerSize)) .map(e -> e.getKey()) .collect(Collectors.toList()); + createContainer = false; } if (candidates.size() == 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index a7d1fdcdadf..5b89b166c3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.datanode.fsdataset .LengthInputStream; import org.apache.hadoop.ksm.helpers.KsmBucketInfo; +import org.apache.hadoop.ksm.helpers.KsmKeyArgs; +import org.apache.hadoop.ksm.helpers.KsmKeyInfo; import org.apache.hadoop.ksm.helpers.KsmVolumeArgs; import org.apache.hadoop.ksm.protocolPB .KeySpaceManagerProtocolClientSideTranslatorPB; @@ -53,6 +55,7 @@ import org.apache.hadoop.ozone.web.response.*; import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.storage.ChunkInputStream; import org.apache.hadoop.scm.storage.ChunkOutputStream; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,15 +282,29 @@ public final class DistributedStorageHandler implements StorageHandler { @Override public OutputStream newKeyWriter(KeyArgs args) throws IOException, OzoneException { + KsmKeyArgs keyArgs = new KsmKeyArgs.Builder() + .setVolumeName(args.getVolumeName()) + .setBucketName(args.getBucketName()) + .setKeyName(args.getKeyName()) + .setDataSize(args.getSize()) + .build(); + // contact KSM to allocate a block for key. String containerKey = buildContainerKey(args.getVolumeName(), args.getBucketName(), args.getKeyName()); - KeyInfo key = new KeyInfo(); - key.setKeyName(args.getKeyName()); - key.setCreatedOn(dateToString(new Date())); - XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); - return new ChunkOutputStream(containerKey, key.getKeyName(), - xceiverClientManager, xceiverClient, args.getRequestID(), - chunkSize); + KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs); + // TODO the following createContainer and key writes may fail, in which + // case we should revert the above allocateKey to KSM. + String containerName = keyInfo.getContainerName(); + XceiverClientSpi xceiverClient = getContainer(containerName); + if (keyInfo.getShouldCreateContainer()) { + LOG.debug("Need to create container {} for key: {}/{}/{}", containerName, + args.getVolumeName(), args.getBucketName(), args.getKeyName()); + ContainerProtocolCalls.createContainer( + xceiverClient, args.getRequestID()); + } + // establish a connection to the container to write the key + return new ChunkOutputStream(containerKey, args.getKeyName(), + xceiverClientManager, xceiverClient, args.getRequestID(), chunkSize); } @Override @@ -334,6 +351,13 @@ public final class DistributedStorageHandler implements StorageHandler { throw new UnsupportedOperationException("listKeys not implemented"); } + private XceiverClientSpi getContainer(String containerName) + throws IOException { + Pipeline pipeline = + storageContainerLocationClient.getContainer(containerName); + return xceiverClientManager.acquireClient(pipeline); + } + /** * Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline} * of nodes capable of serving container protocol operations. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java index db96ca7ce50..df702ad432d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm; import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.OzoneConfigKeys; @@ -25,12 +26,14 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.web.exceptions.OzoneException; import org.apache.hadoop.ozone.web.handlers.BucketArgs; +import org.apache.hadoop.ozone.web.handlers.KeyArgs; import org.apache.hadoop.ozone.web.handlers.UserArgs; import org.apache.hadoop.ozone.web.handlers.VolumeArgs; import org.apache.hadoop.ozone.web.interfaces.StorageHandler; import org.apache.hadoop.ozone.web.request.OzoneQuota; import org.apache.hadoop.ozone.web.response.BucketInfo; import org.apache.hadoop.ozone.web.response.VolumeInfo; +import org.apache.hadoop.ozone.web.utils.OzoneUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -38,6 +41,8 @@ import org.junit.Test; import java.io.IOException; +import java.io.OutputStream; +import java.util.LinkedList; import java.util.Random; /** @@ -65,7 +70,8 @@ public class TestKeySpaceManager { cluster = new MiniOzoneCluster.Builder(conf) .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build(); storageHandler = new ObjectStoreHandler(conf).getStorageHandler(); - userArgs = new UserArgs(null, null, null, null, null, null); + userArgs = new UserArgs(null, OzoneUtils.getRequestID(), + null, null, null, null); ksmMetrics = cluster.getKeySpaceManager().getMetrics(); } @@ -190,4 +196,33 @@ public class TestKeySpaceManager { Assert.assertEquals(0, ksmMetrics.getNumBucketCreateFails()); Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails()); } + + @Test + public void testGetKeyWriter() throws IOException, OzoneException { + String userName = "user" + RandomStringUtils.randomNumeric(5); + String adminName = "admin" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + String bucketName = "bucket" + RandomStringUtils.randomNumeric(5); + String keyName = "key" + RandomStringUtils.randomNumeric(5); + Assert.assertEquals(0, ksmMetrics.getNumKeyBlockAllocates()); + + VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs); + createVolumeArgs.setUserName(userName); + createVolumeArgs.setAdminName(adminName); + storageHandler.createVolume(createVolumeArgs); + + BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs); + bucketArgs.setAddAcls(new LinkedList<>()); + bucketArgs.setRemoveAcls(new LinkedList<>()); + bucketArgs.setStorageType(StorageType.DISK); + storageHandler.createBucket(bucketArgs); + + String dataString = RandomStringUtils.randomAscii(100); + KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs); + keyArgs.setSize(4096); + try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) { + stream.write(dataString.getBytes()); + } + Assert.assertEquals(1, ksmMetrics.getNumKeyBlockAllocates()); + } }