HDFS-11780. Ozone: KSM: Add putKey. Contributed by Chen Liang.
This commit is contained in:
parent
ff5dbeec07
commit
e3c8f6a24d
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ksm.helpers;
|
||||
|
||||
/**
|
||||
* Args for key. Client use this to specify key's attributes on key creation
|
||||
* (putKey()).
|
||||
*/
|
||||
public final class KsmKeyArgs {
|
||||
private final String volumeName;
|
||||
private final String bucketName;
|
||||
private final String keyName;
|
||||
|
||||
private final long dataSize;
|
||||
|
||||
private KsmKeyArgs(String volumeName, String bucketName, String keyName,
|
||||
long dataSize) {
|
||||
this.volumeName = volumeName;
|
||||
this.bucketName = bucketName;
|
||||
this.keyName = keyName;
|
||||
this.dataSize = dataSize;
|
||||
}
|
||||
|
||||
public String getVolumeName() {
|
||||
return volumeName;
|
||||
}
|
||||
|
||||
public String getBucketName() {
|
||||
return bucketName;
|
||||
}
|
||||
|
||||
public String getKeyName() {
|
||||
return keyName;
|
||||
}
|
||||
|
||||
public long getDataSize() {
|
||||
return dataSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder class of KsmKeyArgs.
|
||||
*/
|
||||
public static class Builder {
|
||||
private String volumeName;
|
||||
private String bucketName;
|
||||
private String keyName;
|
||||
private long dataSize;
|
||||
|
||||
public Builder setVolumeName(String volume) {
|
||||
this.volumeName = volume;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setBucketName(String bucket) {
|
||||
this.bucketName = bucket;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setKeyName(String key) {
|
||||
this.keyName = key;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setDataSize(long size) {
|
||||
this.dataSize = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
public KsmKeyArgs build() {
|
||||
return new KsmKeyArgs(volumeName, bucketName, keyName, dataSize);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ksm.helpers;
|
||||
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
|
||||
|
||||
/**
|
||||
* Args for key block. The block instance for the key requested in putKey.
|
||||
* This is returned from KSM to client, and client use class to talk to
|
||||
* datanode. Also, this is the metadata written to ksm.db on server side.
|
||||
*/
|
||||
public final class KsmKeyInfo {
|
||||
private final String volumeName;
|
||||
private final String bucketName;
|
||||
// name of key client specified
|
||||
private final String keyName;
|
||||
private final String containerName;
|
||||
// name of the block id SCM assigned for the key
|
||||
private final String blockID;
|
||||
private final long dataSize;
|
||||
private final boolean shouldCreateContainer;
|
||||
|
||||
private KsmKeyInfo(String volumeName, String bucketName, String keyName,
|
||||
long dataSize, String blockID, String containerName,
|
||||
boolean shouldCreateContainer) {
|
||||
this.volumeName = volumeName;
|
||||
this.bucketName = bucketName;
|
||||
this.keyName = keyName;
|
||||
this.containerName = containerName;
|
||||
this.blockID = blockID;
|
||||
this.dataSize = dataSize;
|
||||
this.shouldCreateContainer = shouldCreateContainer;
|
||||
}
|
||||
|
||||
public String getVolumeName() {
|
||||
return volumeName;
|
||||
}
|
||||
|
||||
public String getBucketName() {
|
||||
return bucketName;
|
||||
}
|
||||
|
||||
public String getKeyName() {
|
||||
return keyName;
|
||||
}
|
||||
|
||||
public String getBlockID() {
|
||||
return blockID;
|
||||
}
|
||||
|
||||
public String getContainerName() {
|
||||
return containerName;
|
||||
}
|
||||
|
||||
public long getDataSize() {
|
||||
return dataSize;
|
||||
}
|
||||
|
||||
public boolean getShouldCreateContainer() {
|
||||
return shouldCreateContainer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builder of KsmKeyInfo.
|
||||
*/
|
||||
public static class Builder {
|
||||
private String volumeName;
|
||||
private String bucketName;
|
||||
private String keyName;
|
||||
private String containerName;
|
||||
private String blockID;
|
||||
private long dataSize;
|
||||
private boolean shouldCreateContainer;
|
||||
|
||||
public Builder setVolumeName(String volume) {
|
||||
this.volumeName = volume;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setBucketName(String bucket) {
|
||||
this.bucketName = bucket;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setKeyName(String key) {
|
||||
this.keyName = key;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setBlockID(String block) {
|
||||
this.blockID = block;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setContainerName(String container) {
|
||||
this.containerName = container;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setDataSize(long size) {
|
||||
this.dataSize = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setShouldCreateContainer(boolean create) {
|
||||
this.shouldCreateContainer = create;
|
||||
return this;
|
||||
}
|
||||
|
||||
public KsmKeyInfo build() {
|
||||
return new KsmKeyInfo(
|
||||
volumeName, bucketName, keyName, dataSize, blockID, containerName,
|
||||
shouldCreateContainer);
|
||||
}
|
||||
}
|
||||
|
||||
public KeyInfo getProtobuf() {
|
||||
return KeyInfo.newBuilder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setDataSize(dataSize)
|
||||
.setBlockKey(blockID)
|
||||
.setContainerName(containerName)
|
||||
.setShouldCreateContainer(shouldCreateContainer)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static KsmKeyInfo getFromProtobuf(KeyInfo keyInfo) {
|
||||
return new KsmKeyInfo(
|
||||
keyInfo.getVolumeName(),
|
||||
keyInfo.getBucketName(),
|
||||
keyInfo.getKeyName(),
|
||||
keyInfo.getDataSize(),
|
||||
keyInfo.getBlockKey(),
|
||||
keyInfo.getContainerName(),
|
||||
keyInfo.getShouldCreateContainer());
|
||||
}
|
||||
|
||||
}
|
|
@ -18,6 +18,8 @@
|
|||
package org.apache.hadoop.ksm.protocol;
|
||||
|
||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -113,4 +115,9 @@ public interface KeySpaceManagerProtocol {
|
|||
KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Allocate a block to a container, the block is returned to the client.
|
||||
*/
|
||||
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
|
@ -39,6 +41,12 @@ import org.apache.hadoop.ozone.protocol.proto
|
|||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.CreateKeyRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.CreateKeyResponse;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.KeyArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
|
@ -313,6 +321,36 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Allocate a block for a key, then use the returned meta info to talk to data
|
||||
* node to actually write the key.
|
||||
* @param args the args for the key to be allocated
|
||||
* @return a handler to the key, returned client
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
|
||||
CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder();
|
||||
KeyArgs keyArgs = KeyArgs.newBuilder()
|
||||
.setVolumeName(args.getVolumeName())
|
||||
.setBucketName(args.getBucketName())
|
||||
.setKeyName(args.getKeyName())
|
||||
.setDataSize(args.getDataSize()).build();
|
||||
req.setKeyArgs(keyArgs);
|
||||
|
||||
final CreateKeyResponse resp;
|
||||
try {
|
||||
resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
if (resp.getStatus() != Status.OK) {
|
||||
throw new IOException("Get key block failed, error:" +
|
||||
resp.getStatus());
|
||||
}
|
||||
return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the proxy object underlying this protocol translator.
|
||||
*
|
||||
|
|
|
@ -165,7 +165,7 @@ public final class ScmConfigKeys {
|
|||
|
||||
public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
|
||||
"ozone.scm.container.provision_batch_size";
|
||||
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 10;
|
||||
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
|
||||
|
||||
|
||||
/**
|
||||
|
|
|
@ -210,6 +210,33 @@ message InfoBucketResponse {
|
|||
|
||||
}
|
||||
|
||||
|
||||
message KeyArgs {
|
||||
required string volumeName = 1;
|
||||
required string bucketName = 2;
|
||||
required string keyName = 3;
|
||||
required uint64 dataSize = 4;
|
||||
}
|
||||
|
||||
message KeyInfo {
|
||||
required string volumeName = 1;
|
||||
required string bucketName = 2;
|
||||
required string keyName = 3;
|
||||
required uint64 dataSize = 4;
|
||||
required string blockKey = 5;
|
||||
required string containerName = 6;
|
||||
required bool shouldCreateContainer = 7;
|
||||
}
|
||||
|
||||
message CreateKeyRequest {
|
||||
required KeyArgs keyArgs = 1;
|
||||
}
|
||||
|
||||
message CreateKeyResponse {
|
||||
required Status status = 1;
|
||||
optional KeyInfo keyInfo = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
The KSM service that takes care of Ozone namespace.
|
||||
*/
|
||||
|
@ -261,4 +288,10 @@ service KeySpaceManagerService {
|
|||
*/
|
||||
rpc infoBucket(InfoBucketRequest)
|
||||
returns(InfoBucketResponse);
|
||||
|
||||
/**
|
||||
Get key block.
|
||||
*/
|
||||
rpc createKey(CreateKeyRequest)
|
||||
returns(CreateKeyResponse);
|
||||
}
|
|
@ -33,6 +33,7 @@ public class KSMMetrics {
|
|||
private @Metric MutableCounterLong numVolumeInfos;
|
||||
private @Metric MutableCounterLong numBucketCreates;
|
||||
private @Metric MutableCounterLong numBucketInfos;
|
||||
private @Metric MutableCounterLong numKeyBlockAllocate;
|
||||
|
||||
// Failure Metrics
|
||||
private @Metric MutableCounterLong numVolumeCreateFails;
|
||||
|
@ -40,6 +41,7 @@ public class KSMMetrics {
|
|||
private @Metric MutableCounterLong numVolumeInfoFails;
|
||||
private @Metric MutableCounterLong numBucketCreateFails;
|
||||
private @Metric MutableCounterLong numBucketInfoFails;
|
||||
private @Metric MutableCounterLong numKeyBlockAllocateFails;
|
||||
|
||||
public KSMMetrics() {
|
||||
}
|
||||
|
@ -91,6 +93,14 @@ public class KSMMetrics {
|
|||
numBucketInfoFails.incr();
|
||||
}
|
||||
|
||||
public void incNumKeyBlockAllocates() {
|
||||
numKeyBlockAllocate.incr();
|
||||
}
|
||||
|
||||
public void incNumKeyBlockAllocateFails() {
|
||||
numKeyBlockAllocateFails.incr();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumVolumeCreates() {
|
||||
return numVolumeCreates.value();
|
||||
|
@ -141,4 +151,13 @@ public class KSMMetrics {
|
|||
return numBucketInfoFails.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumKeyBlockAllocates() {
|
||||
return numKeyBlockAllocate.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumKeyBlockAllocateFailes() {
|
||||
return numKeyBlockAllocateFails.value();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Handles key level commands.
|
||||
*/
|
||||
public interface KeyManager {
|
||||
/**
|
||||
* Given the args of a key to put, return a pipeline for the key. Writes
|
||||
* the key to pipeline mapping to meta data.
|
||||
*
|
||||
* Note that this call only allocate a block for key, and adds the
|
||||
* corresponding entry to metadata. The block will be returned to client side
|
||||
* handler DistributedStorageHandler. Which will make another call to
|
||||
* datanode to create container (if needed) and writes the key.
|
||||
*
|
||||
* In case that the container creation or key write failed on
|
||||
* DistributedStorageHandler, this key's metadata will still stay in KSM.
|
||||
*
|
||||
* @param args the args of the key provided by client.
|
||||
* @return a KsmKeyInfo instance client uses to talk to container.
|
||||
* @throws Exception
|
||||
*/
|
||||
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.ksm;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.iq80.leveldb.DBException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Implementation of keyManager.
|
||||
*/
|
||||
public class KeyManagerImpl implements KeyManager {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(KeyManagerImpl.class);
|
||||
|
||||
/**
|
||||
* A SCM block client, used to talk to SCM to allocate block during putKey.
|
||||
*/
|
||||
private final ScmBlockLocationProtocol scmBlockClient;
|
||||
private final MetadataManager metadataManager;
|
||||
|
||||
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
|
||||
MetadataManager metadataManager) {
|
||||
this.scmBlockClient = scmBlockClient;
|
||||
this.metadataManager = metadataManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
|
||||
Preconditions.checkNotNull(args);
|
||||
metadataManager.writeLock().lock();
|
||||
String volumeName = args.getVolumeName();
|
||||
String bucketName = args.getBucketName();
|
||||
String keyName = args.getKeyName();
|
||||
try {
|
||||
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
|
||||
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
byte[] keyKey = metadataManager.getDBKeyForKey(
|
||||
volumeName, bucketName, keyName);
|
||||
|
||||
//Check if the volume exists
|
||||
if(metadataManager.get(volumeKey) == null) {
|
||||
LOG.error("volume not found: {}", volumeName);
|
||||
throw new KSMException("Volume not found",
|
||||
KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
//Check if bucket already exists
|
||||
if(metadataManager.get(bucketKey) == null) {
|
||||
LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
|
||||
throw new KSMException("Bucket not found",
|
||||
KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
|
||||
}
|
||||
// TODO throw exception if key exists, may change to support key
|
||||
// overwrite in the future
|
||||
//Check if key already exists.
|
||||
if(metadataManager.get(keyKey) != null) {
|
||||
LOG.error("key already exist: {}/{}/{} ", volumeName, bucketName,
|
||||
keyName);
|
||||
throw new KSMException("Key already exist",
|
||||
KSMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
AllocatedBlock allocatedBlock =
|
||||
scmBlockClient.allocateBlock(args.getDataSize());
|
||||
KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
|
||||
.setVolumeName(args.getVolumeName())
|
||||
.setBucketName(args.getBucketName())
|
||||
.setKeyName(args.getKeyName())
|
||||
.setDataSize(args.getDataSize())
|
||||
.setBlockID(allocatedBlock.getKey())
|
||||
.setContainerName(allocatedBlock.getPipeline().getContainerName())
|
||||
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
|
||||
.build();
|
||||
metadataManager.put(keyKey, keyBlock.getProtobuf().toByteArray());
|
||||
LOG.debug("Key {} allocated in volume {} bucket {}",
|
||||
keyName, volumeName, bucketName);
|
||||
return keyBlock;
|
||||
} catch (DBException ex) {
|
||||
LOG.error("Key allocation failed for volume:{} bucket:{} key:{}",
|
||||
volumeName, bucketName, keyName, ex);
|
||||
throw new KSMException(ex.getMessage(),
|
||||
KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
|
||||
} finally {
|
||||
metadataManager.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,17 +20,25 @@ package org.apache.hadoop.ozone.ksm;
|
|||
import com.google.protobuf.BlockingService;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.protocolPB
|
||||
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
||||
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -63,6 +71,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
|||
private final MetadataManager metadataManager;
|
||||
private final VolumeManager volumeManager;
|
||||
private final BucketManager bucketManager;
|
||||
private final KeyManager keyManager;
|
||||
private final KSMMetrics metrics;
|
||||
|
||||
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
||||
|
@ -85,6 +94,31 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
|||
volumeManager = new VolumeManagerImpl(metadataManager, conf);
|
||||
bucketManager = new BucketManagerImpl(metadataManager);
|
||||
metrics = KSMMetrics.create();
|
||||
keyManager = new KeyManagerImpl(getScmBlockClient(conf), metadataManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a scm block client, used by putKey() and getKey().
|
||||
*
|
||||
* @param conf
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private ScmBlockLocationProtocol getScmBlockClient(OzoneConfiguration conf)
|
||||
throws IOException {
|
||||
RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
|
||||
ProtobufRpcEngine.class);
|
||||
long scmVersion =
|
||||
RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
|
||||
InetSocketAddress scmBlockAddress =
|
||||
OzoneClientUtils.getScmAddressForBlockClients(conf);
|
||||
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
|
||||
new ScmBlockLocationProtocolClientSideTranslatorPB(
|
||||
RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
|
||||
scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
|
||||
NetUtils.getDefaultSocketFactory(conf),
|
||||
Client.getRpcTimeout(conf)));
|
||||
return scmBlockLocationClient;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -373,4 +407,20 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a key block.
|
||||
* @param args - attributes of the key.
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
|
||||
try {
|
||||
metrics.incNumKeyBlockAllocates();
|
||||
return keyManager.allocateKey(args);
|
||||
} catch (Exception ex) {
|
||||
metrics.incNumKeyBlockAllocateFails();
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,4 +95,13 @@ public interface MetadataManager {
|
|||
* @param bucket - Bucket name
|
||||
*/
|
||||
byte[] getBucketKey(String volume, String bucket);
|
||||
|
||||
/**
|
||||
* Given a volume, bucket and a key, return the corresponding DB key.
|
||||
* @param volume - volume name
|
||||
* @param bucket - bucket name
|
||||
* @param key - key name
|
||||
* @return bytes of DB key.
|
||||
*/
|
||||
byte[] getDBKeyForKey(String volume, String bucket, String key);
|
||||
}
|
||||
|
|
|
@ -105,6 +105,14 @@ public class MetadataManagerImpl implements MetadataManager {
|
|||
return DFSUtil.string2Bytes(bucketKeyString);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getDBKeyForKey(String volume, String bucket, String key) {
|
||||
String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
|
||||
+ OzoneConsts.KSM_BUCKET_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX
|
||||
+ key;
|
||||
return DFSUtil.string2Bytes(keyKeyString);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the read lock used on Metadata DB.
|
||||
* @return readLock
|
||||
|
|
|
@ -102,6 +102,7 @@ public class KSMException extends IOException {
|
|||
FAILED_USER_NOT_FOUND,
|
||||
FAILED_BUCKET_ALREADY_EXISTS,
|
||||
FAILED_BUCKET_NOT_FOUND,
|
||||
FAILED_KEY_ALREADY_EXISTS,
|
||||
FAILED_INTERNAL_ERROR
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB;
|
|||
import com.google.protobuf.RpcController;
|
||||
import com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||
|
@ -35,6 +37,12 @@ import org.apache.hadoop.ozone.protocol.proto
|
|||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.CreateKeyRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.CreateKeyResponse;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.KeyArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
|
@ -211,4 +219,27 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
|
|||
}
|
||||
return resp.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public CreateKeyResponse createKey(
|
||||
RpcController controller, CreateKeyRequest request
|
||||
) throws ServiceException {
|
||||
CreateKeyResponse.Builder resp =
|
||||
CreateKeyResponse.newBuilder();
|
||||
try {
|
||||
KeyArgs keyArgs = request.getKeyArgs();
|
||||
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
|
||||
.setVolumeName(keyArgs.getVolumeName())
|
||||
.setBucketName(keyArgs.getBucketName())
|
||||
.setKeyName(keyArgs.getKeyName())
|
||||
.setDataSize(keyArgs.getDataSize())
|
||||
.build();
|
||||
KsmKeyInfo keyInfo = impl.allocateKey(ksmKeyArgs);
|
||||
resp.setKeyInfo(keyInfo.getProtobuf());
|
||||
resp.setStatus(Status.OK);
|
||||
} catch (IOException e) {
|
||||
resp.setStatus(exceptionToResponseStatus(e));
|
||||
}
|
||||
return resp.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,7 +202,7 @@ public class BlockManagerImpl implements BlockManager {
|
|||
*/
|
||||
@Override
|
||||
public AllocatedBlock allocateBlock(final long size) throws IOException {
|
||||
boolean createContainer = false;
|
||||
boolean createContainer;
|
||||
Pipeline pipeline;
|
||||
if (size < 0 || size > containerSize) {
|
||||
throw new SCMException("Unsupported block size",
|
||||
|
@ -223,11 +223,13 @@ public class BlockManagerImpl implements BlockManager {
|
|||
throw new SCMException("Unable to allocate container for the block",
|
||||
FAILED_TO_ALLOCATE_CONTAINER);
|
||||
}
|
||||
createContainer = true;
|
||||
} else {
|
||||
candidates = openContainers.entrySet().parallelStream()
|
||||
.filter(e -> (e.getValue() + size < containerSize))
|
||||
.map(e -> e.getKey())
|
||||
.collect(Collectors.toList());
|
||||
createContainer = false;
|
||||
}
|
||||
|
||||
if (candidates.size() == 0) {
|
||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
|||
import org.apache.hadoop.hdfs.server.datanode.fsdataset
|
||||
.LengthInputStream;
|
||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||
import org.apache.hadoop.ksm.protocolPB
|
||||
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
||||
|
@ -53,6 +55,7 @@ import org.apache.hadoop.ozone.web.response.*;
|
|||
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||
import org.apache.hadoop.scm.storage.ChunkInputStream;
|
||||
import org.apache.hadoop.scm.storage.ChunkOutputStream;
|
||||
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -279,15 +282,29 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
@Override
|
||||
public OutputStream newKeyWriter(KeyArgs args) throws IOException,
|
||||
OzoneException {
|
||||
KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
|
||||
.setVolumeName(args.getVolumeName())
|
||||
.setBucketName(args.getBucketName())
|
||||
.setKeyName(args.getKeyName())
|
||||
.setDataSize(args.getSize())
|
||||
.build();
|
||||
// contact KSM to allocate a block for key.
|
||||
String containerKey = buildContainerKey(args.getVolumeName(),
|
||||
args.getBucketName(), args.getKeyName());
|
||||
KeyInfo key = new KeyInfo();
|
||||
key.setKeyName(args.getKeyName());
|
||||
key.setCreatedOn(dateToString(new Date()));
|
||||
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
|
||||
return new ChunkOutputStream(containerKey, key.getKeyName(),
|
||||
xceiverClientManager, xceiverClient, args.getRequestID(),
|
||||
chunkSize);
|
||||
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
|
||||
// TODO the following createContainer and key writes may fail, in which
|
||||
// case we should revert the above allocateKey to KSM.
|
||||
String containerName = keyInfo.getContainerName();
|
||||
XceiverClientSpi xceiverClient = getContainer(containerName);
|
||||
if (keyInfo.getShouldCreateContainer()) {
|
||||
LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
|
||||
args.getVolumeName(), args.getBucketName(), args.getKeyName());
|
||||
ContainerProtocolCalls.createContainer(
|
||||
xceiverClient, args.getRequestID());
|
||||
}
|
||||
// establish a connection to the container to write the key
|
||||
return new ChunkOutputStream(containerKey, args.getKeyName(),
|
||||
xceiverClientManager, xceiverClient, args.getRequestID(), chunkSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -334,6 +351,13 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
throw new UnsupportedOperationException("listKeys not implemented");
|
||||
}
|
||||
|
||||
private XceiverClientSpi getContainer(String containerName)
|
||||
throws IOException {
|
||||
Pipeline pipeline =
|
||||
storageContainerLocationClient.getContainer(containerName);
|
||||
return xceiverClientManager.acquireClient(pipeline);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline}
|
||||
* of nodes capable of serving container protocol operations.
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm;
|
|||
|
||||
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
|
@ -25,12 +26,14 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
|
|||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
||||
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -38,6 +41,8 @@ import org.junit.Test;
|
|||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
|
@ -65,7 +70,8 @@ public class TestKeySpaceManager {
|
|||
cluster = new MiniOzoneCluster.Builder(conf)
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
|
||||
userArgs = new UserArgs(null, null, null, null, null, null);
|
||||
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
|
||||
null, null, null, null);
|
||||
ksmMetrics = cluster.getKeySpaceManager().getMetrics();
|
||||
}
|
||||
|
||||
|
@ -190,4 +196,33 @@ public class TestKeySpaceManager {
|
|||
Assert.assertEquals(0, ksmMetrics.getNumBucketCreateFails());
|
||||
Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetKeyWriter() throws IOException, OzoneException {
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||
String keyName = "key" + RandomStringUtils.randomNumeric(5);
|
||||
Assert.assertEquals(0, ksmMetrics.getNumKeyBlockAllocates());
|
||||
|
||||
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
|
||||
createVolumeArgs.setUserName(userName);
|
||||
createVolumeArgs.setAdminName(adminName);
|
||||
storageHandler.createVolume(createVolumeArgs);
|
||||
|
||||
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
|
||||
bucketArgs.setAddAcls(new LinkedList<>());
|
||||
bucketArgs.setRemoveAcls(new LinkedList<>());
|
||||
bucketArgs.setStorageType(StorageType.DISK);
|
||||
storageHandler.createBucket(bucketArgs);
|
||||
|
||||
String dataString = RandomStringUtils.randomAscii(100);
|
||||
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
|
||||
keyArgs.setSize(4096);
|
||||
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
|
||||
stream.write(dataString.getBytes());
|
||||
}
|
||||
Assert.assertEquals(1, ksmMetrics.getNumKeyBlockAllocates());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue