HDDS-169:Add Volume IO Stats. Contributed by Bharat Viswanadham
This commit is contained in:
parent
84ac6bb1b1
commit
23ebe0f4f0
|
@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
|||
ContainerType;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||
ContainerLifeCycleState;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
@ -59,6 +60,8 @@ public class ContainerData {
|
|||
private final AtomicLong writeCount;
|
||||
private final AtomicLong bytesUsed;
|
||||
|
||||
private HddsVolume volume;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a ContainerData Object, which holds metadata of the container.
|
||||
|
@ -289,5 +292,22 @@ public class ContainerData {
|
|||
return this.bytesUsed.addAndGet(-1L * reclaimed);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the Volume for the Container.
|
||||
* This should be called only from the createContainer.
|
||||
* @param hddsVolume
|
||||
*/
|
||||
public void setVolume(HddsVolume hddsVolume) {
|
||||
this.volume = hddsVolume;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the volume of the Container.
|
||||
* @return HddsVolume
|
||||
*/
|
||||
public HddsVolume getVolume() {
|
||||
return volume;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ public final class HddsVolume {
|
|||
private final File hddsRootDir;
|
||||
private final VolumeInfo volumeInfo;
|
||||
private VolumeState state;
|
||||
private final VolumeIOStats volumeIOStats;
|
||||
|
||||
// VERSION file properties
|
||||
private String storageID; // id of the file system
|
||||
|
@ -117,6 +118,7 @@ public final class HddsVolume {
|
|||
this.state = VolumeState.NOT_INITIALIZED;
|
||||
this.clusterID = b.clusterID;
|
||||
this.datanodeUuid = b.datanodeUuid;
|
||||
this.volumeIOStats = new VolumeIOStats();
|
||||
|
||||
VolumeInfo.Builder volumeBuilder =
|
||||
new VolumeInfo.Builder(b.volumeRootStr, b.conf)
|
||||
|
@ -303,6 +305,10 @@ public final class HddsVolume {
|
|||
return (state == VolumeState.FAILED);
|
||||
}
|
||||
|
||||
public VolumeIOStats getVolumeIOStats() {
|
||||
return volumeIOStats;
|
||||
}
|
||||
|
||||
public void failVolume() {
|
||||
setState(VolumeState.FAILED);
|
||||
volumeInfo.shutdownUsageThread();
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.container.common.volume;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* This class is used to track Volume IO stats for each HDDS Volume.
|
||||
*/
|
||||
public class VolumeIOStats {
|
||||
|
||||
private final AtomicLong readBytes;
|
||||
private final AtomicLong readOpCount;
|
||||
private final AtomicLong writeBytes;
|
||||
private final AtomicLong writeOpCount;
|
||||
private final AtomicLong readTime;
|
||||
private final AtomicLong writeTime;
|
||||
|
||||
public VolumeIOStats() {
|
||||
readBytes = new AtomicLong(0);
|
||||
readOpCount = new AtomicLong(0);
|
||||
writeBytes = new AtomicLong(0);
|
||||
writeOpCount = new AtomicLong(0);
|
||||
readTime = new AtomicLong(0);
|
||||
writeTime = new AtomicLong(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment number of bytes read from the volume.
|
||||
* @param bytesRead
|
||||
*/
|
||||
public void incReadBytes(long bytesRead) {
|
||||
readBytes.addAndGet(bytesRead);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the read operations performed on the volume.
|
||||
*/
|
||||
public void incReadOpCount() {
|
||||
readOpCount.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment number of bytes written on to the volume.
|
||||
* @param bytesWritten
|
||||
*/
|
||||
public void incWriteBytes(long bytesWritten) {
|
||||
writeBytes.addAndGet(bytesWritten);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the write operations performed on the volume.
|
||||
*/
|
||||
public void incWriteOpCount() {
|
||||
writeOpCount.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the time taken by read operation on the volume.
|
||||
* @param time
|
||||
*/
|
||||
public void incReadTime(long time) {
|
||||
readTime.addAndGet(time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Increment the time taken by write operation on the volume.
|
||||
* @param time
|
||||
*/
|
||||
public void incWriteTime(long time) {
|
||||
writeTime.addAndGet(time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns total number of bytes read from the volume.
|
||||
* @return long
|
||||
*/
|
||||
public long getReadBytes() {
|
||||
return readBytes.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns total number of bytes written to the volume.
|
||||
* @return long
|
||||
*/
|
||||
public long getWriteBytes() {
|
||||
return writeBytes.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns total number of read operations performed on the volume.
|
||||
* @return long
|
||||
*/
|
||||
public long getReadOpCount() {
|
||||
return readOpCount.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns total number of write operations performed on the volume.
|
||||
* @return long
|
||||
*/
|
||||
public long getWriteOpCount() {
|
||||
return writeOpCount.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns total read operations time on the volume.
|
||||
* @return long
|
||||
*/
|
||||
public long getReadTime() {
|
||||
return readTime.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns total write operations time on the volume.
|
||||
* @return long
|
||||
*/
|
||||
public long getWriteTime() {
|
||||
return writeTime.get();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -145,6 +145,7 @@ public class KeyValueContainer implements Container {
|
|||
containerData.setChunksPath(chunksPath.getPath());
|
||||
containerData.setContainerDBType(impl);
|
||||
containerData.setDbFile(dbFile);
|
||||
containerData.setVolume(containerVolume);
|
||||
|
||||
// Create .container file and .chksm file
|
||||
createContainerFile(containerFile, containerCheckSumFile);
|
||||
|
|
|
@ -36,6 +36,8 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
|||
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -67,10 +69,11 @@ public final class ChunkUtils {
|
|||
* @param chunkFile - File to write data to.
|
||||
* @param chunkInfo - Data stream to write.
|
||||
* @param data - The data buffer.
|
||||
* @param volumeIOStats
|
||||
* @throws StorageContainerException
|
||||
*/
|
||||
public static void writeData(File chunkFile, ChunkInfo chunkInfo,
|
||||
byte[] data) throws
|
||||
byte[] data, VolumeIOStats volumeIOStats) throws
|
||||
StorageContainerException, ExecutionException, InterruptedException,
|
||||
NoSuchAlgorithmException {
|
||||
|
||||
|
@ -87,6 +90,12 @@ public final class ChunkUtils {
|
|||
FileLock lock = null;
|
||||
|
||||
try {
|
||||
if (chunkInfo.getChecksum() != null &&
|
||||
!chunkInfo.getChecksum().isEmpty()) {
|
||||
verifyChecksum(chunkInfo, data, log);
|
||||
}
|
||||
|
||||
long writeTimeStart = Time.monotonicNow();
|
||||
file =
|
||||
AsynchronousFileChannel.open(chunkFile.toPath(),
|
||||
StandardOpenOption.CREATE,
|
||||
|
@ -94,11 +103,11 @@ public final class ChunkUtils {
|
|||
StandardOpenOption.SPARSE,
|
||||
StandardOpenOption.SYNC);
|
||||
lock = file.lock().get();
|
||||
if (chunkInfo.getChecksum() != null &&
|
||||
!chunkInfo.getChecksum().isEmpty()) {
|
||||
verifyChecksum(chunkInfo, data, log);
|
||||
}
|
||||
int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
|
||||
// Increment volumeIO stats here.
|
||||
volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
|
||||
volumeIOStats.incWriteOpCount();
|
||||
volumeIOStats.incWriteBytes(size);
|
||||
if (size != data.length) {
|
||||
log.error("Invalid write size found. Size:{} Expected: {} ", size,
|
||||
data.length);
|
||||
|
@ -136,12 +145,15 @@ public final class ChunkUtils {
|
|||
*
|
||||
* @param chunkFile - file where data lives.
|
||||
* @param data - chunk definition.
|
||||
* @param volumeIOStats
|
||||
* @return ByteBuffer
|
||||
* @throws StorageContainerException
|
||||
* @throws ExecutionException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
|
||||
public static ByteBuffer readData(File chunkFile, ChunkInfo data,
|
||||
VolumeIOStats volumeIOStats)
|
||||
throws
|
||||
StorageContainerException, ExecutionException, InterruptedException,
|
||||
NoSuchAlgorithmException {
|
||||
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||
|
@ -157,6 +169,7 @@ public final class ChunkUtils {
|
|||
AsynchronousFileChannel file = null;
|
||||
FileLock lock = null;
|
||||
try {
|
||||
long readStartTime = Time.monotonicNow();
|
||||
file =
|
||||
AsynchronousFileChannel.open(chunkFile.toPath(),
|
||||
StandardOpenOption.READ);
|
||||
|
@ -165,10 +178,13 @@ public final class ChunkUtils {
|
|||
ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
|
||||
file.read(buf, data.getOffset()).get();
|
||||
|
||||
// Increment volumeIO stats here.
|
||||
volumeIOStats.incReadTime(Time.monotonicNow() - readStartTime);
|
||||
volumeIOStats.incReadOpCount();
|
||||
volumeIOStats.incReadBytes(data.getLen());
|
||||
if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
|
||||
verifyChecksum(data, buf.array(), log);
|
||||
}
|
||||
|
||||
return buf;
|
||||
} catch (IOException e) {
|
||||
throw new StorageContainerException(e, IO_EXCEPTION);
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep
|
|||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
|
||||
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
|
||||
|
@ -64,13 +66,15 @@ public class ChunkManagerImpl implements ChunkManager {
|
|||
* @throws StorageContainerException
|
||||
*/
|
||||
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
|
||||
byte[] data, ContainerProtos.Stage stage)
|
||||
byte[] data, ContainerProtos.Stage stage)
|
||||
throws StorageContainerException {
|
||||
|
||||
try {
|
||||
|
||||
KeyValueContainerData containerData = (KeyValueContainerData) container
|
||||
.getContainerData();
|
||||
HddsVolume volume = containerData.getVolume();
|
||||
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
|
||||
|
||||
File chunkFile = ChunkUtils.validateChunk(containerData, info);
|
||||
File tmpChunkFile = getTmpChunkFile(chunkFile, info);
|
||||
|
@ -81,20 +85,23 @@ public class ChunkManagerImpl implements ChunkManager {
|
|||
switch (stage) {
|
||||
case WRITE_DATA:
|
||||
// Initially writes to temporary chunk file.
|
||||
ChunkUtils.writeData(tmpChunkFile, info, data);
|
||||
ChunkUtils.writeData(tmpChunkFile, info, data, volumeIOStats);
|
||||
// No need to increment container stats here, as still data is not
|
||||
// committed here.
|
||||
break;
|
||||
case COMMIT_DATA:
|
||||
// commit the data, means move chunk data from temporary chunk file
|
||||
// to actual chunk file.
|
||||
long sizeDiff = tmpChunkFile.length() - chunkFile.length();
|
||||
commitChunk(tmpChunkFile, chunkFile);
|
||||
containerData.incrBytesUsed(sizeDiff);
|
||||
// Increment container stats here, as we commit the data.
|
||||
containerData.incrBytesUsed(info.getLen());
|
||||
containerData.incrWriteCount();
|
||||
containerData.incrWriteBytes(sizeDiff);
|
||||
containerData.incrWriteBytes(info.getLen());
|
||||
break;
|
||||
case COMBINED:
|
||||
// directly write to the chunk file
|
||||
ChunkUtils.writeData(chunkFile, info, data);
|
||||
ChunkUtils.writeData(chunkFile, info, data, volumeIOStats);
|
||||
// Increment container stats here, as we directly write to chunk file.
|
||||
containerData.incrBytesUsed(info.getLen());
|
||||
containerData.incrWriteCount();
|
||||
containerData.incrWriteBytes(info.getLen());
|
||||
|
@ -137,6 +144,8 @@ public class ChunkManagerImpl implements ChunkManager {
|
|||
KeyValueContainerData containerData = (KeyValueContainerData) container
|
||||
.getContainerData();
|
||||
ByteBuffer data;
|
||||
HddsVolume volume = containerData.getVolume();
|
||||
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
|
||||
|
||||
// Checking here, which layout version the container is, and reading
|
||||
// the chunk file in that format.
|
||||
|
@ -145,9 +154,10 @@ public class ChunkManagerImpl implements ChunkManager {
|
|||
if (containerData.getLayOutVersion() == ChunkLayOutVersion
|
||||
.getLatestVersion().getVersion()) {
|
||||
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
|
||||
data = ChunkUtils.readData(chunkFile, info);
|
||||
data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
|
||||
containerData.incrReadCount();
|
||||
containerData.incrReadBytes(chunkFile.length());
|
||||
long length = chunkFile.length();
|
||||
containerData.incrReadBytes(length);
|
||||
return data.array();
|
||||
}
|
||||
} catch(NoSuchAlgorithmException ex) {
|
||||
|
|
|
@ -22,10 +22,12 @@ import org.apache.hadoop.hdds.client.BlockID;
|
|||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
|
||||
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
|
||||
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
|
||||
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
|
||||
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
@ -37,13 +39,9 @@ import org.mockito.Mockito;
|
|||
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.ArgumentMatchers.anyList;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -57,21 +55,21 @@ public class TestChunkManagerImpl {
|
|||
private String scmId = UUID.randomUUID().toString();
|
||||
private VolumeSet volumeSet;
|
||||
private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
|
||||
private HddsVolume hddsVolume;
|
||||
private KeyValueContainerData keyValueContainerData;
|
||||
private KeyValueContainer keyValueContainer;
|
||||
private KeyData keyData;
|
||||
private BlockID blockID;
|
||||
private ChunkManagerImpl chunkManager;
|
||||
private ChunkInfo chunkInfo;
|
||||
private byte[] data;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
config = new OzoneConfiguration();
|
||||
|
||||
HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
|
||||
hddsVolume = new HddsVolume.Builder(folder.getRoot()
|
||||
.getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
|
||||
.toString()).build();
|
||||
|
||||
|
@ -83,22 +81,15 @@ public class TestChunkManagerImpl {
|
|||
|
||||
keyValueContainerData = new KeyValueContainerData(1L);
|
||||
|
||||
keyValueContainer = new KeyValueContainer(
|
||||
keyValueContainerData, config);
|
||||
keyValueContainer = new KeyValueContainer(keyValueContainerData, config);
|
||||
|
||||
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
|
||||
|
||||
data = "testing write chunks".getBytes();
|
||||
// Creating KeyData
|
||||
blockID = new BlockID(1L, 1L);
|
||||
keyData = new KeyData(blockID);
|
||||
keyData.addMetadata("VOLUME", "ozone");
|
||||
keyData.addMetadata("OWNER", "hdfs");
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||
.getLocalID(), 0), 0, data.length);
|
||||
chunkList.add(chunkInfo.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
|
||||
// Create a ChunkManager object.
|
||||
chunkManager = new ChunkManagerImpl();
|
||||
|
@ -113,16 +104,38 @@ public class TestChunkManagerImpl {
|
|||
assertTrue(chunksPath.exists());
|
||||
// Initially chunks folder should be empty.
|
||||
assertTrue(chunksPath.listFiles().length == 0);
|
||||
|
||||
// As no chunks are written to the volume writeBytes should be 0
|
||||
checkWriteIOStats(0, 0);
|
||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||
ContainerProtos.Stage.WRITE_DATA);
|
||||
// Now a chunk file is being written with Stage WRITE_DATA, so it should
|
||||
// create a temporary chunk file.
|
||||
assertTrue(chunksPath.listFiles().length == 1);
|
||||
|
||||
File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
|
||||
File tempChunkFile = new File(chunkFile.getParent(),
|
||||
chunkFile.getName() +
|
||||
OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
|
||||
OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
|
||||
|
||||
// As chunk write stage is WRITE_DATA, temp chunk file will be created.
|
||||
assertTrue(tempChunkFile.exists());
|
||||
|
||||
checkWriteIOStats(data.length, 1);
|
||||
|
||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||
ContainerProtos.Stage.COMMIT_DATA);
|
||||
|
||||
checkWriteIOStats(data.length, 1);
|
||||
|
||||
// Old temp file should have been renamed to chunk file.
|
||||
assertTrue(chunksPath.listFiles().length == 1);
|
||||
|
||||
// As commit happened, chunk file should exist.
|
||||
assertTrue(chunkFile.exists());
|
||||
assertFalse(tempChunkFile.exists());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -131,13 +144,12 @@ public class TestChunkManagerImpl {
|
|||
long randomLength = 200L;
|
||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||
.getLocalID(), 0), 0, randomLength);
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkList.add(chunkInfo.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||
ContainerProtos.Stage.WRITE_DATA);
|
||||
fail("testWriteChunkIncorrectLength failed");
|
||||
} catch (StorageContainerException ex) {
|
||||
// As we got an exception, writeBytes should be 0.
|
||||
checkWriteIOStats(0, 0);
|
||||
GenericTestUtils.assertExceptionContains("data array does not match " +
|
||||
"the length ", ex);
|
||||
assertEquals(ContainerProtos.Result.INVALID_WRITE_SIZE, ex.getResult());
|
||||
|
@ -152,21 +164,29 @@ public class TestChunkManagerImpl {
|
|||
assertTrue(chunksPath.exists());
|
||||
// Initially chunks folder should be empty.
|
||||
assertTrue(chunksPath.listFiles().length == 0);
|
||||
checkWriteIOStats(0, 0);
|
||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||
ContainerProtos.Stage.COMBINED);
|
||||
// Now a chunk file is being written with Stage WRITE_DATA, so it should
|
||||
// create a temporary chunk file.
|
||||
// Now a chunk file is being written with Stage COMBINED_DATA, so it should
|
||||
// create a chunk file.
|
||||
assertTrue(chunksPath.listFiles().length == 1);
|
||||
File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
|
||||
assertTrue(chunkFile.exists());
|
||||
checkWriteIOStats(data.length, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadChunk() throws Exception {
|
||||
checkWriteIOStats(0, 0);
|
||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||
ContainerProtos.Stage.COMBINED);
|
||||
checkWriteIOStats(data.length, 1);
|
||||
checkReadIOStats(0, 0);
|
||||
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
|
||||
chunkInfo);
|
||||
assertEquals(expectedData.length, data.length);
|
||||
assertTrue(Arrays.equals(expectedData, data));
|
||||
checkReadIOStats(data.length, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -187,9 +207,6 @@ public class TestChunkManagerImpl {
|
|||
long randomLength = 200L;
|
||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||
.getLocalID(), 0), 0, randomLength);
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkList.add(chunkInfo.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
|
||||
fail("testDeleteChunkUnsupportedRequest");
|
||||
} catch (StorageContainerException ex) {
|
||||
|
@ -205,9 +222,6 @@ public class TestChunkManagerImpl {
|
|||
.getLocalID(), 0), 0, data.length);
|
||||
//Setting checksum to some value.
|
||||
chunkInfo.setChecksum("some garbage");
|
||||
List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
|
||||
chunkList.add(chunkInfo.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||
ContainerProtos.Stage.COMBINED);
|
||||
fail("testWriteChunkChecksumMismatch failed");
|
||||
|
@ -231,5 +245,46 @@ public class TestChunkManagerImpl {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteAndReadChunkMultipleTimes() throws Exception {
|
||||
for (int i=0; i<100; i++) {
|
||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||
.getLocalID(), i), 0, data.length);
|
||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||
ContainerProtos.Stage.COMBINED);
|
||||
}
|
||||
checkWriteIOStats(data.length*100, 100);
|
||||
assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);
|
||||
|
||||
for (int i=0; i<100; i++) {
|
||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||
.getLocalID(), i), 0, data.length);
|
||||
chunkManager.readChunk(keyValueContainer, blockID, chunkInfo);
|
||||
}
|
||||
checkReadIOStats(data.length*100, 100);
|
||||
assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Check WriteIO stats.
|
||||
* @param length
|
||||
* @param opCount
|
||||
*/
|
||||
private void checkWriteIOStats(long length, long opCount) {
|
||||
VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
|
||||
assertEquals(length, volumeIOStats.getWriteBytes());
|
||||
assertEquals(opCount, volumeIOStats.getWriteOpCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check ReadIO stats.
|
||||
* @param length
|
||||
* @param opCount
|
||||
*/
|
||||
private void checkReadIOStats(long length, long opCount) {
|
||||
VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
|
||||
assertEquals(length, volumeIOStats.getReadBytes());
|
||||
assertEquals(opCount, volumeIOStats.getReadOpCount());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ public class TestKeyManagerImpl {
|
|||
private KeyValueContainerData keyValueContainerData;
|
||||
private KeyValueContainer keyValueContainer;
|
||||
private KeyData keyData;
|
||||
private KeyManagerImpl keyValueContainerManager;
|
||||
private KeyManagerImpl keyManager;
|
||||
private BlockID blockID;
|
||||
|
||||
@Rule
|
||||
|
@ -98,17 +98,17 @@ public class TestKeyManagerImpl {
|
|||
keyData.setChunks(chunkList);
|
||||
|
||||
// Create KeyValueContainerManager
|
||||
keyValueContainerManager = new KeyManagerImpl(config);
|
||||
keyManager = new KeyManagerImpl(config);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPutAndGetKey() throws Exception {
|
||||
//Put Key
|
||||
keyValueContainerManager.putKey(keyValueContainer, keyData);
|
||||
keyManager.putKey(keyValueContainer, keyData);
|
||||
|
||||
//Get Key
|
||||
KeyData fromGetKeyData = keyValueContainerManager.getKey(keyValueContainer,
|
||||
KeyData fromGetKeyData = keyManager.getKey(keyValueContainer,
|
||||
keyData.getBlockID());
|
||||
|
||||
assertEquals(keyData.getContainerID(), fromGetKeyData.getContainerID());
|
||||
|
@ -124,9 +124,15 @@ public class TestKeyManagerImpl {
|
|||
public void testDeleteKey() throws Exception {
|
||||
try {
|
||||
//Put Key
|
||||
keyValueContainerManager.putKey(keyValueContainer, keyData);
|
||||
keyManager.putKey(keyValueContainer, keyData);
|
||||
//Delete Key
|
||||
keyValueContainerManager.deleteKey(keyValueContainer, blockID);
|
||||
keyManager.deleteKey(keyValueContainer, blockID);
|
||||
try {
|
||||
keyManager.getKey(keyValueContainer, blockID);
|
||||
fail("testDeleteKey");
|
||||
} catch (StorageContainerException ex) {
|
||||
GenericTestUtils.assertExceptionContains("Unable to find the key", ex);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
fail("testDeleteKey failed");
|
||||
}
|
||||
|
@ -135,8 +141,8 @@ public class TestKeyManagerImpl {
|
|||
@Test
|
||||
public void testListKey() throws Exception {
|
||||
try {
|
||||
keyValueContainerManager.putKey(keyValueContainer, keyData);
|
||||
List<KeyData> listKeyData = keyValueContainerManager.listKey(
|
||||
keyManager.putKey(keyValueContainer, keyData);
|
||||
List<KeyData> listKeyData = keyManager.listKey(
|
||||
keyValueContainer, 1, 10);
|
||||
assertNotNull(listKeyData);
|
||||
assertTrue(listKeyData.size() == 1);
|
||||
|
@ -151,10 +157,10 @@ public class TestKeyManagerImpl {
|
|||
.getLocalID(), 0), 0, 1024);
|
||||
chunkList.add(info.getProtoBufMessage());
|
||||
keyData.setChunks(chunkList);
|
||||
keyValueContainerManager.putKey(keyValueContainer, keyData);
|
||||
keyManager.putKey(keyValueContainer, keyData);
|
||||
}
|
||||
|
||||
listKeyData = keyValueContainerManager.listKey(
|
||||
listKeyData = keyManager.listKey(
|
||||
keyValueContainer, 1, 10);
|
||||
assertNotNull(listKeyData);
|
||||
assertTrue(listKeyData.size() == 10);
|
||||
|
@ -167,7 +173,8 @@ public class TestKeyManagerImpl {
|
|||
@Test
|
||||
public void testGetNoSuchKey() throws Exception {
|
||||
try {
|
||||
keyValueContainerManager.getKey(keyValueContainer, new BlockID(1L, 2L));
|
||||
keyData = new KeyData(new BlockID(1L, 2L));
|
||||
keyManager.getKey(keyValueContainer, new BlockID(1L, 2L));
|
||||
fail("testGetNoSuchKey failed");
|
||||
} catch (StorageContainerException ex) {
|
||||
GenericTestUtils.assertExceptionContains("Unable to find the key.", ex);
|
||||
|
|
Loading…
Reference in New Issue