HDFS-11780. Ozone: KSM: Add putKey. Contributed by Chen Liang.
This commit is contained in:
parent
67da8be745
commit
e641bee7b7
|
@ -0,0 +1,88 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ksm.helpers;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Args for key. Client use this to specify key's attributes on key creation
|
||||||
|
* (putKey()).
|
||||||
|
*/
|
||||||
|
public final class KsmKeyArgs {
|
||||||
|
private final String volumeName;
|
||||||
|
private final String bucketName;
|
||||||
|
private final String keyName;
|
||||||
|
|
||||||
|
private final long dataSize;
|
||||||
|
|
||||||
|
private KsmKeyArgs(String volumeName, String bucketName, String keyName,
|
||||||
|
long dataSize) {
|
||||||
|
this.volumeName = volumeName;
|
||||||
|
this.bucketName = bucketName;
|
||||||
|
this.keyName = keyName;
|
||||||
|
this.dataSize = dataSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getVolumeName() {
|
||||||
|
return volumeName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBucketName() {
|
||||||
|
return bucketName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKeyName() {
|
||||||
|
return keyName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDataSize() {
|
||||||
|
return dataSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builder class of KsmKeyArgs.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
private String volumeName;
|
||||||
|
private String bucketName;
|
||||||
|
private String keyName;
|
||||||
|
private long dataSize;
|
||||||
|
|
||||||
|
public Builder setVolumeName(String volume) {
|
||||||
|
this.volumeName = volume;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setBucketName(String bucket) {
|
||||||
|
this.bucketName = bucket;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setKeyName(String key) {
|
||||||
|
this.keyName = key;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setDataSize(long size) {
|
||||||
|
this.dataSize = size;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public KsmKeyArgs build() {
|
||||||
|
return new KsmKeyArgs(volumeName, bucketName, keyName, dataSize);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,156 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ksm.helpers;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Args for key block. The block instance for the key requested in putKey.
|
||||||
|
* This is returned from KSM to client, and client use class to talk to
|
||||||
|
* datanode. Also, this is the metadata written to ksm.db on server side.
|
||||||
|
*/
|
||||||
|
public final class KsmKeyInfo {
|
||||||
|
private final String volumeName;
|
||||||
|
private final String bucketName;
|
||||||
|
// name of key client specified
|
||||||
|
private final String keyName;
|
||||||
|
private final String containerName;
|
||||||
|
// name of the block id SCM assigned for the key
|
||||||
|
private final String blockID;
|
||||||
|
private final long dataSize;
|
||||||
|
private final boolean shouldCreateContainer;
|
||||||
|
|
||||||
|
private KsmKeyInfo(String volumeName, String bucketName, String keyName,
|
||||||
|
long dataSize, String blockID, String containerName,
|
||||||
|
boolean shouldCreateContainer) {
|
||||||
|
this.volumeName = volumeName;
|
||||||
|
this.bucketName = bucketName;
|
||||||
|
this.keyName = keyName;
|
||||||
|
this.containerName = containerName;
|
||||||
|
this.blockID = blockID;
|
||||||
|
this.dataSize = dataSize;
|
||||||
|
this.shouldCreateContainer = shouldCreateContainer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getVolumeName() {
|
||||||
|
return volumeName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBucketName() {
|
||||||
|
return bucketName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getKeyName() {
|
||||||
|
return keyName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBlockID() {
|
||||||
|
return blockID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getContainerName() {
|
||||||
|
return containerName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDataSize() {
|
||||||
|
return dataSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getShouldCreateContainer() {
|
||||||
|
return shouldCreateContainer;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Builder of KsmKeyInfo.
|
||||||
|
*/
|
||||||
|
public static class Builder {
|
||||||
|
private String volumeName;
|
||||||
|
private String bucketName;
|
||||||
|
private String keyName;
|
||||||
|
private String containerName;
|
||||||
|
private String blockID;
|
||||||
|
private long dataSize;
|
||||||
|
private boolean shouldCreateContainer;
|
||||||
|
|
||||||
|
public Builder setVolumeName(String volume) {
|
||||||
|
this.volumeName = volume;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setBucketName(String bucket) {
|
||||||
|
this.bucketName = bucket;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setKeyName(String key) {
|
||||||
|
this.keyName = key;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setBlockID(String block) {
|
||||||
|
this.blockID = block;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setContainerName(String container) {
|
||||||
|
this.containerName = container;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setDataSize(long size) {
|
||||||
|
this.dataSize = size;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Builder setShouldCreateContainer(boolean create) {
|
||||||
|
this.shouldCreateContainer = create;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public KsmKeyInfo build() {
|
||||||
|
return new KsmKeyInfo(
|
||||||
|
volumeName, bucketName, keyName, dataSize, blockID, containerName,
|
||||||
|
shouldCreateContainer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public KeyInfo getProtobuf() {
|
||||||
|
return KeyInfo.newBuilder()
|
||||||
|
.setVolumeName(volumeName)
|
||||||
|
.setBucketName(bucketName)
|
||||||
|
.setKeyName(keyName)
|
||||||
|
.setDataSize(dataSize)
|
||||||
|
.setBlockKey(blockID)
|
||||||
|
.setContainerName(containerName)
|
||||||
|
.setShouldCreateContainer(shouldCreateContainer)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static KsmKeyInfo getFromProtobuf(KeyInfo keyInfo) {
|
||||||
|
return new KsmKeyInfo(
|
||||||
|
keyInfo.getVolumeName(),
|
||||||
|
keyInfo.getBucketName(),
|
||||||
|
keyInfo.getKeyName(),
|
||||||
|
keyInfo.getDataSize(),
|
||||||
|
keyInfo.getBlockKey(),
|
||||||
|
keyInfo.getContainerName(),
|
||||||
|
keyInfo.getShouldCreateContainer());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,6 +18,8 @@
|
||||||
package org.apache.hadoop.ksm.protocol;
|
package org.apache.hadoop.ksm.protocol;
|
||||||
|
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -113,4 +115,9 @@ public interface KeySpaceManagerProtocol {
|
||||||
KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
KsmBucketInfo getBucketInfo(String volumeName, String bucketName)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a block to a container, the block is returned to the client.
|
||||||
|
*/
|
||||||
|
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtocolTranslator;
|
import org.apache.hadoop.ipc.ProtocolTranslator;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -39,6 +41,12 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
|
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateKeyRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateKeyResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.KeyArgs;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
|
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -313,6 +321,36 @@ public final class KeySpaceManagerProtocolClientSideTranslatorPB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a block for a key, then use the returned meta info to talk to data
|
||||||
|
* node to actually write the key.
|
||||||
|
* @param args the args for the key to be allocated
|
||||||
|
* @return a handler to the key, returned client
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
|
||||||
|
CreateKeyRequest.Builder req = CreateKeyRequest.newBuilder();
|
||||||
|
KeyArgs keyArgs = KeyArgs.newBuilder()
|
||||||
|
.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName())
|
||||||
|
.setKeyName(args.getKeyName())
|
||||||
|
.setDataSize(args.getDataSize()).build();
|
||||||
|
req.setKeyArgs(keyArgs);
|
||||||
|
|
||||||
|
final CreateKeyResponse resp;
|
||||||
|
try {
|
||||||
|
resp = rpcProxy.createKey(NULL_RPC_CONTROLLER, req.build());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
if (resp.getStatus() != Status.OK) {
|
||||||
|
throw new IOException("Get key block failed, error:" +
|
||||||
|
resp.getStatus());
|
||||||
|
}
|
||||||
|
return KsmKeyInfo.getFromProtobuf(resp.getKeyInfo());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the proxy object underlying this protocol translator.
|
* Return the proxy object underlying this protocol translator.
|
||||||
*
|
*
|
||||||
|
|
|
@ -165,7 +165,7 @@ public final class ScmConfigKeys {
|
||||||
|
|
||||||
public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
|
public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
|
||||||
"ozone.scm.container.provision_batch_size";
|
"ozone.scm.container.provision_batch_size";
|
||||||
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 10;
|
public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -210,6 +210,33 @@ message InfoBucketResponse {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
message KeyArgs {
|
||||||
|
required string volumeName = 1;
|
||||||
|
required string bucketName = 2;
|
||||||
|
required string keyName = 3;
|
||||||
|
required uint64 dataSize = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message KeyInfo {
|
||||||
|
required string volumeName = 1;
|
||||||
|
required string bucketName = 2;
|
||||||
|
required string keyName = 3;
|
||||||
|
required uint64 dataSize = 4;
|
||||||
|
required string blockKey = 5;
|
||||||
|
required string containerName = 6;
|
||||||
|
required bool shouldCreateContainer = 7;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CreateKeyRequest {
|
||||||
|
required KeyArgs keyArgs = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message CreateKeyResponse {
|
||||||
|
required Status status = 1;
|
||||||
|
optional KeyInfo keyInfo = 2;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
The KSM service that takes care of Ozone namespace.
|
The KSM service that takes care of Ozone namespace.
|
||||||
*/
|
*/
|
||||||
|
@ -261,4 +288,10 @@ service KeySpaceManagerService {
|
||||||
*/
|
*/
|
||||||
rpc infoBucket(InfoBucketRequest)
|
rpc infoBucket(InfoBucketRequest)
|
||||||
returns(InfoBucketResponse);
|
returns(InfoBucketResponse);
|
||||||
|
|
||||||
|
/**
|
||||||
|
Get key block.
|
||||||
|
*/
|
||||||
|
rpc createKey(CreateKeyRequest)
|
||||||
|
returns(CreateKeyResponse);
|
||||||
}
|
}
|
|
@ -33,6 +33,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numVolumeInfos;
|
private @Metric MutableCounterLong numVolumeInfos;
|
||||||
private @Metric MutableCounterLong numBucketCreates;
|
private @Metric MutableCounterLong numBucketCreates;
|
||||||
private @Metric MutableCounterLong numBucketInfos;
|
private @Metric MutableCounterLong numBucketInfos;
|
||||||
|
private @Metric MutableCounterLong numKeyBlockAllocate;
|
||||||
|
|
||||||
// Failure Metrics
|
// Failure Metrics
|
||||||
private @Metric MutableCounterLong numVolumeCreateFails;
|
private @Metric MutableCounterLong numVolumeCreateFails;
|
||||||
|
@ -40,6 +41,7 @@ public class KSMMetrics {
|
||||||
private @Metric MutableCounterLong numVolumeInfoFails;
|
private @Metric MutableCounterLong numVolumeInfoFails;
|
||||||
private @Metric MutableCounterLong numBucketCreateFails;
|
private @Metric MutableCounterLong numBucketCreateFails;
|
||||||
private @Metric MutableCounterLong numBucketInfoFails;
|
private @Metric MutableCounterLong numBucketInfoFails;
|
||||||
|
private @Metric MutableCounterLong numKeyBlockAllocateFails;
|
||||||
|
|
||||||
public KSMMetrics() {
|
public KSMMetrics() {
|
||||||
}
|
}
|
||||||
|
@ -91,6 +93,14 @@ public class KSMMetrics {
|
||||||
numBucketInfoFails.incr();
|
numBucketInfoFails.incr();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void incNumKeyBlockAllocates() {
|
||||||
|
numKeyBlockAllocate.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incNumKeyBlockAllocateFails() {
|
||||||
|
numKeyBlockAllocateFails.incr();
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public long getNumVolumeCreates() {
|
public long getNumVolumeCreates() {
|
||||||
return numVolumeCreates.value();
|
return numVolumeCreates.value();
|
||||||
|
@ -141,4 +151,13 @@ public class KSMMetrics {
|
||||||
return numBucketInfoFails.value();
|
return numBucketInfoFails.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumKeyBlockAllocates() {
|
||||||
|
return numKeyBlockAllocate.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNumKeyBlockAllocateFailes() {
|
||||||
|
return numKeyBlockAllocateFails.value();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,45 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handles key level commands.
|
||||||
|
*/
|
||||||
|
public interface KeyManager {
|
||||||
|
/**
|
||||||
|
* Given the args of a key to put, return a pipeline for the key. Writes
|
||||||
|
* the key to pipeline mapping to meta data.
|
||||||
|
*
|
||||||
|
* Note that this call only allocate a block for key, and adds the
|
||||||
|
* corresponding entry to metadata. The block will be returned to client side
|
||||||
|
* handler DistributedStorageHandler. Which will make another call to
|
||||||
|
* datanode to create container (if needed) and writes the key.
|
||||||
|
*
|
||||||
|
* In case that the container creation or key write failed on
|
||||||
|
* DistributedStorageHandler, this key's metadata will still stay in KSM.
|
||||||
|
*
|
||||||
|
* @param args the args of the key provided by client.
|
||||||
|
* @return a KsmKeyInfo instance client uses to talk to container.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException;
|
||||||
|
}
|
|
@ -0,0 +1,109 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
|
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
|
||||||
|
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
|
||||||
|
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||||
|
import org.iq80.leveldb.DBException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementation of keyManager.
|
||||||
|
*/
|
||||||
|
public class KeyManagerImpl implements KeyManager {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(KeyManagerImpl.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A SCM block client, used to talk to SCM to allocate block during putKey.
|
||||||
|
*/
|
||||||
|
private final ScmBlockLocationProtocol scmBlockClient;
|
||||||
|
private final MetadataManager metadataManager;
|
||||||
|
|
||||||
|
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
|
||||||
|
MetadataManager metadataManager) {
|
||||||
|
this.scmBlockClient = scmBlockClient;
|
||||||
|
this.metadataManager = metadataManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
|
||||||
|
Preconditions.checkNotNull(args);
|
||||||
|
metadataManager.writeLock().lock();
|
||||||
|
String volumeName = args.getVolumeName();
|
||||||
|
String bucketName = args.getBucketName();
|
||||||
|
String keyName = args.getKeyName();
|
||||||
|
try {
|
||||||
|
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
|
||||||
|
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||||
|
byte[] keyKey = metadataManager.getDBKeyForKey(
|
||||||
|
volumeName, bucketName, keyName);
|
||||||
|
|
||||||
|
//Check if the volume exists
|
||||||
|
if(metadataManager.get(volumeKey) == null) {
|
||||||
|
LOG.error("volume not found: {}", volumeName);
|
||||||
|
throw new KSMException("Volume not found",
|
||||||
|
KSMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||||
|
}
|
||||||
|
//Check if bucket already exists
|
||||||
|
if(metadataManager.get(bucketKey) == null) {
|
||||||
|
LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
|
||||||
|
throw new KSMException("Bucket not found",
|
||||||
|
KSMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
|
||||||
|
}
|
||||||
|
// TODO throw exception if key exists, may change to support key
|
||||||
|
// overwrite in the future
|
||||||
|
//Check if key already exists.
|
||||||
|
if(metadataManager.get(keyKey) != null) {
|
||||||
|
LOG.error("key already exist: {}/{}/{} ", volumeName, bucketName,
|
||||||
|
keyName);
|
||||||
|
throw new KSMException("Key already exist",
|
||||||
|
KSMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
AllocatedBlock allocatedBlock =
|
||||||
|
scmBlockClient.allocateBlock(args.getDataSize());
|
||||||
|
KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
|
||||||
|
.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName())
|
||||||
|
.setKeyName(args.getKeyName())
|
||||||
|
.setDataSize(args.getDataSize())
|
||||||
|
.setBlockID(allocatedBlock.getKey())
|
||||||
|
.setContainerName(allocatedBlock.getPipeline().getContainerName())
|
||||||
|
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
|
||||||
|
.build();
|
||||||
|
metadataManager.put(keyKey, keyBlock.getProtobuf().toByteArray());
|
||||||
|
LOG.debug("Key {} allocated in volume {} bucket {}",
|
||||||
|
keyName, volumeName, bucketName);
|
||||||
|
return keyBlock;
|
||||||
|
} catch (DBException ex) {
|
||||||
|
LOG.error("Key allocation failed for volume:{} bucket:{} key:{}",
|
||||||
|
volumeName, bucketName, keyName, ex);
|
||||||
|
throw new KSMException(ex.getMessage(),
|
||||||
|
KSMException.ResultCodes.FAILED_INTERNAL_ERROR);
|
||||||
|
} finally {
|
||||||
|
metadataManager.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,17 +20,25 @@ package org.apache.hadoop.ozone.ksm;
|
||||||
import com.google.protobuf.BlockingService;
|
import com.google.protobuf.BlockingService;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||||
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneClientUtils;
|
import org.apache.hadoop.ozone.OzoneClientUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.protocolPB
|
import org.apache.hadoop.ozone.protocolPB
|
||||||
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
.KeySpaceManagerProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
||||||
|
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
|
||||||
|
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolClientSideTranslatorPB;
|
||||||
|
import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -63,6 +71,7 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
private final MetadataManager metadataManager;
|
private final MetadataManager metadataManager;
|
||||||
private final VolumeManager volumeManager;
|
private final VolumeManager volumeManager;
|
||||||
private final BucketManager bucketManager;
|
private final BucketManager bucketManager;
|
||||||
|
private final KeyManager keyManager;
|
||||||
private final KSMMetrics metrics;
|
private final KSMMetrics metrics;
|
||||||
|
|
||||||
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
|
||||||
|
@ -85,6 +94,31 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
volumeManager = new VolumeManagerImpl(metadataManager, conf);
|
volumeManager = new VolumeManagerImpl(metadataManager, conf);
|
||||||
bucketManager = new BucketManagerImpl(metadataManager);
|
bucketManager = new BucketManagerImpl(metadataManager);
|
||||||
metrics = KSMMetrics.create();
|
metrics = KSMMetrics.create();
|
||||||
|
keyManager = new KeyManagerImpl(getScmBlockClient(conf), metadataManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a scm block client, used by putKey() and getKey().
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private ScmBlockLocationProtocol getScmBlockClient(OzoneConfiguration conf)
|
||||||
|
throws IOException {
|
||||||
|
RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
|
||||||
|
ProtobufRpcEngine.class);
|
||||||
|
long scmVersion =
|
||||||
|
RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
|
||||||
|
InetSocketAddress scmBlockAddress =
|
||||||
|
OzoneClientUtils.getScmAddressForBlockClients(conf);
|
||||||
|
ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
|
||||||
|
new ScmBlockLocationProtocolClientSideTranslatorPB(
|
||||||
|
RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
|
||||||
|
scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
|
||||||
|
NetUtils.getDefaultSocketFactory(conf),
|
||||||
|
Client.getRpcTimeout(conf)));
|
||||||
|
return scmBlockLocationClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -373,4 +407,20 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a key block.
|
||||||
|
* @param args - attributes of the key.
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public KsmKeyInfo allocateKey(KsmKeyArgs args) throws IOException {
|
||||||
|
try {
|
||||||
|
metrics.incNumKeyBlockAllocates();
|
||||||
|
return keyManager.allocateKey(args);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
metrics.incNumKeyBlockAllocateFails();
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,4 +95,13 @@ public interface MetadataManager {
|
||||||
* @param bucket - Bucket name
|
* @param bucket - Bucket name
|
||||||
*/
|
*/
|
||||||
byte[] getBucketKey(String volume, String bucket);
|
byte[] getBucketKey(String volume, String bucket);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a volume, bucket and a key, return the corresponding DB key.
|
||||||
|
* @param volume - volume name
|
||||||
|
* @param bucket - bucket name
|
||||||
|
* @param key - key name
|
||||||
|
* @return bytes of DB key.
|
||||||
|
*/
|
||||||
|
byte[] getDBKeyForKey(String volume, String bucket, String key);
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,6 +105,14 @@ public class MetadataManagerImpl implements MetadataManager {
|
||||||
return DFSUtil.string2Bytes(bucketKeyString);
|
return DFSUtil.string2Bytes(bucketKeyString);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getDBKeyForKey(String volume, String bucket, String key) {
|
||||||
|
String keyKeyString = OzoneConsts.KSM_VOLUME_PREFIX + volume
|
||||||
|
+ OzoneConsts.KSM_BUCKET_PREFIX + bucket + OzoneConsts.KSM_KEY_PREFIX
|
||||||
|
+ key;
|
||||||
|
return DFSUtil.string2Bytes(keyKeyString);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the read lock used on Metadata DB.
|
* Returns the read lock used on Metadata DB.
|
||||||
* @return readLock
|
* @return readLock
|
||||||
|
|
|
@ -102,6 +102,7 @@ public class KSMException extends IOException {
|
||||||
FAILED_USER_NOT_FOUND,
|
FAILED_USER_NOT_FOUND,
|
||||||
FAILED_BUCKET_ALREADY_EXISTS,
|
FAILED_BUCKET_ALREADY_EXISTS,
|
||||||
FAILED_BUCKET_NOT_FOUND,
|
FAILED_BUCKET_NOT_FOUND,
|
||||||
|
FAILED_KEY_ALREADY_EXISTS,
|
||||||
FAILED_INTERNAL_ERROR
|
FAILED_INTERNAL_ERROR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.protocolPB;
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
import org.apache.hadoop.ksm.protocol.KeySpaceManagerProtocol;
|
||||||
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
|
||||||
|
@ -35,6 +37,12 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
|
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateKeyRequest;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.CreateKeyResponse;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.KeySpaceManagerProtocolProtos.KeyArgs;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
|
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -211,4 +219,27 @@ public class KeySpaceManagerProtocolServerSideTranslatorPB implements
|
||||||
}
|
}
|
||||||
return resp.build();
|
return resp.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CreateKeyResponse createKey(
|
||||||
|
RpcController controller, CreateKeyRequest request
|
||||||
|
) throws ServiceException {
|
||||||
|
CreateKeyResponse.Builder resp =
|
||||||
|
CreateKeyResponse.newBuilder();
|
||||||
|
try {
|
||||||
|
KeyArgs keyArgs = request.getKeyArgs();
|
||||||
|
KsmKeyArgs ksmKeyArgs = new KsmKeyArgs.Builder()
|
||||||
|
.setVolumeName(keyArgs.getVolumeName())
|
||||||
|
.setBucketName(keyArgs.getBucketName())
|
||||||
|
.setKeyName(keyArgs.getKeyName())
|
||||||
|
.setDataSize(keyArgs.getDataSize())
|
||||||
|
.build();
|
||||||
|
KsmKeyInfo keyInfo = impl.allocateKey(ksmKeyArgs);
|
||||||
|
resp.setKeyInfo(keyInfo.getProtobuf());
|
||||||
|
resp.setStatus(Status.OK);
|
||||||
|
} catch (IOException e) {
|
||||||
|
resp.setStatus(exceptionToResponseStatus(e));
|
||||||
|
}
|
||||||
|
return resp.build();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,7 +202,7 @@ public class BlockManagerImpl implements BlockManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public AllocatedBlock allocateBlock(final long size) throws IOException {
|
public AllocatedBlock allocateBlock(final long size) throws IOException {
|
||||||
boolean createContainer = false;
|
boolean createContainer;
|
||||||
Pipeline pipeline;
|
Pipeline pipeline;
|
||||||
if (size < 0 || size > containerSize) {
|
if (size < 0 || size > containerSize) {
|
||||||
throw new SCMException("Unsupported block size",
|
throw new SCMException("Unsupported block size",
|
||||||
|
@ -223,11 +223,13 @@ public class BlockManagerImpl implements BlockManager {
|
||||||
throw new SCMException("Unable to allocate container for the block",
|
throw new SCMException("Unable to allocate container for the block",
|
||||||
FAILED_TO_ALLOCATE_CONTAINER);
|
FAILED_TO_ALLOCATE_CONTAINER);
|
||||||
}
|
}
|
||||||
|
createContainer = true;
|
||||||
} else {
|
} else {
|
||||||
candidates = openContainers.entrySet().parallelStream()
|
candidates = openContainers.entrySet().parallelStream()
|
||||||
.filter(e -> (e.getValue() + size < containerSize))
|
.filter(e -> (e.getValue() + size < containerSize))
|
||||||
.map(e -> e.getKey())
|
.map(e -> e.getKey())
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
createContainer = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (candidates.size() == 0) {
|
if (candidates.size() == 0) {
|
||||||
|
|
|
@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset
|
||||||
.LengthInputStream;
|
.LengthInputStream;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
import org.apache.hadoop.ksm.helpers.KsmBucketInfo;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
|
||||||
|
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
|
||||||
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
|
||||||
import org.apache.hadoop.ksm.protocolPB
|
import org.apache.hadoop.ksm.protocolPB
|
||||||
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
.KeySpaceManagerProtocolClientSideTranslatorPB;
|
||||||
|
@ -53,6 +55,7 @@ import org.apache.hadoop.ozone.web.response.*;
|
||||||
import org.apache.hadoop.scm.XceiverClientSpi;
|
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||||
import org.apache.hadoop.scm.storage.ChunkInputStream;
|
import org.apache.hadoop.scm.storage.ChunkInputStream;
|
||||||
import org.apache.hadoop.scm.storage.ChunkOutputStream;
|
import org.apache.hadoop.scm.storage.ChunkOutputStream;
|
||||||
|
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -279,15 +282,29 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
@Override
|
@Override
|
||||||
public OutputStream newKeyWriter(KeyArgs args) throws IOException,
|
public OutputStream newKeyWriter(KeyArgs args) throws IOException,
|
||||||
OzoneException {
|
OzoneException {
|
||||||
|
KsmKeyArgs keyArgs = new KsmKeyArgs.Builder()
|
||||||
|
.setVolumeName(args.getVolumeName())
|
||||||
|
.setBucketName(args.getBucketName())
|
||||||
|
.setKeyName(args.getKeyName())
|
||||||
|
.setDataSize(args.getSize())
|
||||||
|
.build();
|
||||||
|
// contact KSM to allocate a block for key.
|
||||||
String containerKey = buildContainerKey(args.getVolumeName(),
|
String containerKey = buildContainerKey(args.getVolumeName(),
|
||||||
args.getBucketName(), args.getKeyName());
|
args.getBucketName(), args.getKeyName());
|
||||||
KeyInfo key = new KeyInfo();
|
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
|
||||||
key.setKeyName(args.getKeyName());
|
// TODO the following createContainer and key writes may fail, in which
|
||||||
key.setCreatedOn(dateToString(new Date()));
|
// case we should revert the above allocateKey to KSM.
|
||||||
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
|
String containerName = keyInfo.getContainerName();
|
||||||
return new ChunkOutputStream(containerKey, key.getKeyName(),
|
XceiverClientSpi xceiverClient = getContainer(containerName);
|
||||||
xceiverClientManager, xceiverClient, args.getRequestID(),
|
if (keyInfo.getShouldCreateContainer()) {
|
||||||
chunkSize);
|
LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
|
||||||
|
args.getVolumeName(), args.getBucketName(), args.getKeyName());
|
||||||
|
ContainerProtocolCalls.createContainer(
|
||||||
|
xceiverClient, args.getRequestID());
|
||||||
|
}
|
||||||
|
// establish a connection to the container to write the key
|
||||||
|
return new ChunkOutputStream(containerKey, args.getKeyName(),
|
||||||
|
xceiverClientManager, xceiverClient, args.getRequestID(), chunkSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -334,6 +351,13 @@ public final class DistributedStorageHandler implements StorageHandler {
|
||||||
throw new UnsupportedOperationException("listKeys not implemented");
|
throw new UnsupportedOperationException("listKeys not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private XceiverClientSpi getContainer(String containerName)
|
||||||
|
throws IOException {
|
||||||
|
Pipeline pipeline =
|
||||||
|
storageContainerLocationClient.getContainer(containerName);
|
||||||
|
return xceiverClientManager.acquireClient(pipeline);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline}
|
* Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline}
|
||||||
* of nodes capable of serving container protocol operations.
|
* of nodes capable of serving container protocol operations.
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.commons.lang.RandomStringUtils;
|
import org.apache.commons.lang.RandomStringUtils;
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
@ -25,12 +26,14 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
import org.apache.hadoop.ozone.web.exceptions.OzoneException;
|
||||||
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||||
|
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||||
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
||||||
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
||||||
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
import org.apache.hadoop.ozone.web.request.OzoneQuota;
|
||||||
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
import org.apache.hadoop.ozone.web.response.BucketInfo;
|
||||||
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
import org.apache.hadoop.ozone.web.response.VolumeInfo;
|
||||||
|
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
@ -38,6 +41,8 @@ import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -65,7 +70,8 @@ public class TestKeySpaceManager {
|
||||||
cluster = new MiniOzoneCluster.Builder(conf)
|
cluster = new MiniOzoneCluster.Builder(conf)
|
||||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||||
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
|
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
|
||||||
userArgs = new UserArgs(null, null, null, null, null, null);
|
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
|
||||||
|
null, null, null, null);
|
||||||
ksmMetrics = cluster.getKeySpaceManager().getMetrics();
|
ksmMetrics = cluster.getKeySpaceManager().getMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,4 +196,33 @@ public class TestKeySpaceManager {
|
||||||
Assert.assertEquals(0, ksmMetrics.getNumBucketCreateFails());
|
Assert.assertEquals(0, ksmMetrics.getNumBucketCreateFails());
|
||||||
Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails());
|
Assert.assertEquals(0, ksmMetrics.getNumBucketInfoFails());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetKeyWriter() throws IOException, OzoneException {
|
||||||
|
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||||
|
String keyName = "key" + RandomStringUtils.randomNumeric(5);
|
||||||
|
Assert.assertEquals(0, ksmMetrics.getNumKeyBlockAllocates());
|
||||||
|
|
||||||
|
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
|
||||||
|
createVolumeArgs.setUserName(userName);
|
||||||
|
createVolumeArgs.setAdminName(adminName);
|
||||||
|
storageHandler.createVolume(createVolumeArgs);
|
||||||
|
|
||||||
|
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
|
||||||
|
bucketArgs.setAddAcls(new LinkedList<>());
|
||||||
|
bucketArgs.setRemoveAcls(new LinkedList<>());
|
||||||
|
bucketArgs.setStorageType(StorageType.DISK);
|
||||||
|
storageHandler.createBucket(bucketArgs);
|
||||||
|
|
||||||
|
String dataString = RandomStringUtils.randomAscii(100);
|
||||||
|
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
|
||||||
|
keyArgs.setSize(4096);
|
||||||
|
try (OutputStream stream = storageHandler.newKeyWriter(keyArgs)) {
|
||||||
|
stream.write(dataString.getBytes());
|
||||||
|
}
|
||||||
|
Assert.assertEquals(1, ksmMetrics.getNumKeyBlockAllocates());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue