HDFS-11920. Ozone : add key partition. Contributed by Chen Liang.

This commit is contained in:
Chen Liang 2017-07-31 16:01:39 -07:00 committed by Owen O'Malley
parent bc413a69cc
commit 070f7e2e59
18 changed files with 1172 additions and 253 deletions

View File

@ -17,9 +17,11 @@
*/
package org.apache.hadoop.ksm.helpers;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
import java.util.List;
import java.util.stream.Collectors;
/**
* Args for key block. The block instance for the key requested in putKey.
* This is returned from KSM to client, and client use class to talk to
@ -30,25 +32,19 @@ public final class KsmKeyInfo {
private final String bucketName;
// name of key client specified
private final String keyName;
private final String containerName;
// name of the block id SCM assigned for the key
private final String blockID;
private final long dataSize;
private final boolean shouldCreateContainer;
private List<KsmKeyLocationInfo> keyLocationList;
private final long creationTime;
private final long modificationTime;
private KsmKeyInfo(String volumeName, String bucketName, String keyName,
long dataSize, String blockID, String containerName,
boolean shouldCreateContainer, long creationTime,
List<KsmKeyLocationInfo> locationInfos, long dataSize, long creationTime,
long modificationTime) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
this.containerName = containerName;
this.blockID = blockID;
this.dataSize = dataSize;
this.shouldCreateContainer = shouldCreateContainer;
this.keyLocationList = locationInfos;
this.creationTime = creationTime;
this.modificationTime = modificationTime;
}
@ -65,20 +61,12 @@ public final class KsmKeyInfo {
return keyName;
}
public String getBlockID() {
return blockID;
}
public String getContainerName() {
return containerName;
}
public long getDataSize() {
return dataSize;
}
public boolean getShouldCreateContainer() {
return shouldCreateContainer;
public List<KsmKeyLocationInfo> getKeyLocationList() {
return keyLocationList;
}
public long getCreationTime() {
@ -96,10 +84,8 @@ public final class KsmKeyInfo {
private String volumeName;
private String bucketName;
private String keyName;
private String containerName;
private String blockID;
private long dataSize;
private boolean shouldCreateContainer;
private List<KsmKeyLocationInfo> ksmKeyLocationInfos;
private long creationTime;
private long modificationTime;
@ -118,13 +104,9 @@ public final class KsmKeyInfo {
return this;
}
public Builder setBlockID(String block) {
this.blockID = block;
return this;
}
public Builder setContainerName(String container) {
this.containerName = container;
public Builder setKsmKeyLocationInfos(
List<KsmKeyLocationInfo> ksmKeyLocationInfoList) {
this.ksmKeyLocationInfos = ksmKeyLocationInfoList;
return this;
}
@ -133,11 +115,6 @@ public final class KsmKeyInfo {
return this;
}
public Builder setShouldCreateContainer(boolean create) {
this.shouldCreateContainer = create;
return this;
}
public Builder setCreationTime(long creationTime) {
this.creationTime = creationTime;
return this;
@ -150,8 +127,8 @@ public final class KsmKeyInfo {
public KsmKeyInfo build() {
return new KsmKeyInfo(
volumeName, bucketName, keyName, dataSize, blockID, containerName,
shouldCreateContainer, creationTime, modificationTime);
volumeName, bucketName, keyName, ksmKeyLocationInfos,
dataSize, creationTime, modificationTime);
}
}
@ -161,9 +138,8 @@ public final class KsmKeyInfo {
.setBucketName(bucketName)
.setKeyName(keyName)
.setDataSize(dataSize)
.setBlockKey(blockID)
.setContainerName(containerName)
.setShouldCreateContainer(shouldCreateContainer)
.addAllKeyLocationList(keyLocationList.stream()
.map(KsmKeyLocationInfo::getProtobuf).collect(Collectors.toList()))
.setCreationTime(creationTime)
.setModificationTime(modificationTime)
.build();
@ -174,10 +150,10 @@ public final class KsmKeyInfo {
keyInfo.getVolumeName(),
keyInfo.getBucketName(),
keyInfo.getKeyName(),
keyInfo.getKeyLocationListList().stream()
.map(KsmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList()),
keyInfo.getDataSize(),
keyInfo.getBlockKey(),
keyInfo.getContainerName(),
keyInfo.getShouldCreateContainer(),
keyInfo.getCreationTime(),
keyInfo.getModificationTime());
}

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ksm.helpers;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyLocation;
/**
* One key can be too huge to fit in one container. In which case it gets split
* into a number of subkeys. This class represents one such subkey instance.
*/
public final class KsmKeyLocationInfo {
private final String containerName;
// name of the block id SCM assigned for the key
private final String blockID;
private final boolean shouldCreateContainer;
// the id of this subkey in all the subkeys.
private final int index;
private final long length;
private final long offset;
private KsmKeyLocationInfo(String containerName,
String blockID, boolean shouldCreateContainer, int index,
long length, long offset) {
this.containerName = containerName;
this.blockID = blockID;
this.shouldCreateContainer = shouldCreateContainer;
this.index = index;
this.length = length;
this.offset = offset;
}
public String getContainerName() {
return containerName;
}
public String getBlockID() {
return blockID;
}
public boolean getShouldCreateContainer() {
return shouldCreateContainer;
}
public int getIndex() {
return index;
}
public long getLength() {
return length;
}
public long getOffset() {
return offset;
}
/**
* Builder of KsmKeyLocationInfo.
*/
public static class Builder {
private String containerName;
private String blockID;
private boolean shouldCreateContainer;
// the id of this subkey in all the subkeys.
private int index;
private long length;
private long offset;
public Builder setContainerName(String container) {
this.containerName = container;
return this;
}
public Builder setBlockID(String block) {
this.blockID = block;
return this;
}
public Builder setShouldCreateContainer(boolean create) {
this.shouldCreateContainer = create;
return this;
}
public Builder setIndex(int id) {
this.index = id;
return this;
}
public Builder setLength(long len) {
this.length = len;
return this;
}
public Builder setOffset(long off) {
this.offset = off;
return this;
}
public KsmKeyLocationInfo build() {
return new KsmKeyLocationInfo(containerName, blockID,
shouldCreateContainer, index, length, offset);
}
}
public KeyLocation getProtobuf() {
return KeyLocation.newBuilder()
.setContainerName(containerName)
.setBlockID(blockID)
.setShouldCreateContainer(shouldCreateContainer)
.setIndex(index)
.setLength(length)
.setOffset(offset)
.build();
}
public static KsmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
return new KsmKeyLocationInfo(
keyLocation.getContainerName(),
keyLocation.getBlockID(),
keyLocation.getShouldCreateContainer(),
keyLocation.getIndex(),
keyLocation.getLength(),
keyLocation.getOffset());
}
}

View File

@ -225,16 +225,23 @@ message KeyArgs {
optional uint64 dataSize = 4;
}
message KeyLocation {
required string blockID = 1;
required string containerName = 2;
required bool shouldCreateContainer = 3;
required uint64 offset = 4;
required uint64 length = 5;
required uint32 index = 6;
}
message KeyInfo {
required string volumeName = 1;
required string bucketName = 2;
required string keyName = 3;
required uint64 dataSize = 4;
required string blockKey = 5;
required string containerName = 6;
required bool shouldCreateContainer = 7;
required uint64 creationTime = 8;
required uint64 modificationTime = 9;
repeated KeyLocation keyLocationList = 5;
required uint64 creationTime = 6;
required uint64 modificationTime = 7;
}
message LocateKeyRequest {

View File

@ -20,12 +20,7 @@ package org.apache.hadoop.ozone;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@ -39,24 +34,20 @@ import org.apache.hadoop.ksm.protocolPB
import org.apache.hadoop.ksm.protocolPB
.KeySpaceManagerProtocolPB;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
import org.apache.hadoop.ozone.io.OzoneInputStream;
import org.apache.hadoop.ozone.io.OzoneOutputStream;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolPB;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ChunkOutputStream;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -480,39 +471,11 @@ public class OzoneClientImpl implements OzoneClient, Closeable {
.setDataSize(size)
.build();
String containerKey = buildContainerKey(volumeName, bucketName, keyName);
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
// TODO: the following createContainer and key writes may fail, in which
// case we should revert the above allocateKey to KSM.
String containerName = keyInfo.getContainerName();
XceiverClientSpi xceiverClient = getContainer(containerName);
if (keyInfo.getShouldCreateContainer()) {
LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
volumeName, bucketName, keyName);
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
}
// establish a connection to the container to write the key
ChunkOutputStream outputStream = new ChunkOutputStream(containerKey,
keyName, xceiverClientManager, xceiverClient, requestId, chunkSize);
return new OzoneOutputStream(outputStream);
}
/**
* Creates a container key from any number of components by combining all
* components with a delimiter.
*
* @param parts container key components
* @return container key
*/
private static String buildContainerKey(String... parts) {
return '/' + StringUtils.join('/', parts);
}
private XceiverClientSpi getContainer(String containerName)
throws IOException {
Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName);
return xceiverClientManager.acquireClient(pipeline);
ChunkGroupOutputStream groupOutputStream =
ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
storageContainerLocationClient, chunkSize, requestId);
return new OzoneOutputStream(groupOutputStream);
}
@Override
@ -529,29 +492,12 @@ public class OzoneClientImpl implements OzoneClient, Closeable {
.setKeyName(keyName)
.build();
KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
String containerKey = buildContainerKey(volumeName,
bucketName, keyName);
String containerName = keyInfo.getContainerName();
XceiverClientSpi xceiverClient = getContainer(containerName);
boolean success = false;
try {
LOG.debug("get key accessing {} {}",
xceiverClient.getPipeline().getContainerName(), containerKey);
KeyData containerKeyData = KeyData.newBuilder().setContainerName(
xceiverClient.getPipeline().getContainerName())
.setName(containerKey).build();
GetKeyResponseProto response = ContainerProtocolCalls
.getKey(xceiverClient, containerKeyData, requestId);
List<ChunkInfo> chunks = response.getKeyData().getChunksList();
success = true;
return new OzoneInputStream(new ChunkInputStream(
containerKey, xceiverClientManager, xceiverClient,
chunks, requestId));
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient);
}
}
LengthInputStream lengthInputStream =
ChunkGroupInputStream.getFromKsmKeyInfo(
keyInfo, xceiverClientManager, storageContainerLocationClient,
requestId);
return new OzoneInputStream(
(ChunkGroupInputStream)lengthInputStream.getWrappedStream());
}
@Override

View File

@ -73,6 +73,10 @@ public final class OzoneConfigKeys {
public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
public static final String OZONE_SCM_BLOCK_SIZE_KEY =
"ozone.scm.block.size";
public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256 * OzoneConsts.MB;
/**
* Ozone administrator users delimited by comma.
* If not set, only the user who launches an ozone service will be the

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.ozone;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
import java.util.List;
/**
* A class that encapsulates OzoneKey.
@ -37,19 +40,15 @@ public class OzoneKey {
* Name of the Key.
*/
private final String keyName;
/**
* Name of the Container the Key resides in.
*/
private final String containerName;
/**
* Name of the block id SCM assigned for the key.
*/
private final String blockID;
/**
* Size of the data.
*/
private final long dataSize;
/**
* All the locations of this key, in an ordered list.
*/
private final List<KsmKeyLocationInfo> keyLocations;
/**
* Constructs OzoneKey from KsmKeyInfo.
*
@ -59,9 +58,8 @@ public class OzoneKey {
this.volumeName = ksmKeyInfo.getVolumeName();
this.bucketName = ksmKeyInfo.getBucketName();
this.keyName = ksmKeyInfo.getKeyName();
this.containerName = ksmKeyInfo.getContainerName();
this.blockID = ksmKeyInfo.getBlockID();
this.dataSize = ksmKeyInfo.getDataSize();
this.keyLocations = ksmKeyInfo.getKeyLocationList();
}
/**
@ -91,24 +89,6 @@ public class OzoneKey {
return keyName;
}
/**
* Returns Container Name associated with the Key.
*
* @return containerName
*/
public String getContainerName() {
return containerName;
}
/**
* Returns BlockID associated with the Key.
*
* @return blockID
*/
public String getBlockID() {
return blockID;
}
/**
* Returns the size of the data.
*
@ -117,4 +97,13 @@ public class OzoneKey {
public long getDataSize() {
return dataSize;
}
/**
* Retruns the list of the key locations.
*
* @return key locations
*/
public List<KsmKeyLocationInfo> getKeyLocations() {
return keyLocations;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.io;
import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import java.io.IOException;
@ -28,14 +29,14 @@ import java.io.InputStream;
*/
public class OzoneInputStream extends InputStream {
private final ChunkInputStream inputStream;
private final ChunkGroupInputStream inputStream;
/**
* Constructs OzoneInputStream with ChunkInputStream.
*
* @param inputStream
*/
public OzoneInputStream(ChunkInputStream inputStream) {
public OzoneInputStream(ChunkGroupInputStream inputStream) {
this.inputStream = inputStream;
}

View File

@ -17,25 +17,25 @@
package org.apache.hadoop.ozone.io;
import org.apache.hadoop.scm.storage.ChunkOutputStream;
import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
import java.io.IOException;
import java.io.OutputStream;
/**
* OzoneOutputStream is used to write data into Ozone.
* It uses SCM's {@link ChunkOutputStream} for writing the data.
* It uses SCM's {@link ChunkGroupOutputStream} for writing the data.
*/
public class OzoneOutputStream extends OutputStream {
private final ChunkOutputStream outputStream;
private final ChunkGroupOutputStream outputStream;
/**
* Constructs OzoneOutputStream with ChunkOutputStream.
* Constructs OzoneOutputStream with ChunkGroupOutputStream.
*
* @param outputStream
*/
public OzoneOutputStream(ChunkOutputStream outputStream) {
public OzoneOutputStream(ChunkGroupOutputStream outputStream) {
this.outputStream = outputStream;
}

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
@ -31,8 +33,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY;
/**
* Implementation of keyManager.
*/
@ -45,11 +51,14 @@ public class KeyManagerImpl implements KeyManager {
*/
private final ScmBlockLocationProtocol scmBlockClient;
private final MetadataManager metadataManager;
private final long scmBlockSize;
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
MetadataManager metadataManager) {
MetadataManager metadataManager, OzoneConfiguration conf) {
this.scmBlockClient = scmBlockClient;
this.metadataManager = metadataManager;
this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,
OZONE_SCM_BLOCK_SIZE_DEFAULT);
}
@Override
@ -92,17 +101,37 @@ public class KeyManagerImpl implements KeyManager {
// with a actual SCM block.
// TODO : Review this decision later. We can get away with only a
// metadata entry in case of 0 length key.
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(Math.max(args.getDataSize(), 1));
long targetSize = args.getDataSize();
List<KsmKeyLocationInfo> subKeyInfos = new ArrayList<>();
int idx = 0;
long offset = 0;
// in case targetSize == 0, subKeyInfos will be an empty list
while (targetSize > 0) {
long allocateSize = Math.min(targetSize, scmBlockSize);
AllocatedBlock allocatedBlock =
scmBlockClient.allocateBlock(allocateSize);
KsmKeyLocationInfo subKeyInfo = new KsmKeyLocationInfo.Builder()
.setContainerName(allocatedBlock.getPipeline().getContainerName())
.setBlockID(allocatedBlock.getKey())
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setIndex(idx)
.setLength(allocateSize)
.setOffset(offset)
.build();
idx += 1;
offset += allocateSize;
targetSize -= allocateSize;
subKeyInfos.add(subKeyInfo);
}
long currentTime = Time.now();
KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize())
.setBlockID(allocatedBlock.getKey())
.setContainerName(allocatedBlock.getPipeline().getContainerName())
.setShouldCreateContainer(allocatedBlock.getCreateContainer())
.setKsmKeyLocationInfos(subKeyInfos)
.setCreationTime(currentTime)
.setModificationTime(currentTime)
.build();

View File

@ -98,7 +98,8 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
volumeManager = new VolumeManagerImpl(metadataManager, conf);
bucketManager = new BucketManagerImpl(metadataManager);
metrics = KSMMetrics.create();
keyManager = new KeyManagerImpl(getScmBlockClient(conf), metadataManager);
keyManager = new KeyManagerImpl(
getScmBlockClient(conf), metadataManager, conf);
httpServer = new KeySpaceManagerHttpServer(conf);
}

View File

@ -400,11 +400,13 @@ public class SQLCLI extends Configured implements Tool {
break;
case KEY:
KeyInfo keyInfo = KeyInfo.parseFrom(value);
// TODO : the two fields container name and block id are no longer used,
// need to revisit this later.
String insertKeyInfo =
String.format(INSERT_KEY_INFO, keyInfo.getVolumeName(),
keyInfo.getBucketName(), keyInfo.getKeyName(),
keyInfo.getDataSize(), keyInfo.getBlockKey(),
keyInfo.getContainerName());
keyInfo.getDataSize(), "EMPTY",
"EMPTY");
executeSQL(conn, insertKeyInfo);
break;
default:

View File

@ -166,6 +166,7 @@ public class KeyHandler implements Keys {
String contentLenString = getContentLength(headers, args);
String newLen = contentLenString.replaceAll("\"", "");
int contentLen = Integer.parseInt(newLen);
args.setSize(contentLen);
MessageDigest md5 = MessageDigest.getInstance("MD5");
int bytesRead = 0;

View File

@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Maintaining a list of ChunkInputStream. Read based on offset.
*/
public class ChunkGroupInputStream extends InputStream {
private static final Logger LOG =
LoggerFactory.getLogger(ChunkGroupInputStream.class);
private static final int EOF = -1;
private final ArrayList<ChunkInputStreamEntry> streamEntries;
private int currentStreamIndex;
public ChunkGroupInputStream() {
streamEntries = new ArrayList<>();
currentStreamIndex = 0;
}
@VisibleForTesting
public synchronized int getCurrentStreamIndex() {
return currentStreamIndex;
}
@VisibleForTesting
public long getRemainingOfIndex(int index) {
return streamEntries.get(index).getRemaining();
}
/**
* Append another stream to the end of the list.
*
* @param stream the stream instance.
* @param length the max number of bytes that should be written to this
* stream.
*/
public synchronized void addStream(InputStream stream, long length) {
streamEntries.add(new ChunkInputStreamEntry(stream, length));
}
@Override
public synchronized int read() throws IOException {
if (streamEntries.size() <= currentStreamIndex) {
throw new IndexOutOfBoundsException();
}
ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
int data = entry.read();
return data;
}
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return 0;
}
int totalReadLen = 0;
while (len > 0) {
if (streamEntries.size() <= currentStreamIndex) {
return totalReadLen == 0 ? EOF : totalReadLen;
}
ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
int readLen = Math.min(len, (int)current.getRemaining());
int actualLen = current.read(b, off, readLen);
// this means the underlying stream has nothing at all, return
if (actualLen == EOF) {
return totalReadLen > 0? totalReadLen : EOF;
}
totalReadLen += actualLen;
// this means there is no more data to read beyond this point, return
if (actualLen != readLen) {
return totalReadLen;
}
off += readLen;
len -= readLen;
if (current.getRemaining() <= 0) {
currentStreamIndex += 1;
}
}
return totalReadLen;
}
private static class ChunkInputStreamEntry extends InputStream {
private final InputStream inputStream;
private final long length;
private long currentPosition;
ChunkInputStreamEntry(InputStream chunkInputStream, long length) {
this.inputStream = chunkInputStream;
this.length = length;
this.currentPosition = 0;
}
synchronized long getRemaining() {
return length - currentPosition;
}
@Override
public synchronized int read(byte[] b, int off, int len)
throws IOException {
int readLen = inputStream.read(b, off, len);
currentPosition += readLen;
return readLen;
}
@Override
public synchronized int read() throws IOException {
int data = inputStream.read();
currentPosition += 1;
return data;
}
@Override
public synchronized void close() throws IOException {
inputStream.close();
}
}
public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient, String requestId)
throws IOException {
int index = 0;
long length = 0;
String containerKey;
ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) {
// check index as sanity check
Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex());
String containerName = ksmKeyLocationInfo.getContainerName();
Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName);
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
boolean success = false;
containerKey = ksmKeyLocationInfo.getBlockID();
try {
LOG.debug("get key accessing {} {}",
xceiverClient.getPipeline().getContainerName(), containerKey);
ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
.containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
.getKey(xceiverClient, containerKeyData, requestId);
List<ContainerProtos.ChunkInfo> chunks =
response.getKeyData().getChunksList();
for (ContainerProtos.ChunkInfo chunk : chunks) {
length += chunk.getLen();
}
success = true;
ChunkInputStream inputStream = new ChunkInputStream(
containerKey, xceiverClientManager, xceiverClient,
chunks, requestId);
groupInputStream.addStream(inputStream,
ksmKeyLocationInfo.getLength());
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient);
}
}
}
return new LengthInputStream(groupInputStream, length);
}
}

View File

@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.web.storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.storage.ChunkOutputStream;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
*
* Note that this may write to multiple containers in one write call. In case
* that first container succeeded but later ones failed, the succeeded writes
* are not rolled back.
*
* TODO : currently not support multi-thread access.
*/
public class ChunkGroupOutputStream extends OutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(ChunkGroupOutputStream.class);
// array list's get(index) is O(1)
private final ArrayList<ChunkOutputStreamEntry> streamEntries;
private int currentStreamIndex;
private long totalSize;
private long byteOffset;
public ChunkGroupOutputStream() {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.totalSize = 0;
this.byteOffset = 0;
}
@VisibleForTesting
public long getByteOffset() {
return byteOffset;
}
/**
* Append another stream to the end of the list. Note that the streams are not
* actually created to this point, only enough meta data about the stream is
* stored. When something is to be actually written to the stream, the stream
* will be created (if not already).
*
* @param containerKey the key to store in the container
* @param key the ozone key
* @param xceiverClientManager xceiver manager instance
* @param xceiverClient xceiver manager instance
* @param requestID the request id
* @param chunkSize the chunk size for this key chunks
* @param length the total length of this key
*/
public synchronized void addStream(String containerKey, String key,
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
String requestID, int chunkSize, long length) {
streamEntries.add(new ChunkOutputStreamEntry(containerKey, key,
xceiverClientManager, xceiverClient, requestID, chunkSize, length));
totalSize += length;
}
@VisibleForTesting
public synchronized void addStream(OutputStream outputStream, long length) {
streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
totalSize += length;
}
@Override
public synchronized void write(int b) throws IOException {
if (streamEntries.size() <= currentStreamIndex) {
throw new IndexOutOfBoundsException();
}
ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
entry.write(b);
if (entry.getRemaining() <= 0) {
currentStreamIndex += 1;
}
byteOffset += 1;
}
/**
* Try to write the bytes sequence b[off:off+len) to streams.
*
* NOTE: Throws exception if the data could not fit into the remaining space.
* In which case nothing will be written.
* TODO:May need to revisit this behaviour.
*
* @param b byte data
* @param off starting offset
* @param len length to write
* @throws IOException
*/
@Override
public synchronized void write(byte[] b, int off, int len)
throws IOException {
if (b == null) {
throw new NullPointerException();
}
if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
if (len == 0) {
return;
}
if (streamEntries.size() <= currentStreamIndex) {
throw new IOException("Write out of stream range! stream index:" +
currentStreamIndex);
}
if (totalSize - byteOffset < len) {
throw new IOException("Can not write " + len + " bytes with only " +
(totalSize - byteOffset) + " byte space");
}
while (len > 0) {
// in theory, this condition should never violate due the check above
// still do a sanity check.
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
int writeLen = Math.min(len, (int)current.getRemaining());
current.write(b, off, writeLen);
if (current.getRemaining() <= 0) {
currentStreamIndex += 1;
}
len -= writeLen;
off += writeLen;
byteOffset += writeLen;
}
}
@Override
public synchronized void flush() throws IOException {
for (int i = 0; i <= currentStreamIndex; i++) {
streamEntries.get(i).flush();
}
}
@Override
public synchronized void close() throws IOException {
for (ChunkOutputStreamEntry entry : streamEntries) {
entry.close();
}
}
private static class ChunkOutputStreamEntry extends OutputStream {
private OutputStream outputStream;
private final String containerKey;
private final String key;
private final XceiverClientManager xceiverClientManager;
private final XceiverClientSpi xceiverClient;
private final String requestId;
private final int chunkSize;
// total number of bytes that should be written to this stream
private final long length;
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
ChunkOutputStreamEntry(String containerKey, String key,
XceiverClientManager xceiverClientManager,
XceiverClientSpi xceiverClient, String requestId, int chunkSize,
long length) {
this.outputStream = null;
this.containerKey = containerKey;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient;
this.requestId = requestId;
this.chunkSize = chunkSize;
this.length = length;
this.currentPosition = 0;
}
/**
* For testing purpose, taking a some random created stream instance.
* @param outputStream a existing writable output stream
* @param length the length of data to write to the stream
*/
ChunkOutputStreamEntry(OutputStream outputStream, long length) {
this.outputStream = outputStream;
this.containerKey = null;
this.key = null;
this.xceiverClientManager = null;
this.xceiverClient = null;
this.requestId = null;
this.chunkSize = -1;
this.length = length;
this.currentPosition = 0;
}
long getLength() {
return length;
}
long getRemaining() {
return length - currentPosition;
}
private synchronized void checkStream() {
if (this.outputStream == null) {
this.outputStream = new ChunkOutputStream(containerKey,
key, xceiverClientManager, xceiverClient,
requestId, chunkSize);
}
}
@Override
public void write(int b) throws IOException {
checkStream();
outputStream.write(b);
this.currentPosition += 1;
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkStream();
outputStream.write(b, off, len);
this.currentPosition += len;
}
@Override
public void flush() throws IOException {
if (this.outputStream != null) {
this.outputStream.flush();
}
}
@Override
public void close() throws IOException {
if (this.outputStream != null) {
this.outputStream.close();
}
}
}
public static ChunkGroupOutputStream getFromKsmKeyInfo(
KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient,
int chunkSize, String requestId) throws IOException {
// TODO: the following createContainer and key writes may fail, in which
// case we should revert the above allocateKey to KSM.
// check index as sanity check
int index = 0;
String containerKey;
ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream();
for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) {
containerKey = subKeyInfo.getBlockID();
Preconditions.checkArgument(index++ == subKeyInfo.getIndex());
String containerName = subKeyInfo.getContainerName();
Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName);
XceiverClientSpi xceiverClient =
xceiverClientManager.acquireClient(pipeline);
// create container if needed
// TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
// always true.
boolean shouldCreate = true;
if (shouldCreate) {
try {
ContainerProtocolCalls.createContainer(xceiverClient, requestId);
} catch (StorageContainerException sce) {
LOG.warn("Create container failed with {}", containerName, sce);
}
}
groupOutputStream.addStream(containerKey, keyInfo.getKeyName(),
xceiverClientManager, xceiverClient, requestId, chunkSize,
subKeyInfo.getLength());
}
return groupOutputStream;
}
}

View File

@ -19,12 +19,6 @@
package org.apache.hadoop.ozone.web.storage;
import com.google.common.base.Strings;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.GetKeyResponseProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto
.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.server.datanode.fsdataset
.LengthInputStream;
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
@ -37,12 +31,12 @@ import org.apache.hadoop.ksm.protocolPB
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
import org.apache.hadoop.ozone.io.OzoneOutputStream;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.protocolPB
@ -61,21 +55,12 @@ import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListKeys;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ChunkOutputStream;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.Locale;
import java.util.List;
/**
@ -104,9 +89,9 @@ public final class DistributedStorageHandler implements StorageHandler {
*/
public DistributedStorageHandler(OzoneConfiguration conf,
StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocation,
storageContainerLocation,
KeySpaceManagerProtocolClientSideTranslatorPB
keySpaceManagerClient) {
keySpaceManagerClient) {
this.keySpaceManagerClient = keySpaceManagerClient;
this.storageContainerLocationClient = storageContainerLocation;
this.xceiverClientManager = new XceiverClientManager(conf);
@ -119,8 +104,8 @@ public final class DistributedStorageHandler implements StorageHandler {
KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
LOG.warn("The chunk size ({}) is not allowed to be more than"
+ " the maximum size ({}),"
+ " resetting to the maximum size.",
+ " the maximum size ({}),"
+ " resetting to the maximum size.",
chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
}
@ -159,7 +144,7 @@ public final class DistributedStorageHandler implements StorageHandler {
public void setVolumeQuota(VolumeArgs args, boolean remove)
throws IOException, OzoneException {
long quota = remove ? OzoneConsts.MAX_QUOTA_IN_BYTES :
args.getQuota().sizeInBytes();
args.getQuota().sizeInBytes();
keySpaceManagerClient.setQuota(args.getVolumeName(), quota);
}
@ -397,22 +382,11 @@ public final class DistributedStorageHandler implements StorageHandler {
.setDataSize(args.getSize())
.build();
// contact KSM to allocate a block for key.
String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName(), args.getKeyName());
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
// TODO the following createContainer and key writes may fail, in which
// case we should revert the above allocateKey to KSM.
String containerName = keyInfo.getContainerName();
XceiverClientSpi xceiverClient = getContainer(containerName);
if (keyInfo.getShouldCreateContainer()) {
LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
args.getVolumeName(), args.getBucketName(), args.getKeyName());
ContainerProtocolCalls.createContainer(
xceiverClient, args.getRequestID());
}
// establish a connection to the container to write the key
return new ChunkOutputStream(containerKey, args.getKeyName(),
xceiverClientManager, xceiverClient, args.getRequestID(), chunkSize);
ChunkGroupOutputStream groupOutputStream =
ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
storageContainerLocationClient, chunkSize, args.getRequestID());
return new OzoneOutputStream(groupOutputStream);
}
@Override
@ -431,33 +405,9 @@ public final class DistributedStorageHandler implements StorageHandler {
.setDataSize(args.getSize())
.build();
KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
String containerKey = buildContainerKey(args.getVolumeName(),
args.getBucketName(), args.getKeyName());
String containerName = keyInfo.getContainerName();
XceiverClientSpi xceiverClient = getContainer(containerName);
boolean success = false;
try {
LOG.debug("get key accessing {} {}",
xceiverClient.getPipeline().getContainerName(), containerKey);
KeyData containerKeyData = OzoneContainerTranslation
.containerKeyDataForRead(
xceiverClient.getPipeline().getContainerName(), containerKey);
GetKeyResponseProto response = ContainerProtocolCalls
.getKey(xceiverClient, containerKeyData, args.getRequestID());
long length = 0;
List<ChunkInfo> chunks = response.getKeyData().getChunksList();
for (ChunkInfo chunk : chunks) {
length += chunk.getLen();
}
success = true;
return new LengthInputStream(new ChunkInputStream(
containerKey, xceiverClientManager, xceiverClient,
chunks, args.getRequestID()), length);
} finally {
if (!success) {
xceiverClientManager.releaseClient(xceiverClient);
}
}
return ChunkGroupInputStream.getFromKsmKeyInfo(
keyInfo, xceiverClientManager, storageContainerLocationClient,
args.getRequestID());
}
@Override
@ -535,37 +485,6 @@ public final class DistributedStorageHandler implements StorageHandler {
}
}
private XceiverClientSpi getContainer(String containerName)
throws IOException {
Pipeline pipeline =
storageContainerLocationClient.getContainer(containerName);
return xceiverClientManager.acquireClient(pipeline);
}
/**
* Creates a container key from any number of components by combining all
* components with a delimiter.
*
* @param parts container key components
* @return container key
*/
private static String buildContainerKey(String... parts) {
return '/' + StringUtils.join('/', parts);
}
/**
* Formats a date in the expected string format.
*
* @param date the date to format
* @return formatted string representation of date
*/
private static String dateToString(Date date) {
SimpleDateFormat sdf =
new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
return sdf.format(date);
}
/**
* Closes DistributedStorageHandler.
*/

View File

@ -813,4 +813,12 @@
Port used by cblock to connect to SCM.
</description>
</property>
<property>
<name>ozone.scm.block.size</name>
<value>268435456</value>
<description>
The default size of a scm block in bytes.
</description>
</property>
</configuration>

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
/**
* This class tests ChunkGroupInputStream and ChunkGroupOutStream.
*/
public class TestChunkStreams {
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* This test uses ByteArrayOutputStream as the underlying stream to test
* the correctness of ChunkGroupOutputStream.
*
* @throws Exception
*/
@Test
public void testWriteGroupOutputStream() throws Exception {
try (ChunkGroupOutputStream groupOutputStream =
new ChunkGroupOutputStream()) {
ArrayList<OutputStream> outputStreams = new ArrayList<>();
// 5 byte streams, each 100 bytes. write 500 bytes means writing to each
// of them with 100 bytes.
for (int i = 0; i < 5; i++) {
ByteArrayOutputStream out = new ByteArrayOutputStream(100);
outputStreams.add(out);
groupOutputStream.addStream(out, 100);
}
assertEquals(0, groupOutputStream.getByteOffset());
String dataString = RandomStringUtils.randomAscii(500);
byte[] data = dataString.getBytes();
groupOutputStream.write(data, 0, data.length);
assertEquals(500, groupOutputStream.getByteOffset());
String res = "";
int offset = 0;
for (OutputStream stream : outputStreams) {
String subString = stream.toString();
res += subString;
assertEquals(dataString.substring(offset, offset + 100), subString);
offset += 100;
}
assertEquals(dataString, res);
}
}
@Test
public void testErrorWriteGroupOutputStream() throws Exception {
try (ChunkGroupOutputStream groupOutputStream =
new ChunkGroupOutputStream()) {
ArrayList<OutputStream> outputStreams = new ArrayList<>();
// 5 byte streams, each 100 bytes. write 500 bytes means writing to each
// of them with 100 bytes. all 5 streams makes up a ChunkGroupOutputStream
// with a total of 500 bytes in size
for (int i = 0; i < 5; i++) {
ByteArrayOutputStream out = new ByteArrayOutputStream(100);
outputStreams.add(out);
groupOutputStream.addStream(out, 100);
}
assertEquals(0, groupOutputStream.getByteOffset());
// first writes of 100 bytes should succeed
groupOutputStream.write(RandomStringUtils.randomAscii(100).getBytes());
assertEquals(100, groupOutputStream.getByteOffset());
// second writes of 500 bytes should fail, as there should be only 400
// bytes space left
// TODO : if we decide to take the 400 bytes instead in the future,
// other add more informative error code rather than exception, need to
// change this part.
exception.expect(IOException.class);
exception.expectMessage(
"Can not write 500 bytes with only 400 byte space");
groupOutputStream.write(RandomStringUtils.randomAscii(500).getBytes());
assertEquals(100, groupOutputStream.getByteOffset());
}
}
@Test
public void testReadGroupInputStream() throws Exception {
try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
ArrayList<InputStream> inputStreams = new ArrayList<>();
String dataString = RandomStringUtils.randomAscii(500);
byte[] buf = dataString.getBytes();
int offset = 0;
for (int i = 0; i < 5; i++) {
ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
inputStreams.add(in);
offset += 100;
groupInputStream.addStream(in, 100);
}
byte[] resBuf = new byte[500];
int len = groupInputStream.read(resBuf, 0, 500);
assertEquals(500, len);
assertEquals(dataString, new String(resBuf));
}
}
@Test
public void testErrorReadGroupInputStream() throws Exception {
try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
ArrayList<InputStream> inputStreams = new ArrayList<>();
String dataString = RandomStringUtils.randomAscii(500);
byte[] buf = dataString.getBytes();
int offset = 0;
for (int i = 0; i < 5; i++) {
ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
inputStreams.add(in);
offset += 100;
groupInputStream.addStream(in, 100);
}
byte[] resBuf = new byte[600];
// read 300 bytes first
int len = groupInputStream.read(resBuf, 0, 340);
assertEquals(3, groupInputStream.getCurrentStreamIndex());
assertEquals(60, groupInputStream.getRemainingOfIndex(3));
assertEquals(340, len);
assertEquals(dataString.substring(0, 340),
new String(resBuf).substring(0, 340));
// read following 300 bytes, but only 200 left
len = groupInputStream.read(resBuf, 340, 260);
assertEquals(5, groupInputStream.getCurrentStreamIndex());
assertEquals(0, groupInputStream.getRemainingOfIndex(4));
assertEquals(160, len);
assertEquals(dataString, new String(resBuf).substring(0, 500));
// further read should get EOF
len = groupInputStream.read(resBuf, 0, 1);
// reached EOF, further read should get -1
assertEquals(-1, len);
}
}
}

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.ksm;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.LinkedList;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
/**
* Test key write/read where a key can span multiple containers.
*/
public class TestMultipleContainerReadWrite {
private static MiniOzoneCluster cluster = null;
private static StorageHandler storageHandler;
private static UserArgs userArgs;
private static OzoneConfiguration conf;
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true and
* OZONE_HANDLER_TYPE_KEY = "distributed"
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
// set to as small as 100 bytes per block.
conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY, 100);
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
cluster = new MiniOzoneCluster.Builder(conf)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
null, null, null, null);
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testWriteRead() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
String dataString = RandomStringUtils.randomAscii(500);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
keyArgs.setSize(500);
try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
outputStream.write(dataString.getBytes());
}
byte[] data = new byte[dataString.length()];
try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
inputStream.read(data, 0, data.length);
}
assertEquals(dataString, new String(data));
// checking whether container meta data has the chunk file persisted.
MetricsRecordBuilder containerMetrics = getMetrics(
"StorageContainerMetrics");
assertCounter("numWriteChunk", 5L, containerMetrics);
assertCounter("numReadChunk", 5L, containerMetrics);
}
@Test
public void testErrorWrite() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
String dataString1 = RandomStringUtils.randomAscii(100);
String dataString2 = RandomStringUtils.randomAscii(500);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
keyArgs.setSize(500);
try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
// first write will write succeed
outputStream.write(dataString1.getBytes());
// second write
exception.expect(IOException.class);
exception.expectMessage(
"Can not write 500 bytes with only 400 byte space");
outputStream.write(dataString2.getBytes());
}
}
@Test
public void testPartialRead() throws Exception {
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
String keyName = "key" + RandomStringUtils.randomNumeric(5);
VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
String dataString = RandomStringUtils.randomAscii(500);
KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
keyArgs.setSize(500);
try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
outputStream.write(dataString.getBytes());
}
byte[] data = new byte[600];
try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
int readLen = inputStream.read(data, 0, 340);
assertEquals(340, readLen);
assertEquals(dataString.substring(0, 340),
new String(data).substring(0, 340));
readLen = inputStream.read(data, 340, 260);
assertEquals(160, readLen);
assertEquals(dataString, new String(data).substring(0, 500));
readLen = inputStream.read(data, 500, 1);
assertEquals(-1, readLen);
}
}
}