HDFS-11769. Ozone: KSM: Add createVolume API. Contributed by Mukul Kumar Singh.

This commit is contained in:
Anu Engineer 2017-05-15 21:38:08 -07:00 committed by Owen O'Malley
parent 9c9be9f7f7
commit c9e6d4378d
21 changed files with 1427 additions and 191 deletions

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ksm.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* A class that encapsulates the KsmVolumeArgs Args.
*/
public final class KsmVolumeArgs {
private final String adminName;
private final String ownerName;
private final String volume;
private final long quotaInBytes;
private final Map<String, String> keyValueMap;
/**
* Private constructor, constructed via builder.
* @param adminName - Administrator's name.
* @param ownerName - Volume owner's name
* @param volume - volume name
* @param quotaInBytes - Volume Quota in bytes.
* @param keyValueMap - keyValue map.
*/
private KsmVolumeArgs(String adminName, String ownerName, String volume,
long quotaInBytes, Map<String, String> keyValueMap) {
this.adminName = adminName;
this.ownerName = ownerName;
this.volume = volume;
this.quotaInBytes = quotaInBytes;
this.keyValueMap = keyValueMap;
}
/**
* Returns the Admin Name.
* @return String.
*/
public String getAdminName() {
return adminName;
}
/**
* Returns the owner Name.
* @return String
*/
public String getOwnerName() {
return ownerName;
}
/**
* Returns the volume Name.
* @return String
*/
public String getVolume() {
return volume;
}
/**
* Returns Quota in Bytes.
* @return long, Quota in bytes.
*/
public long getQuotaInBytes() {
return quotaInBytes;
}
public Map<String, String> getKeyValueMap() {
return keyValueMap;
}
/**
* Returns new builder class that builds a KsmVolumeArgs.
*
* @return Builder
*/
public static Builder newBuilder() {
return new Builder();
}
/**
* Builder for KsmVolumeArgs.
*/
public static class Builder {
private String adminName;
private String ownerName;
private String volume;
private long quotaInBytes;
private Map<String, String> keyValueMap;
/**
* Constructs a builder.
*/
Builder() {
keyValueMap = new HashMap<>();
}
public Builder setAdminName(String adminName) {
this.adminName = adminName;
return this;
}
public Builder setOwnerName(String ownerName) {
this.ownerName = ownerName;
return this;
}
public Builder setVolume(String volume) {
this.volume = volume;
return this;
}
public Builder setQuotaInBytes(long quotaInBytes) {
this.quotaInBytes = quotaInBytes;
return this;
}
public Builder addMetadata(String key, String value) {
keyValueMap.put(key, value); // overwrite if present.
return this;
}
/**
* Constructs a CreateVolumeArgument.
* @return CreateVolumeArgs.
*/
public KsmVolumeArgs build() {
Preconditions.checkNotNull(adminName);
Preconditions.checkNotNull(ownerName);
Preconditions.checkNotNull(volume);
return new KsmVolumeArgs(adminName, ownerName, volume, quotaInBytes,
keyValueMap);
}
}
public VolumeInfo getProtobuf() {
List<KeyValue> list = new LinkedList<>();
for (Map.Entry<String, String> entry : keyValueMap.entrySet()) {
list.add(KeyValue.newBuilder().setKey(entry.getKey()).
setValue(entry.getValue()).build());
}
return VolumeInfo.newBuilder()
.setAdminName(adminName)
.setOwnerName(ownerName)
.setVolume(volume)
.setQuotaInBytes(quotaInBytes)
.addAllMetadata(list)
.build();
}
public static KsmVolumeArgs getFromProtobuf(VolumeInfo volInfo) {
return new KsmVolumeArgs(volInfo.getAdminName(), volInfo.getOwnerName(),
volInfo.getVolume(), volInfo.getQuotaInBytes(),
volInfo.getMetadataList().stream()
.collect(Collectors.toMap(KeyValue::getKey,
KeyValue::getValue)));
}
}

View File

@ -17,8 +17,7 @@
*/
package org.apache.hadoop.ksm.protocol;
import org.apache.hadoop.ksm.helpers.VolumeArgs;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import java.io.IOException;
import java.util.List;
@ -32,7 +31,7 @@ public interface KeyspaceManagerProtocol {
* @param args - Arguments to create Volume.
* @throws IOException
*/
void createVolume(VolumeArgs args) throws IOException;
void createVolume(KsmVolumeArgs args) throws IOException;
/**
* Changes the owner of a volume.
@ -64,7 +63,7 @@ public interface KeyspaceManagerProtocol {
* @return VolumeArgs or exception is thrown.
* @throws IOException
*/
VolumeArgs getVolumeinfo(String volume) throws IOException;
KsmVolumeArgs getVolumeInfo(String volume) throws IOException;
/**
* Deletes the an exisiting empty volume.
@ -82,7 +81,7 @@ public interface KeyspaceManagerProtocol {
* @return List of Volumes.
* @throws IOException
*/
List<VolumeArgs> listVolumeByUser(String userName, String prefix, String
List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix, String
prevKey, long maxKeys) throws IOException;
/**
@ -93,6 +92,6 @@ public interface KeyspaceManagerProtocol {
* @return List of Volumes.
* @throws IOException
*/
List<VolumeArgs> listAllVolumes(String prefix, String
List<KsmVolumeArgs> listAllVolumes(String prefix, String
prevKey, long maxKeys) throws IOException;
}

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ksm.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
* The client side implementation of KeyspaceManagerProtocol.
*/
@InterfaceAudience.Private
public final class KeySpaceManagerProtocolClientSideTranslatorPB
implements KeyspaceManagerProtocol, ProtocolTranslator, Closeable {
/**
* RpcController is not used and hence is set to null.
*/
private static final RpcController NULL_RPC_CONTROLLER = null;
private final KeySpaceManagerProtocolPB rpcProxy;
/**
* Constructor for KeySpaceManger Client.
* @param rpcProxy
*/
public KeySpaceManagerProtocolClientSideTranslatorPB(
KeySpaceManagerProtocolPB rpcProxy) {
this.rpcProxy = rpcProxy;
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
* <p>
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public void close() throws IOException {
}
/**
* Creates a volume.
*
* @param args - Arguments to create Volume.
* @throws IOException
*/
@Override
public void createVolume(KsmVolumeArgs args) throws IOException {
CreateVolumeRequest.Builder req =
CreateVolumeRequest.newBuilder();
VolumeInfo volumeInfo = args.getProtobuf();
req.setVolumeInfo(volumeInfo);
final CreateVolumeResponse resp;
try {
resp = rpcProxy.createVolume(NULL_RPC_CONTROLLER,
req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
if (resp.getStatus() != Status.OK) {
throw new IOException("Volume creation failed error" + resp.getStatus());
}
}
/**
* Changes the owner of a volume.
*
* @param volume - Name of the volume.
* @param owner - Name of the owner.
* @throws IOException
*/
@Override
public void setOwner(String volume, String owner) throws IOException {
}
/**
* Changes the Quota on a volume.
*
* @param volume - Name of the volume.
* @param quota - Quota in bytes.
* @throws IOException
*/
@Override
public void setQuota(String volume, long quota) throws IOException {
}
/**
* Checks if the specified user can access this volume.
*
* @param volume - volume
* @param userName - user name
* @throws IOException
*/
@Override
public void checkVolumeAccess(String volume, String userName) throws
IOException {
}
/**
* Gets the volume information.
*
* @param volume - Volume name.s
* @return KsmVolumeArgs or exception is thrown.
* @throws IOException
*/
@Override
public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
return null;
}
/**
* Deletes the an exisiting empty volume.
*
* @param volume - Name of the volume.
* @throws IOException
*/
@Override
public void deleteVolume(String volume) throws IOException {
}
/**
* Lists volume owned by a specific user.
*
* @param userName - user name
* @param prefix - Filter prefix -- Return only entries that match this.
* @param prevKey - Previous key -- List starts from the next from the
* prevkey
* @param maxKeys - Max number of keys to return.
* @return List of Volumes.
* @throws IOException
*/
@Override
public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
String prevKey, long maxKeys)
throws IOException {
return null;
}
/**
* Lists volume all volumes in the cluster.
*
* @param prefix - Filter prefix -- Return only entries that match this.
* @param prevKey - Previous key -- List starts from the next from the
* prevkey
* @param maxKeys - Max number of keys to return.
* @return List of Volumes.
* @throws IOException
*/
@Override
public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, long
maxKeys) throws IOException {
return null;
}
/**
* Return the proxy object underlying this protocol translator.
*
* @return the proxy object underlying this protocol translator.
*/
@Override
public Object getUnderlyingProxyObject() {
return null;
}
}

View File

@ -41,8 +41,11 @@ enum Status {
VOLUME_NOT_UNIQUE = 2;
VOLUME_NOT_FOUND = 3;
VOLUME_NOT_EMPTY = 4;
USER_NOT_FOUND = 5;
ACCESS_DENIED = 6;
VOLUME_ALREADY_EXISTS = 5;
USER_NOT_FOUND = 6;
USER_TOO_MANY_VOLUMES = 7;
ACCESS_DENIED = 8;
INTERNAL_ERROR = 9;
}
@ -66,6 +69,10 @@ message CreateVolumeResponse {
required Status status = 1;
}
message VolumeList {
repeated string volumeNames = 1;
}
/**
Changes the Volume Properties -- like ownership and quota for a volume.
*/

View File

@ -30,6 +30,9 @@ import java.util.Map;
import com.sun.jersey.api.container.ContainerFactory;
import com.sun.jersey.api.core.ApplicationAdapter;
import org.apache.hadoop.ksm.protocolPB
.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.slf4j.Logger;
@ -62,6 +65,8 @@ public final class ObjectStoreHandler implements Closeable {
LoggerFactory.getLogger(ObjectStoreJerseyContainer.class);
private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
private final KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient;
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
@ -84,21 +89,34 @@ public final class ObjectStoreHandler implements Closeable {
if (OzoneConsts.OZONE_HANDLER_DISTRIBUTED.equalsIgnoreCase(shType)) {
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
long version =
long scmVersion =
RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
InetSocketAddress address =
InetSocketAddress scmAddress =
OzoneClientUtils.getScmAddressForClients(conf);
this.storageContainerLocationClient =
new StorageContainerLocationProtocolClientSideTranslatorPB(
RPC.getProxy(StorageContainerLocationProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), Client.getTimeout(conf)));
RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
scmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
long ksmVersion =
RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
this.keySpaceManagerClient =
new KeySpaceManagerProtocolClientSideTranslatorPB(
RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
ksmAddress, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf),
Client.getRpcTimeout(conf)));
storageHandler = new DistributedStorageHandler(new OzoneConfiguration(),
this.storageContainerLocationClient);
this.storageContainerLocationClient,
this.keySpaceManagerClient);
} else {
if (OzoneConsts.OZONE_HANDLER_LOCAL.equalsIgnoreCase(shType)) {
storageHandler = new LocalStorageHandler(conf);
this.storageContainerLocationClient = null;
this.keySpaceManagerClient = null;
} else {
throw new IllegalArgumentException(
String.format("Unrecognized value for %s: %s,"

View File

@ -80,6 +80,7 @@ public final class OzoneConsts {
public static final String BLOCK_DB = "block.db";
public static final String NODEPOOL_DB = "nodepool.db";
public static final String OPEN_CONTAINERS_DB = "openContainers.db";
public static final String KSM_DB_NAME = "ksm.db";
/**
* Supports Bucket Versioning.
@ -87,7 +88,7 @@ public final class OzoneConsts {
public enum Versioning {NOT_DEFINED, ENABLED, DISABLED}
/**
* Ozone handler types
* Ozone handler types.
*/
public static final String OZONE_HANDLER_DISTRIBUTED = "distributed";
public static final String OZONE_HANDLER_LOCAL = "local";

View File

@ -37,4 +37,13 @@ public final class KSMConfigKeys {
public static final String OZONE_KSM_BIND_HOST_DEFAULT =
"0.0.0.0";
public static final int OZONE_KSM_PORT_DEFAULT = 9862;
// LevelDB cache file uses an off-heap cache in LevelDB of 128 MB.
public static final String OZONE_KSM_DB_CACHE_SIZE_MB =
"ozone.ksm.leveldb.cache.size.mb";
public static final int OZONE_KSM_DB_CACHE_SIZE_DEFAULT = 128;
public static final String OZONE_KSM_USER_MAX_VOLUME =
"ozone.ksm.user.max.volume";
public static final int OZONE_KSM_USER_MAX_VOLUME_DEFAULT = 1024;
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.ksm;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
/**
* This class is for maintaining KeySpaceManager statistics.
*/
public class KSMMetrics {
// KSM op metrics
private @Metric MutableCounterLong numVolumeCreates;
// Failure Metrics
private @Metric MutableCounterLong numVolumeCreateFails;
public KSMMetrics() {
}
public static KSMMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register("KSMMetrics",
"Key Space Manager Metrics",
new KSMMetrics());
}
public void incNumVolumeCreates() {
numVolumeCreates.incr();
}
public void incNumVolumeCreateFails() {
numVolumeCreates.incr();
}
@VisibleForTesting
public long getNumVolumeCreates() {
return numVolumeCreates.value();
}
@VisibleForTesting
public long getNumVolumeCreateFails() {
return numVolumeCreateFails.value();
}
}

View File

@ -17,12 +17,13 @@
package org.apache.hadoop.ozone.ksm;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ksm.helpers.VolumeArgs;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.OzoneClientUtils;
@ -38,13 +39,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KSM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
.OZONE_KSM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos
.KeyspaceManagerService.newReflectiveBlockingService;
import static org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.KeyspaceManagerService
.newReflectiveBlockingService;
import static org.apache.hadoop.util.ExitUtil.terminate;
/**
@ -52,12 +55,13 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
public class KeySpaceManager implements KeyspaceManagerProtocol {
// TODO: Support JMX
private static final Logger LOG =
LoggerFactory.getLogger(KeySpaceManager.class);
private final RPC.Server ksmRpcServer;
private final InetSocketAddress ksmRpcAddress;
private final VolumeManager volumeManager;
private final KSMMetrics metrics;
public KeySpaceManager(OzoneConfiguration conf) throws IOException {
final int handlerCount = conf.getInt(OZONE_KSM_HANDLER_COUNT_KEY,
@ -75,8 +79,8 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
handlerCount);
ksmRpcAddress = updateListenAddress(conf,
OZONE_KSM_ADDRESS_KEY, ksmNodeRpcAddr, ksmRpcServer);
//TODO : Add call to register MXBean for JMX.
volumeManager = new VolumeManagerImpl(this, conf);
metrics = KSMMetrics.create();
}
/**
@ -108,6 +112,19 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
return rpcServer;
}
public KSMMetrics getMetrics() {
return metrics;
}
/**
* Returns listening address of Key Space Manager RPC server.
*
* @return listen address of Key Space Manager RPC server
*/
@VisibleForTesting
public InetSocketAddress getClientRpcAddress() {
return ksmRpcAddress;
}
/**
* Main entry point for starting KeySpaceManager.
*
@ -168,9 +185,22 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
public void start() {
LOG.info(buildRpcServerStartMessage("KeyspaceManager RPC server",
ksmRpcAddress));
volumeManager.start();
ksmRpcServer.start();
}
/**
* Stop service.
*/
public void stop() {
try {
ksmRpcServer.stop();
volumeManager.stop();
} catch (IOException e) {
LOG.info("Key Space Manager stop failed.", e);
}
}
/**
* Wait until service has completed shutdown.
*/
@ -179,7 +209,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
ksmRpcServer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.info("Interrupted during KeyspaceManager join.");
LOG.info("Interrupted during KeyspaceManager join.", e);
}
}
@ -190,8 +220,9 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
* @throws IOException
*/
@Override
public void createVolume(VolumeArgs args) throws IOException {
public void createVolume(KsmVolumeArgs args) throws IOException {
metrics.incNumVolumeCreates();
volumeManager.createVolume(args);
}
/**
@ -239,7 +270,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
* @throws IOException
*/
@Override
public VolumeArgs getVolumeinfo(String volume) throws IOException {
public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
return null;
}
@ -266,7 +297,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
* @throws IOException
*/
@Override
public List<VolumeArgs> listVolumeByUser(String userName, String prefix,
public List<KsmVolumeArgs> listVolumeByUser(String userName, String prefix,
String prevKey, long maxKeys) throws IOException {
return null;
}
@ -282,7 +313,7 @@ public class KeySpaceManager implements KeyspaceManagerProtocol {
* @throws IOException
*/
@Override
public List<VolumeArgs> listAllVolumes(String prefix, String prevKey, long
public List<KsmVolumeArgs> listAllVolumes(String prefix, String prevKey, long
maxKeys) throws IOException {
return null;
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import java.io.IOException;
/**
* KSM volume manager interface.
*/
public interface VolumeManager {
/**
* Start volume manager.
*/
void start();
/**
* Stop volume manager.
*/
void stop() throws IOException;
/**
* Create a new volume.
* @param args - Volume args to create a volume
*/
void createVolume(KsmVolumeArgs args) throws IOException;
}

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeList;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.VolumeInfo;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
import static org.apache.hadoop.ozone.ksm
.KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.ksm
.KSMConfigKeys.OZONE_KSM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.ksm
.KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
import static org.apache.hadoop.ozone.ksm
.KSMConfigKeys.OZONE_KSM_USER_MAX_VOLUME;
import static org.apache.hadoop.ozone.ksm.exceptions
.KSMException.ResultCodes;
/**
* KSM volume management code.
*/
public class VolumeManagerImpl implements VolumeManager {
private static final Logger LOG =
LoggerFactory.getLogger(VolumeManagerImpl.class);
private final KeySpaceManager ksm;
private final LevelDBStore store;
private final ReadWriteLock lock;
private final int maxUserVolumeCount;
/**
* Constructor.
* @param conf - Ozone configuration.
* @throws IOException
*/
public VolumeManagerImpl(KeySpaceManager ksm, OzoneConfiguration conf)
throws IOException {
File metaDir = OzoneUtils.getScmMetadirPath(conf);
final int cacheSize = conf.getInt(OZONE_KSM_DB_CACHE_SIZE_MB,
OZONE_KSM_DB_CACHE_SIZE_DEFAULT);
Options options = new Options();
options.cacheSize(cacheSize * OzoneConsts.MB);
File ksmDBFile = new File(metaDir.getPath(), KSM_DB_NAME);
this.ksm = ksm;
this.store = new LevelDBStore(ksmDBFile, options);
lock = new ReentrantReadWriteLock();
this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
}
@Override
public void start() {
}
@Override
public void stop() throws IOException {
store.close();
}
/**
* Creates a volume.
* @param args - KsmVolumeArgs.
*/
@Override
public void createVolume(KsmVolumeArgs args) throws IOException {
Preconditions.checkNotNull(args);
lock.writeLock().lock();
WriteBatch batch = store.createWriteBatch();
try {
byte[] volumeName = store.get(DFSUtil.string2Bytes(args.getVolume()));
// Check of the volume already exists
if(volumeName != null) {
LOG.error("volume:{} already exists", args.getVolume());
throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
}
// Next count the number of volumes for the user
String dbUserName = "$" + args.getOwnerName();
byte[] volumeList = store.get(DFSUtil.string2Bytes(dbUserName));
List prevVolList;
if (volumeList != null) {
VolumeList vlist = VolumeList.parseFrom(volumeList);
prevVolList = vlist.getVolumeNamesList();
} else {
prevVolList = new LinkedList();
}
if (prevVolList.size() >= maxUserVolumeCount) {
LOG.error("Too many volumes for user:{}", args.getOwnerName());
throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
}
// Commit the volume information to leveldb
VolumeInfo volumeInfo = args.getProtobuf();
batch.put(DFSUtil.string2Bytes(args.getVolume()),
volumeInfo.toByteArray());
prevVolList.add(args.getVolume());
VolumeList newVolList = VolumeList.newBuilder()
.addAllVolumeNames(prevVolList).build();
batch.put(DFSUtil.string2Bytes(dbUserName), newVolList.toByteArray());
store.commitWriteBatch(batch);
LOG.info("created volume:{} user:{}",
args.getVolume(), args.getOwnerName());
} catch (IOException | DBException ex) {
ksm.getMetrics().incNumVolumeCreateFails();
LOG.error("Volume creation failed for user:{} volname:{}",
args.getOwnerName(), args.getVolume(), ex);
throw ex;
} finally {
store.closeWriteBatch(batch);
lock.writeLock().unlock();
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.ksm.exceptions;
import java.io.IOException;
/**
* Exception thrown by KSM.
*/
public class KSMException extends IOException {
private final KSMException.ResultCodes result;
/**
* Constructs an {@code IOException} with {@code null}
* as its error detail message.
*/
public KSMException(KSMException.ResultCodes result) {
this.result = result;
}
/**
* Constructs an {@code IOException} with the specified detail message.
*
* @param message The detail message (which is saved for later retrieval by
* the
* {@link #getMessage()} method)
*/
public KSMException(String message, KSMException.ResultCodes result) {
super(message);
this.result = result;
}
/**
* Constructs an {@code IOException} with the specified detail message
* and cause.
* <p>
* <p> Note that the detail message associated with {@code cause} is
* <i>not</i> automatically incorporated into this exception's detail
* message.
*
* @param message The detail message (which is saved for later retrieval by
* the
* {@link #getMessage()} method)
* @param cause The cause (which is saved for later retrieval by the {@link
* #getCause()} method). (A null value is permitted, and indicates that the
* cause is nonexistent or unknown.)
* @since 1.6
*/
public KSMException(String message, Throwable cause,
KSMException.ResultCodes result) {
super(message, cause);
this.result = result;
}
/**
* Constructs an {@code IOException} with the specified cause and a
* detail message of {@code (cause==null ? null : cause.toString())}
* (which typically contains the class and detail message of {@code cause}).
* This constructor is useful for IO exceptions that are little more
* than wrappers for other throwables.
*
* @param cause The cause (which is saved for later retrieval by the {@link
* #getCause()} method). (A null value is permitted, and indicates that the
* cause is nonexistent or unknown.)
* @since 1.6
*/
public KSMException(Throwable cause, KSMException.ResultCodes result) {
super(cause);
this.result = result;
}
/**
* Returns resultCode.
* @return ResultCode
*/
public KSMException.ResultCodes getResult() {
return result;
}
/**
* Error codes to make it easy to decode these exceptions.
*/
public enum ResultCodes {
FAILED_TOO_MANY_USER_VOLUMES,
FAILED_VOLUME_ALREADY_EXISTS,
FAILED_INTERNAL_ERROR
}
}

View File

@ -0,0 +1,19 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm.exceptions;
// Exception thrown by KSM.

View File

@ -18,9 +18,40 @@ package org.apache.hadoop.ozone.protocolPB;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ksm.protocol.KeyspaceManagerProtocol;
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolPB;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.SetVolumePropertyRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.SetVolumePropertyResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.InfoVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.InfoVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.DeleteVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.DeleteVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.ListVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto
.KeySpaceManagerProtocolProtos.Status;
import java.io.IOException;
/**
* This class is the server-side translator that forwards requests received on
@ -42,47 +73,61 @@ public class KeyspaceManagerProtocolServerSideTranslatorPB implements
}
@Override
public KeySpaceManagerProtocolProtos.CreateVolumeResponse createVolume(
RpcController controller, KeySpaceManagerProtocolProtos
.CreateVolumeRequest
request) throws ServiceException {
return null;
public CreateVolumeResponse createVolume(
RpcController controller, CreateVolumeRequest request)
throws ServiceException {
CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.createVolume(KsmVolumeArgs.getFromProtobuf(request.getVolumeInfo()));
} catch (IOException e) {
if (e instanceof KSMException) {
KSMException ksmException = (KSMException)e;
if (ksmException.getResult() ==
ResultCodes.FAILED_VOLUME_ALREADY_EXISTS) {
resp.setStatus(Status.VOLUME_ALREADY_EXISTS);
} else if (ksmException.getResult() ==
ResultCodes.FAILED_TOO_MANY_USER_VOLUMES) {
resp.setStatus(Status.USER_TOO_MANY_VOLUMES);
}
} else {
resp.setStatus(Status.INTERNAL_ERROR);
}
}
return resp.build();
}
@Override
public KeySpaceManagerProtocolProtos.SetVolumePropertyResponse
setVolumeProperty(RpcController controller, KeySpaceManagerProtocolProtos
.SetVolumePropertyRequest request) throws ServiceException {
return null;
}
@Override
public KeySpaceManagerProtocolProtos.CheckVolumeAccessResponse
checkVolumeAccess(RpcController controller, KeySpaceManagerProtocolProtos
.CheckVolumeAccessRequest request) throws ServiceException {
return null;
}
@Override
public KeySpaceManagerProtocolProtos.InfoVolumeResponse infoVolume(
RpcController controller,
KeySpaceManagerProtocolProtos.InfoVolumeRequest request)
public SetVolumePropertyResponse setVolumeProperty(
RpcController controller, SetVolumePropertyRequest request)
throws ServiceException {
return null;
}
@Override
public KeySpaceManagerProtocolProtos.DeleteVolumeResponse deleteVolume(
RpcController controller, KeySpaceManagerProtocolProtos
.DeleteVolumeRequest
request) throws ServiceException {
public CheckVolumeAccessResponse checkVolumeAccess(
RpcController controller, CheckVolumeAccessRequest request)
throws ServiceException {
return null;
}
@Override
public KeySpaceManagerProtocolProtos.ListVolumeResponse listVolumes(
RpcController controller,
KeySpaceManagerProtocolProtos.ListVolumeRequest request)
public InfoVolumeResponse infoVolume(
RpcController controller, InfoVolumeRequest request)
throws ServiceException {
return null;
}
@Override
public DeleteVolumeResponse deleteVolume(
RpcController controller, DeleteVolumeRequest request)
throws ServiceException {
return null;
}
@Override
public ListVolumeResponse listVolumes(
RpcController controller, ListVolumeRequest request)
throws ServiceException {
return null;
}

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.GetKeyRespons
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ksm.helpers.KsmVolumeArgs;
import org.apache.hadoop.ksm.protocolPB.KeySpaceManagerProtocolClientSideTranslatorPB;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@ -47,7 +49,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.Set;
import java.util.TimeZone;
import java.util.Locale;
import java.util.HashSet;
import java.util.Arrays;
import java.util.List;
import static org.apache.hadoop.ozone.web.storage.OzoneContainerTranslation.*;
import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.getKey;
@ -62,7 +70,9 @@ public final class DistributedStorageHandler implements StorageHandler {
LoggerFactory.getLogger(DistributedStorageHandler.class);
private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocation;
storageContainerLocationClient;
private final KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient;
private final XceiverClientManager xceiverClientManager;
private int chunkSize;
@ -72,11 +82,15 @@ public final class DistributedStorageHandler implements StorageHandler {
*
* @param conf configuration
* @param storageContainerLocation StorageContainerLocationProtocol proxy
* @param keySpaceManagerClient KeySpaceManager proxy
*/
public DistributedStorageHandler(OzoneConfiguration conf,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocation) {
this.storageContainerLocation = storageContainerLocation;
storageContainerLocation,
KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient) {
this.keySpaceManagerClient = keySpaceManagerClient;
this.storageContainerLocationClient = storageContainerLocation;
this.xceiverClientManager = new XceiverClientManager(conf);
chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@ -92,21 +106,15 @@ public final class DistributedStorageHandler implements StorageHandler {
@Override
public void createVolume(VolumeArgs args) throws IOException, OzoneException {
String containerKey = buildContainerKey(args.getVolumeName());
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
try {
VolumeInfo volume = new VolumeInfo();
volume.setVolumeName(args.getVolumeName());
volume.setQuota(args.getQuota());
volume.setOwner(new VolumeOwner(args.getUserName()));
volume.setCreatedOn(dateToString(new Date()));
volume.setCreatedBy(args.getAdminName());
KeyData containerKeyData = fromVolumeToContainerKeyData(
xceiverClient.getPipeline().getContainerName(), containerKey, volume);
putKey(xceiverClient, containerKeyData, args.getRequestID());
} finally {
xceiverClientManager.releaseClient(xceiverClient);
}
long quota = args.getQuota() == null ?
Long.MAX_VALUE : args.getQuota().sizeInBytes();
KsmVolumeArgs volumeArgs = KsmVolumeArgs.newBuilder()
.setAdminName(args.getAdminName())
.setOwnerName(args.getUserName())
.setVolume(args.getVolumeName())
.setQuotaInBytes(quota)
.build();
keySpaceManagerClient.createVolume(volumeArgs);
}
@Override
@ -293,9 +301,9 @@ public final class DistributedStorageHandler implements StorageHandler {
}
/**
* Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline} of nodes
* capable of serving container protocol operations. The container is
* selected based on the specified container key.
* Acquires an {@link XceiverClientSpi} connected to a {@link Pipeline}
* of nodes capable of serving container protocol operations.
* The container is selected based on the specified container key.
*
* @param containerKey container key
* @return XceiverClient connected to a container
@ -304,7 +312,7 @@ public final class DistributedStorageHandler implements StorageHandler {
private XceiverClientSpi acquireXceiverClient(String containerKey)
throws IOException {
Set<LocatedContainer> locatedContainers =
storageContainerLocation.getStorageContainerLocations(
storageContainerLocationClient.getStorageContainerLocations(
new HashSet<>(Arrays.asList(containerKey)));
Pipeline pipeline = newPipelineFromLocatedContainer(
locatedContainers.iterator().next());

View File

@ -18,7 +18,10 @@
package org.apache.hadoop.ozone.web.utils;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
@ -30,6 +33,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Request;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.MediaType;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
@ -272,4 +276,23 @@ public final class OzoneUtils {
.build();
}
/**
* Checks and creates Ozone Metadir Path if it does not exist.
*
* @param conf - Configuration
*
* @return File MetaDir
*/
public static File getScmMetadirPath(Configuration conf) {
String metaDirPath = conf.getTrimmed(OzoneConfigKeys
.OZONE_CONTAINER_METADATA_DIRS);
Preconditions.checkNotNull(metaDirPath);
File dirPath = new File(metaDirPath);
if (!dirPath.exists() && !dirPath.mkdirs()) {
throw new IllegalArgumentException("Unable to create paths. Path: " +
dirPath);
}
return dirPath;
}
}

View File

@ -19,7 +19,11 @@
package org.apache.hadoop.utils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.*;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.WriteOptions;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import java.io.Closeable;
import java.io.File;
@ -32,6 +36,7 @@ public class LevelDBStore implements Closeable {
private DB db;
private final File dbFile;
private final Options dbOptions;
private final WriteOptions writeOptions;
/**
* Opens a DB file.
@ -49,6 +54,7 @@ public class LevelDBStore implements Closeable {
throw new IOException("Db is null");
}
this.dbFile = dbPath;
this.writeOptions = new WriteOptions().sync(true);
}
/**
@ -65,6 +71,7 @@ public class LevelDBStore implements Closeable {
throw new IOException("Db is null");
}
this.dbFile = dbPath;
this.writeOptions = new WriteOptions().sync(true);
}
@ -75,9 +82,7 @@ public class LevelDBStore implements Closeable {
* @param value - value
*/
public void put(byte[] key, byte[] value) {
WriteOptions options = new WriteOptions();
options.sync(true);
db.put(key, value, options);
db.put(key, value, writeOptions);
}
/**
@ -167,7 +172,7 @@ public class LevelDBStore implements Closeable {
* @param wb
*/
public void commitWriteBatch(WriteBatch wb) {
db.write(wb);
db.write(wb, writeOptions);
}
/**

View File

@ -26,8 +26,11 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.ksm.KeySpaceManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
@ -67,6 +70,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
private final OzoneConfiguration conf;
private final StorageContainerManager scm;
private final KeySpaceManager ksm;
private final Path tempPath;
/**
@ -76,11 +80,13 @@ public final class MiniOzoneCluster extends MiniDFSCluster
* @param scm StorageContainerManager, already running
* @throws IOException if there is an I/O error
*/
private MiniOzoneCluster(Builder builder, StorageContainerManager scm)
private MiniOzoneCluster(Builder builder, StorageContainerManager scm,
KeySpaceManager ksm)
throws IOException {
super(builder);
this.conf = builder.conf;
this.scm = scm;
this.ksm = ksm;
tempPath = Paths.get(builder.getPath(), builder.getRunID());
}
@ -126,18 +132,28 @@ public final class MiniOzoneCluster extends MiniDFSCluster
public void shutdown() {
super.shutdown();
LOG.info("Shutting down the Mini Ozone Cluster");
if (scm == null) {
return;
if (ksm != null) {
LOG.info("Shutting down the keySpaceManager");
ksm.stop();
ksm.join();
}
if (scm != null) {
LOG.info("Shutting down the StorageContainerManager");
scm.stop();
scm.join();
}
}
public StorageContainerManager getStorageContainerManager() {
return this.scm;
}
public KeySpaceManager getKeySpaceManager() {
return this.ksm;
}
/**
* Creates an {@link OzoneClient} connected to this cluster's REST service.
* Callers take ownership of the client and must close it when done.
@ -336,6 +352,7 @@ public final class MiniOzoneCluster extends MiniDFSCluster
conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "127.0.0.1:0");
conf.set(ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY, "127.0.0.1:0");
conf.set(KSMConfigKeys.OZONE_KSM_ADDRESS_KEY, "127.0.0.1:0");
// Use random ports for ozone containers in mini cluster,
// in order to launch multiple container servers per node.
@ -344,11 +361,15 @@ public final class MiniOzoneCluster extends MiniDFSCluster
StorageContainerManager scm = new StorageContainerManager(conf);
scm.start();
KeySpaceManager ksm = new KeySpaceManager(conf);
ksm.start();
String addressString = scm.getDatanodeRpcAddress().getHostString() +
":" + scm.getDatanodeRpcAddress().getPort();
conf.setStrings(ScmConfigKeys.OZONE_SCM_NAMES, addressString);
MiniOzoneCluster cluster = new MiniOzoneCluster(this, scm);
MiniOzoneCluster cluster = new MiniOzoneCluster(this, scm, ksm);
try {
cluster.waitOzoneReady();
if (waitForChillModeFinish) {

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.Rule;
import org.junit.BeforeClass;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.Assert;
import org.junit.rules.Timeout;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Test ozone volume in the distributed storage handler scenario.
*/
public class TestDistributedOzoneVolumes extends TestOzoneHelper {
private static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestDistributedOzoneVolumes.class);
/**
* Set the timeout for every test.
*/
@Rule
public Timeout testTimeout = new Timeout(300000);
private static MiniOzoneCluster cluster = null;
private static int port = 0;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "distributed"
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Creates Volumes on Ozone Store.
*
* @throws IOException
*/
@Test
public void testCreateVolumes() throws IOException {
super.testCreateVolumes(port);
Assert.assertEquals(cluster.getKeySpaceManager()
.getMetrics().getNumVolumeCreates(), 1);
Assert.assertEquals(cluster.getKeySpaceManager()
.getMetrics().getNumVolumeCreateFails(), 0);
}
/**
* Create Volumes with Quota.
*
* @throws IOException
*/
public void testCreateVolumesWithQuota() throws IOException {
super.testCreateVolumesWithQuota(port);
}
/**
* Create Volumes with Invalid Quota.
*
* @throws IOException
*/
public void testCreateVolumesWithInvalidQuota() throws IOException {
super.testCreateVolumesWithInvalidQuota(port);
}
/**
* To create a volume a user name must be specified using OZONE_USER header.
* This test verifies that we get an error in case we call without a OZONE
* user name.
*
* @throws IOException
*/
public void testCreateVolumesWithInvalidUser() throws IOException {
super.testCreateVolumesWithInvalidUser(port);
}
/**
* Only Admins can create volumes in Ozone. This test uses simple userauth as
* backend and hdfs and root are admin users in the simple backend.
* <p>
* This test tries to create a volume as user bilbo.
*
* @throws IOException
*/
public void testCreateVolumesWithOutAdminRights() throws IOException {
super.testCreateVolumesWithOutAdminRights(port);
}
/**
* Create a bunch of volumes in a loop.
*
* @throws IOException
*/
public void testCreateVolumesInLoop() throws IOException {
super.testCreateVolumesInLoop(port);
}
/**
* Get volumes owned by the user.
*
* @throws IOException
*/
public void testGetVolumesByUser() throws IOException {
testGetVolumesByUser(port);
}
/**
* Admins can read volumes belonging to other users.
*
* @throws IOException
*/
public void testGetVolumesOfAnotherUser() throws IOException {
super.testGetVolumesOfAnotherUser(port);
}
/**
* if you try to read volumes belonging to another user,
* then server always ignores it.
*
* @throws IOException
*/
public void testGetVolumesOfAnotherUserShouldFail() throws IOException {
super.testGetVolumesOfAnotherUserShouldFail(port);
}
}

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.io.IOException;
import java.net.URL;
/**
* Test ozone volume in the local storage handler scenario.
*/
public class TestLocalOzoneVolumes extends TestOzoneHelper {
/**
* Set the timeout for every test.
*/
@Rule
public Timeout testTimeout = new Timeout(300000);
private static MiniOzoneCluster cluster = null;
private static int port = 0;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "local" , which uses a local directory to
* emulate Ozone backend.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath()
.concat(TestLocalOzoneVolumes.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL).build();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Creates Volumes on Ozone Store.
*
* @throws IOException
*/
@Test
public void testCreateVolumes() throws IOException {
super.testCreateVolumes(port);
}
/**
* Create Volumes with Quota.
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithQuota() throws IOException {
super.testCreateVolumesWithQuota(port);
}
/**
* Create Volumes with Invalid Quota.
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithInvalidQuota() throws IOException {
super.testCreateVolumesWithInvalidQuota(port);
}
/**
* To create a volume a user name must be specified using OZONE_USER header.
* This test verifies that we get an error in case we call without a OZONE
* user name.
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithInvalidUser() throws IOException {
super.testCreateVolumesWithInvalidUser(port);
}
/**
* Only Admins can create volumes in Ozone. This test uses simple userauth as
* backend and hdfs and root are admin users in the simple backend.
* <p>
* This test tries to create a volume as user bilbo.
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithOutAdminRights() throws IOException {
super.testCreateVolumesWithOutAdminRights(port);
}
/**
* Create a bunch of volumes in a loop.
*
* @throws IOException
*/
//@Test
public void testCreateVolumesInLoop() throws IOException {
super.testCreateVolumesInLoop(port);
}
/**
* Get volumes owned by the user.
*
* @throws IOException
*/
@Test
public void testGetVolumesByUser() throws IOException {
super.testGetVolumesByUser(port);
}
/**
* Admins can read volumes belonging to other users.
*
* @throws IOException
*/
@Test
public void testGetVolumesOfAnotherUser() throws IOException {
super.testGetVolumesOfAnotherUser(port);
}
/**
* if you try to read volumes belonging to another user,
* then server always ignores it.
*
* @throws IOException
*/
@Test
public void testGetVolumesOfAnotherUserShouldFail() throws IOException {
super.testGetVolumesOfAnotherUserShouldFail(port);
}
}

View File

@ -17,31 +17,19 @@
*/
package org.apache.hadoop.ozone.web;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.web.exceptions.ErrorTable;
import org.apache.hadoop.ozone.web.headers.Header;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.util.Time;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import javax.ws.rs.core.HttpHeaders;
import java.io.IOException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
@ -50,63 +38,23 @@ import static java.net.HttpURLConnection.HTTP_CREATED;
import static java.net.HttpURLConnection.HTTP_OK;
import static org.junit.Assert.assertEquals;
public class TestOzoneVolumes {
/**
* Set the timeout for every test.
/**
* Helper functions to test Ozone.
*/
@Rule
public Timeout testTimeout = new Timeout(300000);
public class TestOzoneHelper {
private static MiniOzoneCluster cluster = null;
private static int port = 0;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "local" , which uses a local directory to
* emulate Ozone backend.
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
URL p = conf.getClass().getResource("");
String path = p.getPath().concat(TestOzoneVolumes.class.getSimpleName());
path += conf.getTrimmed(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT,
OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT_DEFAULT);
conf.set(OzoneConfigKeys.OZONE_LOCALSTORAGE_ROOT, path);
Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_LOCAL).build();
DataNode dataNode = cluster.getDataNodes().get(0);
port = dataNode.getInfoPort();
public CloseableHttpClient createHttpClient() {
return HttpClientBuilder.create().build();
}
/**
* shutdown MiniDFSCluster
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* Creates Volumes on Ozone Store.
*
* @throws IOException
*/
@Test
public void testCreateVolumes() throws IOException {
public void testCreateVolumes(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
String volumeName = OzoneUtils.getRequestID().toLowerCase();
try {
HttpPost httppost = new HttpPost(
@ -125,7 +73,7 @@ public class TestOzoneVolumes {
assertEquals(response.toString(), HTTP_CREATED,
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}
@ -134,11 +82,10 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithQuota() throws IOException {
public void testCreateVolumesWithQuota(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
String volumeName = OzoneUtils.getRequestID().toLowerCase();
try {
HttpPost httppost = new HttpPost(
@ -157,7 +104,7 @@ public class TestOzoneVolumes {
assertEquals(response.toString(), HTTP_CREATED,
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}
@ -166,11 +113,10 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithInvalidQuota() throws IOException {
public void testCreateVolumesWithInvalidQuota(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
String volumeName = OzoneUtils.getRequestID().toLowerCase();
try {
HttpPost httppost = new HttpPost(
@ -190,7 +136,7 @@ public class TestOzoneVolumes {
.getHttpCode(),
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}
@ -201,11 +147,10 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithInvalidUser() throws IOException {
public void testCreateVolumesWithInvalidUser(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
String volumeName = OzoneUtils.getRequestID().toLowerCase();
try {
HttpPost httppost = new HttpPost(
@ -224,7 +169,7 @@ public class TestOzoneVolumes {
assertEquals(response.toString(), ErrorTable.USER_NOT_FOUND.getHttpCode(),
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}
@ -236,11 +181,10 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
public void testCreateVolumesWithOutAdminRights() throws IOException {
public void testCreateVolumesWithOutAdminRights(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
String volumeName = OzoneUtils.getRequestID().toLowerCase();
try {
HttpPost httppost = new HttpPost(
@ -259,7 +203,7 @@ public class TestOzoneVolumes {
assertEquals(response.toString(), ErrorTable.ACCESS_DENIED.getHttpCode(),
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}
@ -268,13 +212,12 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
//@Test
public void testCreateVolumesInLoop() throws IOException {
public void testCreateVolumesInLoop(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
for (int x = 0; x < 1000; x++) {
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
String volumeName = OzoneUtils.getRequestID().toLowerCase();
String userName = OzoneUtils.getRequestID().toLowerCase();
@ -293,7 +236,7 @@ public class TestOzoneVolumes {
HttpResponse response = client.execute(httppost);
assertEquals(response.toString(), HTTP_CREATED,
response.getStatusLine().getStatusCode());
client.getConnectionManager().shutdown();
client.close();
}
}
/**
@ -301,13 +244,12 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
public void testGetVolumesByUser() throws IOException {
public void testGetVolumesByUser(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
// We need to create a volume for this test to succeed.
testCreateVolumes();
HttpClient client = new DefaultHttpClient();
testCreateVolumes(port);
CloseableHttpClient client = createHttpClient();
try {
HttpGet httpget =
new HttpGet(String.format("http://localhost:%d/", port));
@ -323,14 +265,14 @@ public class TestOzoneVolumes {
OzoneConsts.OZONE_SIMPLE_HDFS_USER);
httpget.addHeader(Header.OZONE_USER,
OzoneConsts.OZONE_SIMPLE_HDFS_USER );
OzoneConsts.OZONE_SIMPLE_HDFS_USER);
HttpResponse response = client.execute(httpget);
assertEquals(response.toString(), HTTP_OK,
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}
@ -339,12 +281,11 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
public void testGetVolumesOfAnotherUser() throws IOException {
public void testGetVolumesOfAnotherUser(int port) throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
try {
HttpGet httpget =
new HttpGet(String.format("http://localhost:%d/", port));
@ -366,7 +307,7 @@ public class TestOzoneVolumes {
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}
@ -376,12 +317,12 @@ public class TestOzoneVolumes {
*
* @throws IOException
*/
@Test
public void testGetVolumesOfAnotherUserShouldFail() throws IOException {
public void testGetVolumesOfAnotherUserShouldFail(int port)
throws IOException {
SimpleDateFormat format =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss ZZZ", Locale.US);
HttpClient client = new DefaultHttpClient();
CloseableHttpClient client = createHttpClient();
String userName = OzoneUtils.getRequestID().toLowerCase();
try {
HttpGet httpget =
@ -406,7 +347,7 @@ public class TestOzoneVolumes {
response.getStatusLine().getStatusCode());
} finally {
client.getConnectionManager().shutdown();
client.close();
}
}