HDFS-11775. Ozone: KSM: add createBucket. Contributed by Nandakumar Vadivelu.
This commit is contained in:
parent
e9d09c209e
commit
ca70300eea
|
@ -0,0 +1,227 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ksm.helpers;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.BucketInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A class that encapsulates Bucket Arguments.
|
||||||
|
*/
|
||||||
|
public final class KsmBucketArgs {
|
||||||
|
/**
|
||||||
|
* Name of the volume in which the bucket belongs to.
|
||||||
|
*/
|
||||||
|
private final String volumeName;
|
||||||
|
/**
|
||||||
|
* Name of the bucket.
|
||||||
|
*/
|
||||||
|
private final String bucketName;
|
||||||
|
/**
|
||||||
|
* ACL's that are to be added for the bucket.
|
||||||
|
*/
|
||||||
|
private List<OzoneAclInfo> addAcls;
|
||||||
|
/**
|
||||||
|
* ACL's that are to be removed from the bucket.
|
||||||
|
*/
|
||||||
|
private List<OzoneAclInfo> removeAcls;
|
||||||
|
/**
|
||||||
|
* Bucket Version flag.
|
||||||
|
*/
|
||||||
|
private boolean isVersionEnabled;
|
||||||
|
/**
|
||||||
|
* Type of storage to be used for this bucket.
|
||||||
|
* [RAM_DISK, SSD, DISK, ARCHIVE]
|
||||||
|
*/
|
||||||
|
private StorageType storageType;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Private constructor, constructed via builder.
|
||||||
|
* @param volumeName - Volume name.
|
||||||
|
* @param bucketName - Bucket name.
|
||||||
|
* @param addAcls - ACL's to be added.
|
||||||
|
* @param removeAcls - ACL's to be removed.
|
||||||
|
* @param isVersionEnabled - Bucket version flag.
|
||||||
|
* @param storageType - Storage type to be used.
|
||||||
|
*/
|
||||||
|
private KsmBucketArgs(String volumeName, String bucketName,
|
||||||
|
List<OzoneAclInfo> addAcls, List<OzoneAclInfo> removeAcls,
|
||||||
|
boolean isVersionEnabled, StorageType storageType) {
|
||||||
|
this.volumeName = volumeName;
|
||||||
|
this.bucketName = bucketName;
|
||||||
|
this.addAcls = addAcls;
|
||||||
|
this.removeAcls = removeAcls;
|
||||||
|
this.isVersionEnabled = isVersionEnabled;
|
||||||
|
this.storageType = storageType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Volume Name.
|
||||||
|
* @return String.
|
||||||
|
*/
|
||||||
|
public String getVolumeName() {
|
||||||
|
return volumeName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the Bucket Name.
|
||||||
|
* @return String
|
||||||
|
*/
|
||||||
|
public String getBucketName() {
|
||||||
|
return bucketName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the ACL's that are to be added.
|
||||||
|
* @return List<OzoneAcl>
|
||||||
|
*/
|
||||||
|
public List<OzoneAclInfo> getAddAcls() {
|
||||||
|
return addAcls;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the ACL's that are to be removed.
|
||||||
|
* @return List<OzoneAcl>
|
||||||
|
*/
|
||||||
|
public List<OzoneAclInfo> getRemoveAcls() {
|
||||||
|
return removeAcls;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if bucket version is enabled, else false.
|
||||||
|
* @return isVersionEnabled
|
||||||
|
*/
|
||||||
|
public boolean getIsVersionEnabled() {
|
||||||
|
return isVersionEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the type of storage to be used.
|
||||||
|
* @return StorageType
|
||||||
|
*/
|
||||||
|
public StorageType getStorageType() {
|
||||||
|
return storageType;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns new builder class that builds a KsmBucketArgs.
|
||||||
|
*
|
||||||
|
* @return Builder
|
||||||
|
*/
|
||||||
|
public static Builder newBuilder() {
|
||||||
|
return new Builder();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builder for KsmBucketArgs.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
private String volumeName;
|
||||||
|
private String bucketName;
|
||||||
|
private List<OzoneAclInfo> addAcls;
|
||||||
|
private List<OzoneAclInfo> removeAcls;
|
||||||
|
private boolean isVersionEnabled;
|
||||||
|
private StorageType storageType;
|
||||||
|
|
||||||
|
Builder() {
|
||||||
|
addAcls = new LinkedList<>();
|
||||||
|
removeAcls = new LinkedList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setVolumeName(String volume) {
|
||||||
|
this.volumeName = volume;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setBucketName(String bucket) {
|
||||||
|
this.bucketName = bucket;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder addAddAcl(OzoneAclInfo acl) {
|
||||||
|
this.addAcls.add(acl);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder addRemoveAcl(OzoneAclInfo acl) {
|
||||||
|
this.removeAcls.add(acl);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setIsVersionEnabled(boolean versionFlag) {
|
||||||
|
this.isVersionEnabled = versionFlag;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setStorageType(StorageType storage) {
|
||||||
|
this.storageType = storage;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs the KsmBucketArgs.
|
||||||
|
* @return instance of KsmBucketArgs.
|
||||||
|
*/
|
||||||
|
public KsmBucketArgs build() {
|
||||||
|
Preconditions.checkNotNull(volumeName);
|
||||||
|
Preconditions.checkNotNull(bucketName);
|
||||||
|
Preconditions.checkNotNull(isVersionEnabled);
|
||||||
|
return new KsmBucketArgs(volumeName, bucketName, addAcls, removeAcls,
|
||||||
|
isVersionEnabled, storageType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates BucketInfo protobuf from KsmBucketArgs.
|
||||||
|
*/
|
||||||
|
public BucketInfo getProtobuf() {
|
||||||
|
return BucketInfo.newBuilder()
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setBucketName(bucketName)
|
||||||
|
.addAllAddAcls(addAcls)
|
||||||
|
.addAllRemoveAcls(removeAcls)
|
||||||
|
.setIsVersionEnabled(isVersionEnabled)
|
||||||
|
.setStorageType(PBHelperClient.convertStorageType(storageType))
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parses BucketInfo protobuf and creates KsmBucketArgs.
|
||||||
|
* @param bucketInfo
|
||||||
|
* @return instance of KsmBucketArgs
|
||||||
|
*/
|
||||||
|
public static KsmBucketArgs getFromProtobuf(BucketInfo bucketInfo) {
|
||||||
|
return new KsmBucketArgs(
|
||||||
|
bucketInfo.getVolumeName(),
|
||||||
|
bucketInfo.getBucketName(),
|
||||||
|
bucketInfo.getAddAclsList(),
|
||||||
|
bucketInfo.getRemoveAclsList(),
|
||||||
|
bucketInfo.getIsVersionEnabled(),
|
||||||
|
PBHelperClient.convertStorageType(
|
||||||
|
bucketInfo.getStorageType()));
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ksm.protocol;
|
package org.apache.hadoop.ksm.protocol;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -24,7 +25,7 @@ import java.util.List;
|
||||||
/**
|
/**
|
||||||
* Protocol to talk to KSM.
|
* Protocol to talk to KSM.
|
||||||
*/
|
*/
|
||||||
public interface KeyspaceManagerProtocol {
|
public interface KeySpaceManagerProtocol {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a volume.
|
* Creates a volume.
|
||||||
|
@ -94,4 +95,12 @@ public interface KeyspaceManagerProtocol {
|
||||||
*/
|
*/
|
||||||
List<KsmVolumeArgs> listAllVolumes(String prefix, String
|
List<KsmVolumeArgs> listAllVolumes(String prefix, String
|
||||||
prevKey, long maxKeys) throws IOException;
|
prevKey, long maxKeys) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a bucket.
|
||||||
|
* @param args - Arguments to create Bucket.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
void createBucket(KsmBucketArgs args) throws IOException;
|
||||||
|
|
||||||
}
|
}
|
|
@ -22,8 +22,15 @@ import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
|
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.BucketInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateBucketRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateBucketResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -38,12 +45,12 @@ import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The client side implementation of KeyspaceManagerProtocol.
|
* The client side implementation of KeySpaceManagerProtocol.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
||||||
implements KeyspaceManagerProtocol, ProtocolTranslator, Closeable {
|
implements KeySpaceManagerProtocol, ProtocolTranslator, Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* RpcController is not used and hence is set to null.
|
* RpcController is not used and hence is set to null.
|
||||||
|
@ -199,6 +206,32 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a bucket.
|
||||||
|
*
|
||||||
|
* @param args - Arguments to create Bucket.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void createBucket(KsmBucketArgs args) throws IOException {
|
||||||
|
CreateBucketRequest.Builder req =
|
||||||
|
CreateBucketRequest.newBuilder();
|
||||||
|
BucketInfo bucketInfo = args.getProtobuf();
|
||||||
|
req.setBucketInfo(bucketInfo);
|
||||||
|
|
||||||
|
final CreateBucketResponse resp;
|
||||||
|
try {
|
||||||
|
resp = rpcProxy.createBucket(NULL_RPC_CONTROLLER,
|
||||||
|
req.build());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
if (resp.getStatus() != Status.OK) {
|
||||||
|
throw new IOException("Bucket creation failed, error: "
|
||||||
|
+ resp.getStatus());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the proxy object underlying this protocol translator.
|
* Return the proxy object underlying this protocol translator.
|
||||||
*
|
*
|
||||||
|
|
|
@ -19,15 +19,16 @@ package org.apache.hadoop.ksm.protocolPB;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.ipc.ProtocolInfo;
|
import org.apache.hadoop.ipc.ProtocolInfo;
|
||||||
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyspaceManagerService;
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.KeySpaceManagerService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Protocol used to communicate with KSM.
|
* Protocol used to communicate with KSM.
|
||||||
*/
|
*/
|
||||||
@ProtocolInfo(protocolName =
|
@ProtocolInfo(protocolName =
|
||||||
"org.apache.hadoop.ozone.protocol.KeyspaceManagerProtocol",
|
"org.apache.hadoop.ozone.protocol.KeySpaceManagerProtocol",
|
||||||
protocolVersion = 1)
|
protocolVersion = 1)
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public interface KeySpaceManagerProtocolPB
|
public interface KeySpaceManagerProtocolPB
|
||||||
extends KeyspaceManagerService.BlockingInterface {
|
extends KeySpaceManagerService.BlockingInterface {
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ Ozone key space manager. Ozone KSM manages the namespace for ozone.
|
||||||
This is similar to Namenode for Ozone.
|
This is similar to Namenode for Ozone.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import "hdfs.proto";
|
||||||
import "Ozone.proto";
|
import "Ozone.proto";
|
||||||
|
|
||||||
enum Status {
|
enum Status {
|
||||||
|
@ -44,8 +45,11 @@ enum Status {
|
||||||
VOLUME_ALREADY_EXISTS = 5;
|
VOLUME_ALREADY_EXISTS = 5;
|
||||||
USER_NOT_FOUND = 6;
|
USER_NOT_FOUND = 6;
|
||||||
USER_TOO_MANY_VOLUMES = 7;
|
USER_TOO_MANY_VOLUMES = 7;
|
||||||
ACCESS_DENIED = 8;
|
BUCKET_NOT_FOUND = 8;
|
||||||
INTERNAL_ERROR = 9;
|
BUCKET_NOT_EMPTY = 9;
|
||||||
|
BUCKET_ALREADY_EXISTS = 10;
|
||||||
|
ACCESS_DENIED = 11;
|
||||||
|
INTERNAL_ERROR = 12;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -154,10 +158,42 @@ message ListVolumeResponse {
|
||||||
repeated VolumeInfo volumeInfo = 2;
|
repeated VolumeInfo volumeInfo = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message BucketInfo {
|
||||||
|
required string volumeName = 1;
|
||||||
|
required string bucketName = 2;
|
||||||
|
repeated OzoneAclInfo addAcls = 3;
|
||||||
|
repeated OzoneAclInfo removeAcls = 4;
|
||||||
|
required bool isVersionEnabled = 5 [default = false];
|
||||||
|
optional StorageTypeProto storageType = 6 [default = DISK];
|
||||||
|
}
|
||||||
|
|
||||||
|
message OzoneAclInfo {
|
||||||
|
enum OzoneAclType {
|
||||||
|
USER = 1;
|
||||||
|
GROUP = 2;
|
||||||
|
WORLD = 3;
|
||||||
|
}
|
||||||
|
enum OzoneAclRights {
|
||||||
|
READ = 1;
|
||||||
|
WRITE = 2;
|
||||||
|
READ_WRITE = 3;
|
||||||
|
}
|
||||||
|
required OzoneAclType type = 1;
|
||||||
|
required string name = 2;
|
||||||
|
required OzoneAclRights rights = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CreateBucketRequest {
|
||||||
|
required BucketInfo bucketInfo = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CreateBucketResponse {
|
||||||
|
required Status status = 1;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
The KSM service that takes care of Ozone namespace.
|
The KSM service that takes care of Ozone namespace.
|
||||||
*/
|
*/
|
||||||
service KeyspaceManagerService {
|
service KeySpaceManagerService {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Creates a Volume.
|
Creates a Volume.
|
||||||
|
@ -193,4 +229,10 @@ service KeyspaceManagerService {
|
||||||
*/
|
*/
|
||||||
rpc listVolumes(ListVolumeRequest)
|
rpc listVolumes(ListVolumeRequest)
|
||||||
returns (ListVolumeResponse);
|
returns (ListVolumeResponse);
|
||||||
|
|
||||||
|
/**
|
||||||
|
Creates a Bucket.
|
||||||
|
*/
|
||||||
|
rpc createBucket(CreateBucketRequest)
|
||||||
|
returns(CreateBucketResponse);
|
||||||
}
|
}
|
|
@ -93,6 +93,11 @@ public final class OzoneConsts {
|
||||||
public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
|
public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
|
||||||
public static final String OZONE_HANDLER_LOCAL = "local";
|
public static final String OZONE_HANDLER_LOCAL = "local";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ozone metadata key delimiter.
|
||||||
|
*/
|
||||||
|
public static final String DB_KEY_DELIMITER = "/";
|
||||||
|
|
||||||
private OzoneConsts() {
|
private OzoneConsts() {
|
||||||
// Never Constructed
|
// Never Constructed
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BucketManager handles all the bucket level operations.
|
||||||
|
*/
|
||||||
|
public interface BucketManager {
|
||||||
|
/**
|
||||||
|
* Creates a bucket.
|
||||||
|
* @param args - KsmBucketArgs for creating bucket.
|
||||||
|
*/
|
||||||
|
void createBucket(KsmBucketArgs args) throws KSMException;
|
||||||
|
}
|
|
@ -0,0 +1,111 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
import org.iq80.leveldb.DBException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.DB_KEY_DELIMITER;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KSM bucket manager.
|
||||||
|
*/
|
||||||
|
public class BucketManagerImpl implements BucketManager {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(BucketManagerImpl.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MetadataManager is used for accessing KSM MetadataDB and ReadWriteLock.
|
||||||
|
*/
|
||||||
|
private final MetadataManager metadataManager;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs BucketManager.
|
||||||
|
* @param metadataManager
|
||||||
|
*/
|
||||||
|
public BucketManagerImpl(MetadataManager metadataManager){
|
||||||
|
this.metadataManager = metadataManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* MetadataDB is maintained in MetadataManager and shared between
|
||||||
|
* BucketManager and VolumeManager. (and also by KeyManager)
|
||||||
|
*
|
||||||
|
* BucketManager uses MetadataDB to store bucket level information.
|
||||||
|
*
|
||||||
|
* Keys used in BucketManager for storing data into MetadataDB
|
||||||
|
* for BucketInfo:
|
||||||
|
* {volume/bucket} -> bucketInfo
|
||||||
|
*
|
||||||
|
* Work flow of create bucket:
|
||||||
|
*
|
||||||
|
* -> Check if the Volume exists in metadataDB, if not throw
|
||||||
|
* VolumeNotFoundException.
|
||||||
|
* -> Else check if the Bucket exists in metadataDB, if so throw
|
||||||
|
* BucketExistException
|
||||||
|
* -> Else update MetadataDB with VolumeInfo.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a bucket.
|
||||||
|
* @param args - KsmBucketArgs.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void createBucket(KsmBucketArgs args) throws KSMException {
|
||||||
|
Preconditions.checkNotNull(args);
|
||||||
|
metadataManager.writeLock().lock();
|
||||||
|
String volumeNameString = args.getVolumeName();
|
||||||
|
String bucketNameString = args.getBucketName();
|
||||||
|
try {
|
||||||
|
//bucket key: {volume/bucket}
|
||||||
|
String bucketKeyString = volumeNameString +
|
||||||
|
DB_KEY_DELIMITER + bucketNameString;
|
||||||
|
|
||||||
|
byte[] volumeName = DFSUtil.string2Bytes(volumeNameString);
|
||||||
|
byte[] bucketKey = DFSUtil.string2Bytes(bucketKeyString);
|
||||||
|
|
||||||
|
//Check if the volume exists
|
||||||
|
if(metadataManager.get(volumeName) == null) {
|
||||||
|
LOG.error("volume: {} not found ", volumeNameString);
|
||||||
|
throw new KSMException("Volume doesn't exist",
|
||||||
|
KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||||
|
}
|
||||||
|
//Check if bucket already exists
|
||||||
|
if(metadataManager.get(bucketKey) != null) {
|
||||||
|
LOG.error("bucket: {} already exists ", bucketNameString);
|
||||||
|
throw new KSMException("Bucket already exist",
|
||||||
|
KSMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
|
||||||
|
}
|
||||||
|
metadataManager.put(bucketKey, args.getProtobuf().toByteArray());
|
||||||
|
|
||||||
|
LOG.info("created bucket: {} in volume: {}", bucketNameString,
|
||||||
|
volumeNameString);
|
||||||
|
} catch (DBException ex) {
|
||||||
|
LOG.error("Bucket creation failed for bucket:{} in volume:{}",
|
||||||
|
volumeNameString, bucketNameString, ex);
|
||||||
|
throw new KSMException(ex.getMessage(),
|
||||||
|
KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
|
||||||
|
} finally {
|
||||||
|
metadataManager.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -29,9 +29,11 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
public class KSMMetrics {
|
public class KSMMetrics {
|
||||||
// KSM op metrics
|
// KSM op metrics
|
||||||
private @Metric MutableCounterLong numVolumeCreates;
|
private @Metric MutableCounterLong numVolumeCreates;
|
||||||
|
private @Metric MutableCounterLong numBucketCreates;
|
||||||
|
|
||||||
// Failure Metrics
|
// Failure Metrics
|
||||||
private @Metric MutableCounterLong numVolumeCreateFails;
|
private @Metric MutableCounterLong numVolumeCreateFails;
|
||||||
|
private @Metric MutableCounterLong numBucketCreateFails;
|
||||||
|
|
||||||
public KSMMetrics() {
|
public KSMMetrics() {
|
||||||
}
|
}
|
||||||
|
@ -47,17 +49,36 @@ public class KSMMetrics {
|
||||||
numVolumeCreates.incr();
|
numVolumeCreates.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumBucketCreates() {
|
||||||
|
numBucketCreates.incr();
|
||||||
|
}
|
||||||
|
|
||||||
public void incNumVolumeCreateFails() {
|
public void incNumVolumeCreateFails() {
|
||||||
numVolumeCreates.incr();
|
numVolumeCreates.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumBucketCreateFails() {
|
||||||
|
numBucketCreateFails.incr();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreates() {
|
public long getNumVolumeCreates() {
|
||||||
return numVolumeCreates.value();
|
return numVolumeCreates.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBucketCreates() {
|
||||||
|
return numBucketCreates.value();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreateFails() {
|
public long getNumVolumeCreateFails() {
|
||||||
return numVolumeCreateFails.value();
|
return numVolumeCreateFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumBucketCreateFails() {
|
||||||
|
return numBucketCreateFails.value();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,13 +23,14 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
|
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.protocolPB
|
import org.apache.hadoop.ozone.protocolPB
|
||||||
.KeyspaceManagerProtocolServerSideTranslatorPB;
|
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -46,7 +47,7 @@ import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
.OZONE_KSM_HANDLER_COUNT_KEY;
|
.OZONE_KSM_HANDLER_COUNT_KEY;
|
||||||
import static org.apache.hadoop.ozone.protocol.proto
|
import static org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.KeyspaceManagerService
|
.KeySpaceManagerProtocolProtos.KeySpaceManagerService
|
||||||
.newReflectiveBlockingService;
|
.newReflectiveBlockingService;
|
||||||
import static org.apache.hadoop.util.ExitUtil.terminate;
|
import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
|
|
||||||
|
@ -54,13 +55,15 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
|
||||||
* Ozone Keyspace manager is the metadata manager of ozone.
|
* Ozone Keyspace manager is the metadata manager of ozone.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
|
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
|
||||||
public class KeySpaceManager implements KeyspaceManagerProtocol {
|
public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(KeySpaceManager.class);
|
LoggerFactory.getLogger(KeySpaceManager.class);
|
||||||
|
|
||||||
private final RPC.Server ksmRpcServer;
|
private final RPC.Server ksmRpcServer;
|
||||||
private final InetSocketAddress ksmRpcAddress;
|
private final InetSocketAddress ksmRpcAddress;
|
||||||
|
private final MetadataManager metadataManager;
|
||||||
private final VolumeManager volumeManager;
|
private final VolumeManager volumeManager;
|
||||||
|
private final BucketManager bucketManager;
|
||||||
private final KSMMetrics metrics;
|
private final KSMMetrics metrics;
|
||||||
|
|
||||||
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
||||||
|
@ -71,7 +74,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
|
||||||
ProtobufRpcEngine.class);
|
ProtobufRpcEngine.class);
|
||||||
|
|
||||||
BlockingService ksmService = newReflectiveBlockingService(
|
BlockingService ksmService = newReflectiveBlockingService(
|
||||||
new KeyspaceManagerProtocolServerSideTranslatorPB(this));
|
new KeySpaceManagerProtocolServerSideTranslatorPB(this));
|
||||||
final InetSocketAddress ksmNodeRpcAddr = OzoneClientUtils.
|
final InetSocketAddress ksmNodeRpcAddr = OzoneClientUtils.
|
||||||
getKsmAddress(conf);
|
getKsmAddress(conf);
|
||||||
ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
|
ksmRpcServer = startRpcServer(conf, ksmNodeRpcAddr,
|
||||||
|
@ -79,7 +82,9 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
|
||||||
handlerCount);
|
handlerCount);
|
||||||
ksmRpcAddress = updateListenAddress(conf,
|
ksmRpcAddress = updateListenAddress(conf,
|
||||||
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
|
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
|
||||||
volumeManager = new VolumeManagerImpl(this, conf);
|
metadataManager = new MetadataManagerImpl(conf);
|
||||||
|
volumeManager = new VolumeManagerImpl(metadataManager, conf);
|
||||||
|
bucketManager = new BucketManagerImpl(metadataManager);
|
||||||
metrics = KSMMetrics.create();
|
metrics = KSMMetrics.create();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,7 +190,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
|
||||||
public void start() {
|
public void start() {
|
||||||
LOG.info(buildRpcServerStartMessage("KeyspaceManager RPC server",
|
LOG.info(buildRpcServerStartMessage("KeyspaceManager RPC server",
|
||||||
ksmRpcAddress));
|
ksmRpcAddress));
|
||||||
volumeManager.start();
|
metadataManager.start();
|
||||||
ksmRpcServer.start();
|
ksmRpcServer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +200,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
|
||||||
public void stop() {
|
public void stop() {
|
||||||
try {
|
try {
|
||||||
ksmRpcServer.stop();
|
ksmRpcServer.stop();
|
||||||
volumeManager.stop();
|
metadataManager.stop();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.info("Key Space Manager stop failed.", e);
|
LOG.info("Key Space Manager stop failed.", e);
|
||||||
}
|
}
|
||||||
|
@ -221,8 +226,13 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void createVolume(KsmVolumeArgs args) throws IOException {
|
public void createVolume(KsmVolumeArgs args) throws IOException {
|
||||||
metrics.incNumVolumeCreates();
|
try {
|
||||||
volumeManager.createVolume(args);
|
metrics.incNumVolumeCreates();
|
||||||
|
volumeManager.createVolume(args);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
metrics.incNumVolumeCreateFails();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -317,4 +327,21 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
|
||||||
maxKeys) throws IOException {
|
maxKeys) throws IOException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a bucket.
|
||||||
|
*
|
||||||
|
* @param args - Arguments to create Bucket.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void createBucket(KsmBucketArgs args) throws IOException {
|
||||||
|
try {
|
||||||
|
metrics.incNumBucketCreates();
|
||||||
|
bucketManager.createBucket(args);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
metrics.incNumBucketCreateFails();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KSM metadata manager interface.
|
||||||
|
*/
|
||||||
|
public interface MetadataManager {
|
||||||
|
/**
|
||||||
|
* Start metadata manager.
|
||||||
|
*/
|
||||||
|
void start();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop metadata manager.
|
||||||
|
*/
|
||||||
|
void stop() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the read lock used on Metadata DB.
|
||||||
|
* @return readLock
|
||||||
|
*/
|
||||||
|
Lock readLock();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the write lock used on Metadata DB.
|
||||||
|
* @return writeLock
|
||||||
|
*/
|
||||||
|
Lock writeLock();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value associated with this key.
|
||||||
|
* @param key - key
|
||||||
|
* @return value
|
||||||
|
*/
|
||||||
|
byte[] get(byte[] key);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Puts a Key into Metadata DB.
|
||||||
|
* @param key - key
|
||||||
|
* @param value - value
|
||||||
|
*/
|
||||||
|
void put(byte[] key, byte[] value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs a batch Put to Metadata DB.
|
||||||
|
* Can be used to do multiple puts atomically.
|
||||||
|
* @param list - list of Map.Entry
|
||||||
|
*/
|
||||||
|
void batchPut(List<Map.Entry<byte[], byte[]>> list) throws IOException;
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,133 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
|
import org.apache.hadoop.utils.LevelDBStore;
|
||||||
|
import org.iq80.leveldb.Options;
|
||||||
|
import org.iq80.leveldb.WriteBatch;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
|
||||||
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
|
.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
|
||||||
|
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
|
||||||
|
.OZONE_KSM_DB_CACHE_SIZE_MB;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KSM metadata manager interface.
|
||||||
|
*/
|
||||||
|
public class MetadataManagerImpl implements MetadataManager {
|
||||||
|
|
||||||
|
private final LevelDBStore store;
|
||||||
|
private final ReadWriteLock lock;
|
||||||
|
|
||||||
|
|
||||||
|
public MetadataManagerImpl(OzoneConfiguration conf) throws IOException {
|
||||||
|
File metaDir = OzoneUtils.getScmMetadirPath(conf);
|
||||||
|
final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
|
||||||
|
OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
|
||||||
|
Options options = new Options();
|
||||||
|
options.cacheSize(cacheSize * OzoneConsts.MB);
|
||||||
|
File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
|
||||||
|
this.store = new LevelDBStore(ksmDBFile, options);
|
||||||
|
this.lock = new ReentrantReadWriteLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start metadata manager.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop metadata manager.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void stop() throws IOException {
|
||||||
|
if (store != null) {
|
||||||
|
store.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the read lock used on Metadata DB.
|
||||||
|
* @return readLock
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Lock readLock() {
|
||||||
|
return lock.readLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the write lock used on Metadata DB.
|
||||||
|
* @return writeLock
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Lock writeLock() {
|
||||||
|
return lock.writeLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the value associated with this key.
|
||||||
|
* @param key - key
|
||||||
|
* @return value
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public byte[] get(byte[] key) {
|
||||||
|
return store.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Puts a Key into Metadata DB.
|
||||||
|
* @param key - key
|
||||||
|
* @param value - value
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void put(byte[] key, byte[] value) {
|
||||||
|
store.put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs a batch Put to Metadata DB.
|
||||||
|
* Can be used to do multiple puts atomically.
|
||||||
|
* @param list - list of Map.Entry
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void batchPut(List<Map.Entry<byte[], byte[]>> list)
|
||||||
|
throws IOException {
|
||||||
|
WriteBatch batch = store.createWriteBatch();
|
||||||
|
list.forEach(entry -> batch.put(entry.getKey(), entry.getValue()));
|
||||||
|
try {
|
||||||
|
store.commitWriteBatch(batch);
|
||||||
|
} finally {
|
||||||
|
store.closeWriteBatch(batch);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -24,15 +24,6 @@ import java.io.IOException;
|
||||||
* KSM volume manager interface.
|
* KSM volume manager interface.
|
||||||
*/
|
*/
|
||||||
public interface VolumeManager {
|
public interface VolumeManager {
|
||||||
/**
|
|
||||||
* Start volume manager.
|
|
||||||
*/
|
|
||||||
void start();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stop volume manager.
|
|
||||||
*/
|
|
||||||
void stop() throws IOException;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new volume.
|
* Create a new volume.
|
||||||
|
|
|
@ -20,32 +20,21 @@ import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.VolumeList;
|
.KeySpaceManagerProtocolProtos.VolumeList;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.VolumeInfo;
|
.KeySpaceManagerProtocolProtos.VolumeInfo;
|
||||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
|
||||||
import org.apache.hadoop.utils.LevelDBStore;
|
|
||||||
import org.iq80.leveldb.DBException;
|
import org.iq80.leveldb.DBException;
|
||||||
import org.iq80.leveldb.Options;
|
|
||||||
import org.iq80.leveldb.WriteBatch;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.AbstractMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.Map;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
|
|
||||||
import static org.apache.hadoop.ozone.ksm
|
|
||||||
.KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
|
|
||||||
import static org.apache.hadoop.ozone.ksm
|
|
||||||
.KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_MB;
|
|
||||||
import static org.apache.hadoop.ozone.ksm
|
import static org.apache.hadoop.ozone.ksm
|
||||||
.KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
|
.KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
|
||||||
import static org.apache.hadoop.ozone.ksm
|
import static org.apache.hadoop.ozone.ksm
|
||||||
|
@ -60,9 +49,7 @@ public class VolumeManagerImpl implements VolumeManager {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(VolumeManagerImpl.class);
|
LoggerFactory.getLogger(VolumeManagerImpl.class);
|
||||||
|
|
||||||
private final KeySpaceManager ksm;
|
private final MetadataManager metadataManager;
|
||||||
private final LevelDBStore store;
|
|
||||||
private final ReadWriteLock lock;
|
|
||||||
private final int maxUserVolumeCount;
|
private final int maxUserVolumeCount;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -70,30 +57,13 @@ public class VolumeManagerImpl implements VolumeManager {
|
||||||
* @param conf - Ozone configuration.
|
* @param conf - Ozone configuration.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public VolumeManagerImpl(KeySpaceManager ksm, OzoneConfiguration conf)
|
public VolumeManagerImpl(MetadataManager metadataManager,
|
||||||
throws IOException {
|
OzoneConfiguration conf) throws IOException {
|
||||||
File metaDir = OzoneUtils.getScmMetadirPath(conf);
|
this.metadataManager = metadataManager;
|
||||||
final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
|
|
||||||
OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
|
|
||||||
Options options = new Options();
|
|
||||||
options.cacheSize(cacheSize * OzoneConsts.MB);
|
|
||||||
File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
|
|
||||||
this.ksm = ksm;
|
|
||||||
this.store = new LevelDBStore(ksmDBFile, options);
|
|
||||||
lock = new ReentrantReadWriteLock();
|
|
||||||
this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
|
this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
|
||||||
OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
|
OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void start() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() throws IOException {
|
|
||||||
store.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a volume.
|
* Creates a volume.
|
||||||
* @param args - KsmVolumeArgs.
|
* @param args - KsmVolumeArgs.
|
||||||
|
@ -101,10 +71,11 @@ public class VolumeManagerImpl implements VolumeManager {
|
||||||
@Override
|
@Override
|
||||||
public void createVolume(KsmVolumeArgs args) throws IOException {
|
public void createVolume(KsmVolumeArgs args) throws IOException {
|
||||||
Preconditions.checkNotNull(args);
|
Preconditions.checkNotNull(args);
|
||||||
lock.writeLock().lock();
|
metadataManager.writeLock().lock();
|
||||||
WriteBatch batch = store.createWriteBatch();
|
List<Map.Entry<byte[], byte[]>> batch = new LinkedList<>();
|
||||||
try {
|
try {
|
||||||
byte[] volumeName = store.get(DFSUtil.string2Bytes(args.getVolume()));
|
byte[] volumeName = metadataManager.
|
||||||
|
get(DFSUtil.string2Bytes(args.getVolume()));
|
||||||
|
|
||||||
// Check of the volume already exists
|
// Check of the volume already exists
|
||||||
if(volumeName != null) {
|
if(volumeName != null) {
|
||||||
|
@ -114,7 +85,8 @@ public class VolumeManagerImpl implements VolumeManager {
|
||||||
|
|
||||||
// Next count the number of volumes for the user
|
// Next count the number of volumes for the user
|
||||||
String dbUserName = "$" + args.getOwnerName();
|
String dbUserName = "$" + args.getOwnerName();
|
||||||
byte[] volumeList = store.get(DFSUtil.string2Bytes(dbUserName));
|
byte[] volumeList = metadataManager
|
||||||
|
.get(DFSUtil.string2Bytes(dbUserName));
|
||||||
List prevVolList;
|
List prevVolList;
|
||||||
if (volumeList != null) {
|
if (volumeList != null) {
|
||||||
VolumeList vlist = VolumeList.parseFrom(volumeList);
|
VolumeList vlist = VolumeList.parseFrom(volumeList);
|
||||||
|
@ -128,26 +100,25 @@ public class VolumeManagerImpl implements VolumeManager {
|
||||||
throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
|
throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit the volume information to leveldb
|
// Commit the volume information to metadataManager
|
||||||
VolumeInfo volumeInfo = args.getProtobuf();
|
VolumeInfo volumeInfo = args.getProtobuf();
|
||||||
batch.put(DFSUtil.string2Bytes(args.getVolume()),
|
batch.add(new AbstractMap.SimpleEntry<>(
|
||||||
volumeInfo.toByteArray());
|
DFSUtil.string2Bytes(args.getVolume()), volumeInfo.toByteArray()));
|
||||||
|
|
||||||
prevVolList.add(args.getVolume());
|
prevVolList.add(args.getVolume());
|
||||||
VolumeList newVolList = VolumeList.newBuilder()
|
VolumeList newVolList = VolumeList.newBuilder()
|
||||||
.addAllVolumeNames(prevVolList).build();
|
.addAllVolumeNames(prevVolList).build();
|
||||||
batch.put(DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray());
|
batch.add(new AbstractMap.SimpleEntry<>(
|
||||||
store.commitWriteBatch(batch);
|
DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray()));
|
||||||
|
metadataManager.batchPut(batch);
|
||||||
LOG.info("created volume:{} user:{}",
|
LOG.info("created volume:{} user:{}",
|
||||||
args.getVolume(), args.getOwnerName());
|
args.getVolume(), args.getOwnerName());
|
||||||
} catch (IOException | DBException ex) {
|
} catch (IOException | DBException ex) {
|
||||||
ksm.getMetrics().incNumVolumeCreateFails();
|
|
||||||
LOG.error("Volume creation failed for user:{} volname:{}",
|
LOG.error("Volume creation failed for user:{} volname:{}",
|
||||||
args.getOwnerName(), args.getVolume(), ex);
|
args.getOwnerName(), args.getVolume(), ex);
|
||||||
throw ex;
|
throw ex;
|
||||||
} finally {
|
} finally {
|
||||||
store.closeWriteBatch(batch);
|
metadataManager.writeLock().unlock();
|
||||||
lock.writeLock().unlock();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,6 +98,8 @@ public class KSMException extends IOException {
|
||||||
public enum ResultCodes {
|
public enum ResultCodes {
|
||||||
FAILED_TOO_MANY_USER_VOLUMES,
|
FAILED_TOO_MANY_USER_VOLUMES,
|
||||||
FAILED_VOLUME_ALREADY_EXISTS,
|
FAILED_VOLUME_ALREADY_EXISTS,
|
||||||
|
FAILED_VOLUME_NOT_FOUND,
|
||||||
|
FAILED_BUCKET_ALREADY_EXISTS,
|
||||||
FAILED_INTERNAL_ERROR
|
FAILED_INTERNAL_ERROR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.protocolPB;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.web.request.OzoneAcl;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclType;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.OzoneAclInfo.OzoneAclRights;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utilities for converting protobuf classes.
|
||||||
|
*/
|
||||||
|
public final class KSMPBHelper {
|
||||||
|
|
||||||
|
private KSMPBHelper() {
|
||||||
|
/** Hidden constructor */
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns protobuf's OzoneAclInfo of the current instance.
|
||||||
|
* @return OzoneAclInfo
|
||||||
|
*/
|
||||||
|
public static OzoneAclInfo convertOzoneAcl(OzoneAcl acl) {
|
||||||
|
OzoneAclInfo.OzoneAclType aclType;
|
||||||
|
switch(acl.getType()) {
|
||||||
|
case USER:
|
||||||
|
aclType = OzoneAclType.USER;
|
||||||
|
break;
|
||||||
|
case GROUP:
|
||||||
|
aclType = OzoneAclType.GROUP;
|
||||||
|
break;
|
||||||
|
case WORLD:
|
||||||
|
aclType = OzoneAclType.WORLD;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("ACL type is not recognized");
|
||||||
|
}
|
||||||
|
OzoneAclInfo.OzoneAclRights aclRights;
|
||||||
|
switch(acl.getRights()) {
|
||||||
|
case READ:
|
||||||
|
aclRights = OzoneAclRights.READ;
|
||||||
|
break;
|
||||||
|
case WRITE:
|
||||||
|
aclRights = OzoneAclRights.WRITE;
|
||||||
|
break;
|
||||||
|
case READ_WRITE:
|
||||||
|
aclRights = OzoneAclRights.READ_WRITE;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("ACL right is not recognized");
|
||||||
|
}
|
||||||
|
|
||||||
|
return OzoneAclInfo.newBuilder().setType(aclType)
|
||||||
|
.setName(acl.getName())
|
||||||
|
.setRights(aclRights)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,11 +18,16 @@ package org.apache.hadoop.ozone.protocolPB;
|
||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
|
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateBucketRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateBucketResponse;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -56,19 +61,19 @@ import java.io.IOException;
|
||||||
/**
|
/**
|
||||||
* This class is the server-side translator that forwards requests received on
|
* This class is the server-side translator that forwards requests received on
|
||||||
* {@link org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB} to the
|
* {@link org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB} to the
|
||||||
* KeyspaceManagerService server implementation.
|
* KeySpaceManagerService server implementation.
|
||||||
*/
|
*/
|
||||||
public class KeyspaceManagerProtocolServerSideTranslatorPB implements
|
public class KeySpaceManagerProtocolServerSideTranslatorPB implements
|
||||||
KeySpaceManagerProtocolPB {
|
KeySpaceManagerProtocolPB {
|
||||||
private final KeyspaceManagerProtocol impl;
|
private final KeySpaceManagerProtocol impl;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs an instance of the server handler.
|
* Constructs an instance of the server handler.
|
||||||
*
|
*
|
||||||
* @param impl KeySpaceManagerProtocolPB
|
* @param impl KeySpaceManagerProtocolPB
|
||||||
*/
|
*/
|
||||||
public KeyspaceManagerProtocolServerSideTranslatorPB(
|
public KeySpaceManagerProtocolServerSideTranslatorPB(
|
||||||
KeyspaceManagerProtocol impl) {
|
KeySpaceManagerProtocol impl) {
|
||||||
this.impl = impl;
|
this.impl = impl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,4 +136,28 @@ public class KeyspaceManagerProtocolServerSideTranslatorPB implements
|
||||||
throws ServiceException {
|
throws ServiceException {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CreateBucketResponse createBucket(
|
||||||
|
RpcController controller, CreateBucketRequest
|
||||||
|
request) throws ServiceException {
|
||||||
|
CreateBucketResponse.Builder resp =
|
||||||
|
CreateBucketResponse.newBuilder();
|
||||||
|
try {
|
||||||
|
impl.createBucket(KsmBucketArgs.getFromProtobuf(
|
||||||
|
request.getBucketInfo()));
|
||||||
|
resp.setStatus(Status.OK);
|
||||||
|
} catch (KSMException ksmEx) {
|
||||||
|
if (ksmEx.getResult() ==
|
||||||
|
ResultCodes.FAILED_VOLUME_NOT_FOUND) {
|
||||||
|
resp.setStatus(Status.VOLUME_NOT_FOUND);
|
||||||
|
} else if (ksmEx.getResult() ==
|
||||||
|
ResultCodes.FAILED_BUCKET_ALREADY_EXISTS) {
|
||||||
|
resp.setStatus(Status.BUCKET_ALREADY_EXISTS);
|
||||||
|
}
|
||||||
|
} catch(IOException ex) {
|
||||||
|
resp.setStatus(Status.INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
return resp.build();
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -18,20 +18,29 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.web.storage;
|
package org.apache.hadoop.ozone.web.storage;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyResponseProto;
|
.ContainerProtos.ChunkInfo;
|
||||||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
|
import org.apache.hadoop.hdfs.ozone.protocol.proto
|
||||||
|
.ContainerProtos.GetKeyResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.ozone.protocol.proto
|
||||||
|
.ContainerProtos.KeyData;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset
|
||||||
|
.LengthInputStream;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.ksm.protocolPB
|
||||||
|
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
|
||||||
|
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
|
||||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||||
import org.apache.hadoop.scm.XceiverClientManager;
|
import org.apache.hadoop.scm.XceiverClientManager;
|
||||||
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
import org.apache.hadoop.scm.protocol.LocatedContainer;
|
||||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
import org.apache.hadoop.scm.protocolPB
|
||||||
|
.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||||
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
||||||
|
@ -59,7 +68,6 @@ import java.util.List;
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
|
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
|
||||||
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
|
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
|
||||||
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link StorageHandler} implementation that distributes object storage
|
* A {@link StorageHandler} implementation that distributes object storage
|
||||||
|
@ -167,22 +175,38 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
@Override
|
@Override
|
||||||
public void createBucket(final BucketArgs args)
|
public void createBucket(final BucketArgs args)
|
||||||
throws IOException, OzoneException {
|
throws IOException, OzoneException {
|
||||||
String containerKey = buildContainerKey(args.getVolumeName(),
|
KsmBucketArgs.Builder builder = KsmBucketArgs.newBuilder();
|
||||||
args.getBucketName());
|
args.getAddAcls().forEach(acl ->
|
||||||
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
|
builder.addAddAcl(KSMPBHelper.convertOzoneAcl(acl)));
|
||||||
try {
|
args.getRemoveAcls().forEach(acl ->
|
||||||
BucketInfo bucket = new BucketInfo();
|
builder.addRemoveAcl(KSMPBHelper.convertOzoneAcl(acl)));
|
||||||
bucket.setVolumeName(args.getVolumeName());
|
builder.setVolumeName(args.getVolumeName())
|
||||||
bucket.setBucketName(args.getBucketName());
|
.setBucketName(args.getBucketName())
|
||||||
bucket.setAcls(args.getAddAcls());
|
.setIsVersionEnabled(getBucketVersioningProtobuf(
|
||||||
bucket.setVersioning(args.getVersioning());
|
args.getVersioning()))
|
||||||
bucket.setStorageType(args.getStorageType());
|
.setStorageType(args.getStorageType());
|
||||||
KeyData containerKeyData = fromBucketToContainerKeyData(
|
keySpaceManagerClient.createBucket(builder.build());
|
||||||
xceiverClient.getPipeline().getContainerName(), containerKey, bucket);
|
}
|
||||||
putKey(xceiverClient, containerKeyData, args.getRequestID());
|
|
||||||
} finally {
|
/**
|
||||||
xceiverClientManager.releaseClient(xceiverClient);
|
* Converts OzoneConts.Versioning enum to boolean.
|
||||||
|
*
|
||||||
|
* @param version
|
||||||
|
* @return corresponding boolean value
|
||||||
|
*/
|
||||||
|
private boolean getBucketVersioningProtobuf(
|
||||||
|
Versioning version) {
|
||||||
|
if(version != null) {
|
||||||
|
switch(version) {
|
||||||
|
case ENABLED:
|
||||||
|
return true;
|
||||||
|
case NOT_DEFINED:
|
||||||
|
case DISABLED:
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -0,0 +1,138 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions
|
||||||
|
.KSMException.ResultCodes;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.ExpectedException;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.any;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests BucketManagerImpl, mocks MetadataManager for testing.
|
||||||
|
*/
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
public class TestBucketManagerImpl {
|
||||||
|
@Rule
|
||||||
|
public ExpectedException thrown = ExpectedException.none();
|
||||||
|
|
||||||
|
private MetadataManager getMetadataManagerMock(String... volumesToCreate)
|
||||||
|
throws IOException {
|
||||||
|
MetadataManager metadataManager = Mockito.mock(MetadataManager.class);
|
||||||
|
Map<String, byte[]> metadataDB = new HashMap<>();
|
||||||
|
ReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
|
Mockito.when(metadataManager.writeLock()).thenReturn(lock.writeLock());
|
||||||
|
Mockito.when(metadataManager.readLock()).thenReturn(lock.readLock());
|
||||||
|
Mockito.doAnswer(
|
||||||
|
new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
metadataDB.put(DFSUtil.bytes2String(
|
||||||
|
(byte[])invocation.getArguments()[0]),
|
||||||
|
(byte[])invocation.getArguments()[1]);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(metadataManager).put(any(byte[].class), any(byte[].class));
|
||||||
|
|
||||||
|
Mockito.when(metadataManager.get(any(byte[].class))).thenAnswer(
|
||||||
|
(InvocationOnMock invocation) ->
|
||||||
|
metadataDB.get(DFSUtil.bytes2String(
|
||||||
|
(byte[])invocation.getArguments()[0]))
|
||||||
|
);
|
||||||
|
for(String volumeName : volumesToCreate) {
|
||||||
|
byte[] dummyVolumeInfo = DFSUtil.string2Bytes(volumeName);
|
||||||
|
metadataDB.put(volumeName, dummyVolumeInfo);
|
||||||
|
}
|
||||||
|
return metadataManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateBucketWithoutVolume() throws IOException {
|
||||||
|
thrown.expectMessage("Volume doesn't exist");
|
||||||
|
MetadataManager metaMgr = getMetadataManagerMock();
|
||||||
|
try {
|
||||||
|
BucketManager bucketManager = new BucketManagerImpl(metaMgr);
|
||||||
|
KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setStorageType(StorageType.DISK)
|
||||||
|
.setIsVersionEnabled(false)
|
||||||
|
.build();
|
||||||
|
bucketManager.createBucket(bucketArgs);
|
||||||
|
} catch(KSMException ksmEx) {
|
||||||
|
Assert.assertEquals(ResultCodes.FAILED_VOLUME_NOT_FOUND,
|
||||||
|
ksmEx.getResult());
|
||||||
|
throw ksmEx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateBucket() throws IOException {
|
||||||
|
MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
|
||||||
|
BucketManager bucketManager = new BucketManagerImpl(metaMgr);
|
||||||
|
KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setStorageType(StorageType.DISK)
|
||||||
|
.setIsVersionEnabled(false)
|
||||||
|
.build();
|
||||||
|
bucketManager.createBucket(bucketArgs);
|
||||||
|
//TODO: Use BucketManagerImpl#getBucketInfo to verify creation of bucket.
|
||||||
|
Assert.assertNotNull(metaMgr
|
||||||
|
.get(DFSUtil.string2Bytes("sampleVol/bucketOne")));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateAlreadyExistingBucket() throws IOException {
|
||||||
|
thrown.expectMessage("Bucket already exist");
|
||||||
|
MetadataManager metaMgr = getMetadataManagerMock("sampleVol");
|
||||||
|
try {
|
||||||
|
BucketManager bucketManager = new BucketManagerImpl(metaMgr);
|
||||||
|
KsmBucketArgs bucketArgs = KsmBucketArgs.newBuilder()
|
||||||
|
.setVolumeName("sampleVol")
|
||||||
|
.setBucketName("bucketOne")
|
||||||
|
.setStorageType(StorageType.DISK)
|
||||||
|
.setIsVersionEnabled(false)
|
||||||
|
.build();
|
||||||
|
bucketManager.createBucket(bucketArgs);
|
||||||
|
bucketManager.createBucket(bucketArgs);
|
||||||
|
} catch(KSMException ksmEx) {
|
||||||
|
Assert.assertEquals(ResultCodes.FAILED_BUCKET_ALREADY_EXISTS,
|
||||||
|
ksmEx.getResult());
|
||||||
|
throw ksmEx;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue