HDDS-82. Merge ContainerData and ContainerStatus classes. Contributed by Bharat Viswanadham.
This commit is contained in:
parent
0b4c44bdee
commit
5e88126776
|
@ -152,6 +152,7 @@ enum ContainerLifeCycleState {
|
||||||
OPEN = 1;
|
OPEN = 1;
|
||||||
CLOSING = 2;
|
CLOSING = 2;
|
||||||
CLOSED = 3;
|
CLOSED = 3;
|
||||||
|
INVALID = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ContainerCommandRequestProto {
|
message ContainerCommandRequestProto {
|
||||||
|
|
|
@ -52,6 +52,17 @@ public class ContainerData {
|
||||||
private ContainerType containerType;
|
private ContainerType containerType;
|
||||||
private String containerDBType;
|
private String containerDBType;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Number of pending deletion blocks in container.
|
||||||
|
*/
|
||||||
|
private int numPendingDeletionBlocks;
|
||||||
|
private AtomicLong readBytes;
|
||||||
|
private AtomicLong writeBytes;
|
||||||
|
private AtomicLong readCount;
|
||||||
|
private AtomicLong writeCount;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a ContainerData Object.
|
* Constructs a ContainerData Object.
|
||||||
*
|
*
|
||||||
|
@ -66,6 +77,34 @@ public class ContainerData {
|
||||||
this.bytesUsed = new AtomicLong(0L);
|
this.bytesUsed = new AtomicLong(0L);
|
||||||
this.containerID = containerID;
|
this.containerID = containerID;
|
||||||
this.state = ContainerLifeCycleState.OPEN;
|
this.state = ContainerLifeCycleState.OPEN;
|
||||||
|
this.numPendingDeletionBlocks = 0;
|
||||||
|
this.readCount = new AtomicLong(0L);
|
||||||
|
this.readBytes = new AtomicLong(0L);
|
||||||
|
this.writeCount = new AtomicLong(0L);
|
||||||
|
this.writeBytes = new AtomicLong(0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a ContainerData Object.
|
||||||
|
*
|
||||||
|
* @param containerID - ID
|
||||||
|
* @param conf - Configuration
|
||||||
|
* @param state - ContainerLifeCycleState
|
||||||
|
* @param
|
||||||
|
*/
|
||||||
|
public ContainerData(long containerID, Configuration conf,
|
||||||
|
ContainerLifeCycleState state) {
|
||||||
|
this.metadata = new TreeMap<>();
|
||||||
|
this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
|
||||||
|
ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
|
||||||
|
this.bytesUsed = new AtomicLong(0L);
|
||||||
|
this.containerID = containerID;
|
||||||
|
this.state = state;
|
||||||
|
this.numPendingDeletionBlocks = 0;
|
||||||
|
this.readCount = new AtomicLong(0L);
|
||||||
|
this.readBytes = new AtomicLong(0L);
|
||||||
|
this.writeCount = new AtomicLong(0L);
|
||||||
|
this.writeBytes = new AtomicLong(0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -292,6 +331,14 @@ public class ContainerData {
|
||||||
return ContainerLifeCycleState.OPEN == state;
|
return ContainerLifeCycleState.OPEN == state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* checks if the container is invalid.
|
||||||
|
* @return - boolean
|
||||||
|
*/
|
||||||
|
public boolean isValid() {
|
||||||
|
return !(ContainerLifeCycleState.INVALID == state);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks this container as closed.
|
* Marks this container as closed.
|
||||||
*/
|
*/
|
||||||
|
@ -317,11 +364,119 @@ public class ContainerData {
|
||||||
this.bytesUsed.set(used);
|
this.bytesUsed.set(used);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long addBytesUsed(long delta) {
|
/**
|
||||||
return this.bytesUsed.addAndGet(delta);
|
* Get the number of bytes used by the container.
|
||||||
}
|
* @return the number of bytes used by the container.
|
||||||
|
*/
|
||||||
public long getBytesUsed() {
|
public long getBytesUsed() {
|
||||||
return bytesUsed.get();
|
return bytesUsed.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase the number of bytes used by the container.
|
||||||
|
* @param used number of bytes used by the container.
|
||||||
|
* @return the current number of bytes used by the container afert increase.
|
||||||
|
*/
|
||||||
|
public long incrBytesUsed(long used) {
|
||||||
|
return this.bytesUsed.addAndGet(used);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrease the number of bytes used by the container.
|
||||||
|
* @param reclaimed the number of bytes reclaimed from the container.
|
||||||
|
* @return the current number of bytes used by the container after decrease.
|
||||||
|
*/
|
||||||
|
public long decrBytesUsed(long reclaimed) {
|
||||||
|
return this.bytesUsed.addAndGet(-1L * reclaimed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase the count of pending deletion blocks.
|
||||||
|
*
|
||||||
|
* @param numBlocks increment number
|
||||||
|
*/
|
||||||
|
public void incrPendingDeletionBlocks(int numBlocks) {
|
||||||
|
this.numPendingDeletionBlocks += numBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrease the count of pending deletion blocks.
|
||||||
|
*
|
||||||
|
* @param numBlocks decrement number
|
||||||
|
*/
|
||||||
|
public void decrPendingDeletionBlocks(int numBlocks) {
|
||||||
|
this.numPendingDeletionBlocks -= numBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of pending deletion blocks.
|
||||||
|
*/
|
||||||
|
public int getNumPendingDeletionBlocks() {
|
||||||
|
return this.numPendingDeletionBlocks;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of bytes read from the container.
|
||||||
|
* @return the number of bytes read from the container.
|
||||||
|
*/
|
||||||
|
public long getReadBytes() {
|
||||||
|
return readBytes.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase the number of bytes read from the container.
|
||||||
|
* @param bytes number of bytes read.
|
||||||
|
*/
|
||||||
|
public void incrReadBytes(long bytes) {
|
||||||
|
this.readBytes.addAndGet(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of times the container is read.
|
||||||
|
* @return the number of times the container is read.
|
||||||
|
*/
|
||||||
|
public long getReadCount() {
|
||||||
|
return readCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase the number of container read count by 1.
|
||||||
|
*/
|
||||||
|
public void incrReadCount() {
|
||||||
|
this.readCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of bytes write into the container.
|
||||||
|
* @return the number of bytes write into the container.
|
||||||
|
*/
|
||||||
|
public long getWriteBytes() {
|
||||||
|
return writeBytes.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase the number of bytes write into the container.
|
||||||
|
* @param bytes the number of bytes write into the container.
|
||||||
|
*/
|
||||||
|
public void incrWriteBytes(long bytes) {
|
||||||
|
this.writeBytes.addAndGet(bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of writes into the container.
|
||||||
|
* @return the number of writes into the container.
|
||||||
|
*/
|
||||||
|
public long getWriteCount() {
|
||||||
|
return writeCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increase the number of writes into the container by 1.
|
||||||
|
*/
|
||||||
|
public void incrWriteCount() {
|
||||||
|
this.writeCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,13 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
|
||||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
.ContainerLifeCycleState;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos;
|
.StorageContainerDatanodeProtocolProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
|
@ -39,8 +40,6 @@ import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
|
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
|
||||||
.StorageContainerDatanodeProtocolProtos.StorageTypeProto;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
import org.apache.hadoop.ozone.OzoneConsts;
|
import org.apache.hadoop.ozone.OzoneConsts;
|
||||||
|
@ -116,7 +115,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
|
|
||||||
// TODO: consider primitive collection like eclipse-collections
|
// TODO: consider primitive collection like eclipse-collections
|
||||||
// to avoid autoboxing overhead
|
// to avoid autoboxing overhead
|
||||||
private final ConcurrentSkipListMap<Long, ContainerStatus>
|
private final ConcurrentSkipListMap<Long, ContainerData>
|
||||||
containerMap = new ConcurrentSkipListMap<>();
|
containerMap = new ConcurrentSkipListMap<>();
|
||||||
|
|
||||||
// Use a non-fair RW lock for better throughput, we may revisit this decision
|
// Use a non-fair RW lock for better throughput, we may revisit this decision
|
||||||
|
@ -246,19 +245,20 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
// when loading the info we get a null, this often means last time
|
// when loading the info we get a null, this often means last time
|
||||||
// SCM was ending up at some middle phase causing that the metadata
|
// SCM was ending up at some middle phase causing that the metadata
|
||||||
// was not populated. Such containers are marked as inactive.
|
// was not populated. Such containers are marked as inactive.
|
||||||
containerMap.put(containerID, new ContainerStatus(null));
|
ContainerData cData = new ContainerData(containerID, conf,
|
||||||
|
ContainerLifeCycleState.INVALID);
|
||||||
|
containerMap.put(containerID, cData);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
|
containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
|
||||||
|
|
||||||
ContainerStatus containerStatus = new ContainerStatus(containerData);
|
|
||||||
// Initialize pending deletion blocks count in in-memory
|
// Initialize pending deletion blocks count in in-memory
|
||||||
// container status.
|
// container status.
|
||||||
MetadataStore metadata = KeyUtils.getDB(containerData, conf);
|
MetadataStore metadata = KeyUtils.getDB(containerData, conf);
|
||||||
List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata
|
List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata
|
||||||
.getSequentialRangeKVs(null, Integer.MAX_VALUE,
|
.getSequentialRangeKVs(null, Integer.MAX_VALUE,
|
||||||
MetadataKeyFilters.getDeletingKeyFilter());
|
MetadataKeyFilters.getDeletingKeyFilter());
|
||||||
containerStatus.incrPendingDeletionBlocks(underDeletionBlocks.size());
|
containerData.incrPendingDeletionBlocks(underDeletionBlocks.size());
|
||||||
|
|
||||||
List<Map.Entry<byte[], byte[]>> liveKeys = metadata
|
List<Map.Entry<byte[], byte[]>> liveKeys = metadata
|
||||||
.getRangeKVs(null, Integer.MAX_VALUE,
|
.getRangeKVs(null, Integer.MAX_VALUE,
|
||||||
|
@ -277,9 +277,9 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
return 0L;
|
return 0L;
|
||||||
}
|
}
|
||||||
}).sum();
|
}).sum();
|
||||||
containerStatus.setBytesUsed(bytesUsed);
|
containerData.setBytesUsed(bytesUsed);
|
||||||
|
|
||||||
containerMap.put(containerID, containerStatus);
|
containerMap.put(containerID, containerData);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("read failed for file: {} ex: {}", containerName,
|
LOG.error("read failed for file: {} ex: {}", containerName,
|
||||||
ex.getMessage());
|
ex.getMessage());
|
||||||
|
@ -287,7 +287,9 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
// TODO : Add this file to a recovery Queue.
|
// TODO : Add this file to a recovery Queue.
|
||||||
|
|
||||||
// Remember that this container is busted and we cannot use it.
|
// Remember that this container is busted and we cannot use it.
|
||||||
containerMap.put(containerID, new ContainerStatus(null));
|
ContainerData cData = new ContainerData(containerID, conf,
|
||||||
|
ContainerLifeCycleState.INVALID);
|
||||||
|
containerMap.put(containerID, cData);
|
||||||
throw new StorageContainerException("Unable to read container info",
|
throw new StorageContainerException("Unable to read container info",
|
||||||
UNABLE_TO_READ_METADATA_DB);
|
UNABLE_TO_READ_METADATA_DB);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -456,18 +458,19 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
UNCLOSED_CONTAINER_IO);
|
UNCLOSED_CONTAINER_IO);
|
||||||
}
|
}
|
||||||
|
|
||||||
ContainerStatus status = containerMap.get(containerID);
|
ContainerData containerData = containerMap.get(containerID);
|
||||||
if (status == null) {
|
if (containerData == null) {
|
||||||
LOG.debug("No such container. ID: {}", containerID);
|
LOG.debug("No such container. ID: {}", containerID);
|
||||||
throw new StorageContainerException("No such container. ID : " +
|
throw new StorageContainerException("No such container. ID : " +
|
||||||
containerID, CONTAINER_NOT_FOUND);
|
containerID, CONTAINER_NOT_FOUND);
|
||||||
}
|
}
|
||||||
if (status.getContainer() == null) {
|
|
||||||
|
if(!containerData.isValid()) {
|
||||||
LOG.debug("Invalid container data. ID: {}", containerID);
|
LOG.debug("Invalid container data. ID: {}", containerID);
|
||||||
throw new StorageContainerException("Invalid container data. Name : " +
|
throw new StorageContainerException("Invalid container data. Name : " +
|
||||||
containerID, CONTAINER_NOT_FOUND);
|
containerID, CONTAINER_NOT_FOUND);
|
||||||
}
|
}
|
||||||
ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete);
|
ContainerUtils.removeContainer(containerData, conf, forceDelete);
|
||||||
containerMap.remove(containerID);
|
containerMap.remove(containerID);
|
||||||
} catch (StorageContainerException e) {
|
} catch (StorageContainerException e) {
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -509,7 +512,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
|
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
ConcurrentNavigableMap<Long, ContainerStatus> map;
|
ConcurrentNavigableMap<Long, ContainerData> map;
|
||||||
if (startContainerID == 0) {
|
if (startContainerID == 0) {
|
||||||
map = containerMap.tailMap(containerMap.firstKey(), true);
|
map = containerMap.tailMap(containerMap.firstKey(), true);
|
||||||
} else {
|
} else {
|
||||||
|
@ -517,9 +520,9 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
int currentCount = 0;
|
int currentCount = 0;
|
||||||
for (ContainerStatus entry : map.values()) {
|
for (ContainerData entry : map.values()) {
|
||||||
if (currentCount < count) {
|
if (currentCount < count) {
|
||||||
data.add(entry.getContainer());
|
data.add(entry);
|
||||||
currentCount++;
|
currentCount++;
|
||||||
} else {
|
} else {
|
||||||
return;
|
return;
|
||||||
|
@ -546,7 +549,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
throw new StorageContainerException("Unable to find the container. ID: "
|
throw new StorageContainerException("Unable to find the container. ID: "
|
||||||
+ containerID, CONTAINER_NOT_FOUND);
|
+ containerID, CONTAINER_NOT_FOUND);
|
||||||
}
|
}
|
||||||
ContainerData cData = containerMap.get(containerID).getContainer();
|
ContainerData cData = containerMap.get(containerID);
|
||||||
if (cData == null) {
|
if (cData == null) {
|
||||||
throw new StorageContainerException("Invalid container data. ID: "
|
throw new StorageContainerException("Invalid container data. ID: "
|
||||||
+ containerID, CONTAINER_INTERNAL_ERROR);
|
+ containerID, CONTAINER_INTERNAL_ERROR);
|
||||||
|
@ -584,8 +587,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
// I/O failure, this allows us to take quick action in case of container
|
// I/O failure, this allows us to take quick action in case of container
|
||||||
// issues.
|
// issues.
|
||||||
|
|
||||||
ContainerStatus status = new ContainerStatus(containerData);
|
containerMap.put(containerID, containerData);
|
||||||
containerMap.put(containerID, status);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -614,7 +616,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Path location = locationManager.getContainerPath();
|
Path location = locationManager.getContainerPath();
|
||||||
ContainerData orgData = containerMap.get(containerID).getContainer();
|
ContainerData orgData = containerMap.get(containerID);
|
||||||
if (orgData == null) {
|
if (orgData == null) {
|
||||||
// updating a invalid container
|
// updating a invalid container
|
||||||
throw new StorageContainerException("Update a container with invalid" +
|
throw new StorageContainerException("Update a container with invalid" +
|
||||||
|
@ -652,8 +654,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the in-memory map
|
// Update the in-memory map
|
||||||
ContainerStatus newStatus = new ContainerStatus(data);
|
containerMap.replace(containerID, data);
|
||||||
containerMap.replace(containerID, newStatus);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// Restore the container file from backup
|
// Restore the container file from backup
|
||||||
if(containerFileBK != null && containerFileBK.exists() && deleted) {
|
if(containerFileBK != null && containerFileBK.exists() && deleted) {
|
||||||
|
@ -699,17 +700,12 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public boolean isOpen(long containerID) throws StorageContainerException {
|
public boolean isOpen(long containerID) throws StorageContainerException {
|
||||||
final ContainerStatus status = containerMap.get(containerID);
|
final ContainerData containerData = containerMap.get(containerID);
|
||||||
if (status == null) {
|
if (containerData == null) {
|
||||||
throw new StorageContainerException(
|
|
||||||
"Container status not found: " + containerID, CONTAINER_NOT_FOUND);
|
|
||||||
}
|
|
||||||
final ContainerData cData = status.getContainer();
|
|
||||||
if (cData == null) {
|
|
||||||
throw new StorageContainerException(
|
throw new StorageContainerException(
|
||||||
"Container not found: " + containerID, CONTAINER_NOT_FOUND);
|
"Container not found: " + containerID, CONTAINER_NOT_FOUND);
|
||||||
}
|
}
|
||||||
return cData.isOpen();
|
return containerData.isOpen();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -727,7 +723,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ConcurrentSkipListMap<Long, ContainerStatus> getContainerMap() {
|
public ConcurrentSkipListMap<Long, ContainerData> getContainerMap() {
|
||||||
return containerMap;
|
return containerMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -847,9 +843,9 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
// And we can never get the exact state since close might happen
|
// And we can never get the exact state since close might happen
|
||||||
// after we iterate a point.
|
// after we iterate a point.
|
||||||
return containerMap.entrySet().stream()
|
return containerMap.entrySet().stream()
|
||||||
.filter(containerStatus ->
|
.filter(containerData ->
|
||||||
!containerStatus.getValue().getContainer().isOpen())
|
!containerData.getValue().isOpen())
|
||||||
.map(containerStatus -> containerStatus.getValue().getContainer())
|
.map(containerData -> containerData.getValue())
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -865,7 +861,7 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
// No need for locking since containerMap is a ConcurrentSkipListMap
|
// No need for locking since containerMap is a ConcurrentSkipListMap
|
||||||
// And we can never get the exact state since close might happen
|
// And we can never get the exact state since close might happen
|
||||||
// after we iterate a point.
|
// after we iterate a point.
|
||||||
List<ContainerStatus> containers = containerMap.values().stream()
|
List<ContainerData> containers = containerMap.values().stream()
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
ContainerReportsRequestProto.Builder crBuilder =
|
ContainerReportsRequestProto.Builder crBuilder =
|
||||||
|
@ -875,18 +871,17 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
|
crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
|
||||||
.setType(ContainerReportsRequestProto.reportType.fullReport);
|
.setType(ContainerReportsRequestProto.reportType.fullReport);
|
||||||
|
|
||||||
for (ContainerStatus container: containers) {
|
for (ContainerData container: containers) {
|
||||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
||||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
||||||
ciBuilder.setContainerID(container.getContainer().getContainerID())
|
ciBuilder.setContainerID(container.getContainerID())
|
||||||
.setSize(container.getContainer().getMaxSize())
|
.setSize(container.getMaxSize())
|
||||||
.setUsed(container.getContainer().getBytesUsed())
|
.setUsed(container.getBytesUsed())
|
||||||
.setKeyCount(container.getContainer().getKeyCount())
|
.setKeyCount(container.getKeyCount())
|
||||||
.setReadCount(container.getReadCount())
|
.setReadCount(container.getReadCount())
|
||||||
.setWriteCount(container.getWriteCount())
|
.setWriteCount(container.getWriteCount())
|
||||||
.setReadBytes(container.getReadBytes())
|
.setReadBytes(container.getReadBytes())
|
||||||
.setWriteBytes(container.getWriteBytes())
|
.setWriteBytes(container.getWriteBytes());
|
||||||
.setContainerID(container.getContainer().getContainerID());
|
|
||||||
|
|
||||||
crBuilder.addReports(ciBuilder.build());
|
crBuilder.addReports(ciBuilder.build());
|
||||||
}
|
}
|
||||||
|
@ -943,8 +938,8 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
public void incrPendingDeletionBlocks(int numBlocks, long containerId) {
|
public void incrPendingDeletionBlocks(int numBlocks, long containerId) {
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
status.incrPendingDeletionBlocks(numBlocks);
|
cData.incrPendingDeletionBlocks(numBlocks);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
@ -954,8 +949,8 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
public void decrPendingDeletionBlocks(int numBlocks, long containerId) {
|
public void decrPendingDeletionBlocks(int numBlocks, long containerId) {
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
status.decrPendingDeletionBlocks(numBlocks);
|
cData.decrPendingDeletionBlocks(numBlocks);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
|
@ -968,32 +963,37 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void incrReadCount(long containerId) {
|
public void incrReadCount(long containerId) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
status.incrReadCount();
|
cData.incrReadCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getReadCount(long containerId) {
|
public long getReadCount(long containerId) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.getReadCount();
|
return cData.getReadCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increse the read counter for bytes read from the container.
|
* Increase the read counter for bytes read from the container.
|
||||||
*
|
*
|
||||||
* @param containerId - ID of the container.
|
* @param containerId - ID of the container.
|
||||||
* @param readBytes - bytes read from the container.
|
* @param readBytes - bytes read from the container.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void incrReadBytes(long containerId, long readBytes) {
|
public void incrReadBytes(long containerId, long readBytes) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
status.incrReadBytes(readBytes);
|
cData.incrReadBytes(readBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns number of bytes read from the container.
|
||||||
|
* @param containerId
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
public long getReadBytes(long containerId) {
|
public long getReadBytes(long containerId) {
|
||||||
readLock();
|
readLock();
|
||||||
try {
|
try {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.getReadBytes();
|
return cData.getReadBytes();
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
|
@ -1006,13 +1006,13 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void incrWriteCount(long containerId) {
|
public void incrWriteCount(long containerId) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
status.incrWriteCount();
|
cData.incrWriteCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getWriteCount(long containerId) {
|
public long getWriteCount(long containerId) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.getWriteCount();
|
return cData.getWriteCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1023,13 +1023,13 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void incrWriteBytes(long containerId, long writeBytes) {
|
public void incrWriteBytes(long containerId, long writeBytes) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
status.incrWriteBytes(writeBytes);
|
cData.incrWriteBytes(writeBytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getWriteBytes(long containerId) {
|
public long getWriteBytes(long containerId) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.getWriteBytes();
|
return cData.getWriteBytes();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1041,8 +1041,8 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public long incrBytesUsed(long containerId, long used) {
|
public long incrBytesUsed(long containerId, long used) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.incrBytesUsed(used);
|
return cData.incrBytesUsed(used);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1054,13 +1054,13 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public long decrBytesUsed(long containerId, long used) {
|
public long decrBytesUsed(long containerId, long used) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.decrBytesUsed(used);
|
return cData.decrBytesUsed(used);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getBytesUsed(long containerId) {
|
public long getBytesUsed(long containerId) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.getBytesUsed();
|
return cData.getBytesUsed();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1071,8 +1071,8 @@ public class ContainerManagerImpl implements ContainerManager {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public long getNumKeys(long containerId) {
|
public long getNumKeys(long containerId) {
|
||||||
ContainerStatus status = containerMap.get(containerId);
|
ContainerData cData = containerMap.get(containerId);
|
||||||
return status.getNumKeys(); }
|
return cData.getKeyCount(); }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the container report state to send via HB to SCM.
|
* Get the container report state to send via HB to SCM.
|
||||||
|
|
|
@ -1,217 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.ozone.container.common.impl;
|
|
||||||
|
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class represents the state of a container. if the
|
|
||||||
* container reading encountered an error when we boot up we will post that
|
|
||||||
* info to a recovery queue and keep the info in the containerMap.
|
|
||||||
* <p/>
|
|
||||||
* if and when the issue is fixed, the expectation is that this entry will be
|
|
||||||
* deleted by the recovery thread from the containerMap and will insert entry
|
|
||||||
* instead of modifying this class.
|
|
||||||
*/
|
|
||||||
public class ContainerStatus {
|
|
||||||
private final ContainerData containerData;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Number of pending deletion blocks in container.
|
|
||||||
*/
|
|
||||||
private int numPendingDeletionBlocks;
|
|
||||||
|
|
||||||
private AtomicLong readBytes;
|
|
||||||
|
|
||||||
private AtomicLong writeBytes;
|
|
||||||
|
|
||||||
private AtomicLong readCount;
|
|
||||||
|
|
||||||
private AtomicLong writeCount;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a Container Status class.
|
|
||||||
*
|
|
||||||
* @param containerData - ContainerData.
|
|
||||||
*/
|
|
||||||
ContainerStatus(ContainerData containerData) {
|
|
||||||
this.numPendingDeletionBlocks = 0;
|
|
||||||
this.containerData = containerData;
|
|
||||||
this.readCount = new AtomicLong(0L);
|
|
||||||
this.readBytes = new AtomicLong(0L);
|
|
||||||
this.writeCount = new AtomicLong(0L);
|
|
||||||
this.writeBytes = new AtomicLong(0L);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns container if it is active. It is not active if we have had an
|
|
||||||
* error and we are waiting for the background threads to fix the issue.
|
|
||||||
*
|
|
||||||
* @return ContainerData.
|
|
||||||
*/
|
|
||||||
public ContainerData getContainer() {
|
|
||||||
return containerData;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Increase the count of pending deletion blocks.
|
|
||||||
*
|
|
||||||
* @param numBlocks increment number
|
|
||||||
*/
|
|
||||||
public void incrPendingDeletionBlocks(int numBlocks) {
|
|
||||||
this.numPendingDeletionBlocks += numBlocks;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decrease the count of pending deletion blocks.
|
|
||||||
*
|
|
||||||
* @param numBlocks decrement number
|
|
||||||
*/
|
|
||||||
public void decrPendingDeletionBlocks(int numBlocks) {
|
|
||||||
this.numPendingDeletionBlocks -= numBlocks;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of pending deletion blocks.
|
|
||||||
*/
|
|
||||||
public int getNumPendingDeletionBlocks() {
|
|
||||||
return this.numPendingDeletionBlocks;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of bytes read from the container.
|
|
||||||
* @return the number of bytes read from the container.
|
|
||||||
*/
|
|
||||||
public long getReadBytes() {
|
|
||||||
return readBytes.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Increase the number of bytes read from the container.
|
|
||||||
* @param bytes number of bytes read.
|
|
||||||
*/
|
|
||||||
public void incrReadBytes(long bytes) {
|
|
||||||
this.readBytes.addAndGet(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of times the container is read.
|
|
||||||
* @return the number of times the container is read.
|
|
||||||
*/
|
|
||||||
public long getReadCount() {
|
|
||||||
return readCount.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Increase the number of container read count by 1.
|
|
||||||
*/
|
|
||||||
public void incrReadCount() {
|
|
||||||
this.readCount.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of bytes write into the container.
|
|
||||||
* @return the number of bytes write into the container.
|
|
||||||
*/
|
|
||||||
public long getWriteBytes() {
|
|
||||||
return writeBytes.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Increase the number of bytes write into the container.
|
|
||||||
* @param bytes the number of bytes write into the container.
|
|
||||||
*/
|
|
||||||
public void incrWriteBytes(long bytes) {
|
|
||||||
this.writeBytes.addAndGet(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of writes into the container.
|
|
||||||
* @return the number of writes into the container.
|
|
||||||
*/
|
|
||||||
public long getWriteCount() {
|
|
||||||
return writeCount.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Increase the number of writes into the container by 1.
|
|
||||||
*/
|
|
||||||
public void incrWriteCount() {
|
|
||||||
this.writeCount.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of bytes used by the container.
|
|
||||||
* @return the number of bytes used by the container.
|
|
||||||
*/
|
|
||||||
public long getBytesUsed() {
|
|
||||||
return containerData.getBytesUsed();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Increase the number of bytes used by the container.
|
|
||||||
* @param used number of bytes used by the container.
|
|
||||||
* @return the current number of bytes used by the container afert increase.
|
|
||||||
*/
|
|
||||||
public long incrBytesUsed(long used) {
|
|
||||||
return containerData.addBytesUsed(used);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the number of bytes used by the container.
|
|
||||||
* @param used the number of bytes used by the container.
|
|
||||||
*/
|
|
||||||
public void setBytesUsed(long used) {
|
|
||||||
containerData.setBytesUsed(used);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decrease the number of bytes used by the container.
|
|
||||||
* @param reclaimed the number of bytes reclaimed from the container.
|
|
||||||
* @return the current number of bytes used by the container after decrease.
|
|
||||||
*/
|
|
||||||
public long decrBytesUsed(long reclaimed) {
|
|
||||||
return this.containerData.addBytesUsed(-1L * reclaimed);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the maximum container size.
|
|
||||||
* @return the maximum container size.
|
|
||||||
*/
|
|
||||||
public long getMaxSize() {
|
|
||||||
return containerData.getMaxSize();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the maximum container size.
|
|
||||||
* @param size the maximum container size.
|
|
||||||
*/
|
|
||||||
public void setMaxSize(long size) {
|
|
||||||
this.containerData.setMaxSize(size);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of keys in the container.
|
|
||||||
* @return the number of keys in the container.
|
|
||||||
*/
|
|
||||||
public long getNumKeys() {
|
|
||||||
return containerData.getKeyCount();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -41,24 +41,24 @@ public class RandomContainerDeletionChoosingPolicy
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||||
Map<Long, ContainerStatus> candidateContainers)
|
Map<Long, ContainerData> candidateContainers)
|
||||||
throws StorageContainerException {
|
throws StorageContainerException {
|
||||||
Preconditions.checkNotNull(candidateContainers,
|
Preconditions.checkNotNull(candidateContainers,
|
||||||
"Internal assertion: candidate containers cannot be null");
|
"Internal assertion: candidate containers cannot be null");
|
||||||
|
|
||||||
int currentCount = 0;
|
int currentCount = 0;
|
||||||
List<ContainerData> result = new LinkedList<>();
|
List<ContainerData> result = new LinkedList<>();
|
||||||
ContainerStatus[] values = new ContainerStatus[candidateContainers.size()];
|
ContainerData[] values = new ContainerData[candidateContainers.size()];
|
||||||
// to get a shuffle list
|
// to get a shuffle list
|
||||||
for (ContainerStatus entry : DFSUtil.shuffle(
|
for (ContainerData entry : DFSUtil.shuffle(
|
||||||
candidateContainers.values().toArray(values))) {
|
candidateContainers.values().toArray(values))) {
|
||||||
if (currentCount < count) {
|
if (currentCount < count) {
|
||||||
result.add(entry.getContainer());
|
result.add(entry);
|
||||||
currentCount++;
|
currentCount++;
|
||||||
|
|
||||||
LOG.debug("Select container {} for block deletion, "
|
LOG.debug("Select container {} for block deletion, "
|
||||||
+ "pending deletion blocks num: {}.",
|
+ "pending deletion blocks num: {}.",
|
||||||
entry.getContainer().getContainerID(),
|
entry.getContainerID(),
|
||||||
entry.getNumPendingDeletionBlocks());
|
entry.getNumPendingDeletionBlocks());
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -41,11 +41,11 @@ public class TopNOrderedContainerDeletionChoosingPolicy
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TopNOrderedContainerDeletionChoosingPolicy.class);
|
LoggerFactory.getLogger(TopNOrderedContainerDeletionChoosingPolicy.class);
|
||||||
|
|
||||||
/** customized comparator used to compare differentiate container status. **/
|
/** customized comparator used to compare differentiate container data. **/
|
||||||
private static final Comparator<ContainerStatus> CONTAINER_STATUS_COMPARATOR
|
private static final Comparator<ContainerData> CONTAINER_DATA_COMPARATOR
|
||||||
= new Comparator<ContainerStatus>() {
|
= new Comparator<ContainerData>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(ContainerStatus c1, ContainerStatus c2) {
|
public int compare(ContainerData c1, ContainerData c2) {
|
||||||
return Integer.compare(c2.getNumPendingDeletionBlocks(),
|
return Integer.compare(c2.getNumPendingDeletionBlocks(),
|
||||||
c1.getNumPendingDeletionBlocks());
|
c1.getNumPendingDeletionBlocks());
|
||||||
}
|
}
|
||||||
|
@ -53,28 +53,28 @@ public class TopNOrderedContainerDeletionChoosingPolicy
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
public List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||||
Map<Long, ContainerStatus> candidateContainers)
|
Map<Long, ContainerData> candidateContainers)
|
||||||
throws StorageContainerException {
|
throws StorageContainerException {
|
||||||
Preconditions.checkNotNull(candidateContainers,
|
Preconditions.checkNotNull(candidateContainers,
|
||||||
"Internal assertion: candidate containers cannot be null");
|
"Internal assertion: candidate containers cannot be null");
|
||||||
|
|
||||||
List<ContainerData> result = new LinkedList<>();
|
List<ContainerData> result = new LinkedList<>();
|
||||||
List<ContainerStatus> orderedList = new LinkedList<>();
|
List<ContainerData> orderedList = new LinkedList<>();
|
||||||
orderedList.addAll(candidateContainers.values());
|
orderedList.addAll(candidateContainers.values());
|
||||||
Collections.sort(orderedList, CONTAINER_STATUS_COMPARATOR);
|
Collections.sort(orderedList, CONTAINER_DATA_COMPARATOR);
|
||||||
|
|
||||||
// get top N list ordered by pending deletion blocks' number
|
// get top N list ordered by pending deletion blocks' number
|
||||||
int currentCount = 0;
|
int currentCount = 0;
|
||||||
for (ContainerStatus entry : orderedList) {
|
for (ContainerData entry : orderedList) {
|
||||||
if (currentCount < count) {
|
if (currentCount < count) {
|
||||||
if (entry.getNumPendingDeletionBlocks() > 0) {
|
if (entry.getNumPendingDeletionBlocks() > 0) {
|
||||||
result.add(entry.getContainer());
|
result.add(entry);
|
||||||
currentCount++;
|
currentCount++;
|
||||||
|
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"Select container {} for block deletion, "
|
"Select container {} for block deletion, "
|
||||||
+ "pending deletion blocks num: {}.",
|
+ "pending deletion blocks num: {}.",
|
||||||
entry.getContainer().getContainerID(),
|
entry.getContainerID(),
|
||||||
entry.getNumPendingDeletionBlocks());
|
entry.getNumPendingDeletionBlocks());
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Stop looking for next container, there is no"
|
LOG.debug("Stop looking for next container, there is no"
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.container.common.interfaces;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerStatus;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -41,6 +40,6 @@ public interface ContainerDeletionChoosingPolicy {
|
||||||
* @throws StorageContainerException
|
* @throws StorageContainerException
|
||||||
*/
|
*/
|
||||||
List<ContainerData> chooseContainerForBlockDeletion(int count,
|
List<ContainerData> chooseContainerForBlockDeletion(int count,
|
||||||
Map<Long, ContainerStatus> candidateContainers)
|
Map<Long, ContainerData> candidateContainers)
|
||||||
throws StorageContainerException;
|
throws StorageContainerException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,6 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_ROOT_PREFIX;
|
||||||
|
@ -191,23 +190,23 @@ public class TestContainerPersistence {
|
||||||
containerManager.createContainer(data);
|
containerManager.createContainer(data);
|
||||||
Assert.assertTrue(containerManager.getContainerMap()
|
Assert.assertTrue(containerManager.getContainerMap()
|
||||||
.containsKey(testContainerID));
|
.containsKey(testContainerID));
|
||||||
ContainerStatus status = containerManager
|
ContainerData cData = containerManager
|
||||||
.getContainerMap().get(testContainerID);
|
.getContainerMap().get(testContainerID);
|
||||||
|
|
||||||
Assert.assertNotNull(status.getContainer());
|
Assert.assertNotNull(cData);
|
||||||
Assert.assertNotNull(status.getContainer().getContainerPath());
|
Assert.assertNotNull(cData.getContainerPath());
|
||||||
Assert.assertNotNull(status.getContainer().getDBPath());
|
Assert.assertNotNull(cData.getDBPath());
|
||||||
|
|
||||||
|
|
||||||
Assert.assertTrue(new File(status.getContainer().getContainerPath())
|
Assert.assertTrue(new File(cData.getContainerPath())
|
||||||
.exists());
|
.exists());
|
||||||
|
|
||||||
Path meta = Paths.get(status.getContainer().getDBPath()).getParent();
|
Path meta = Paths.get(cData.getDBPath()).getParent();
|
||||||
Assert.assertTrue(meta != null && Files.exists(meta));
|
Assert.assertTrue(meta != null && Files.exists(meta));
|
||||||
|
|
||||||
MetadataStore store = null;
|
MetadataStore store = null;
|
||||||
try {
|
try {
|
||||||
store = KeyUtils.getDB(status.getContainer(), conf);
|
store = KeyUtils.getDB(cData, conf);
|
||||||
Assert.assertNotNull(store);
|
Assert.assertNotNull(store);
|
||||||
} finally {
|
} finally {
|
||||||
if (store != null) {
|
if (store != null) {
|
||||||
|
@ -762,7 +761,7 @@ public class TestContainerPersistence {
|
||||||
|
|
||||||
// Verify in-memory map
|
// Verify in-memory map
|
||||||
ContainerData actualNewData = containerManager.getContainerMap()
|
ContainerData actualNewData = containerManager.getContainerMap()
|
||||||
.get(testContainerID).getContainer();
|
.get(testContainerID);
|
||||||
Assert.assertEquals("shire_new",
|
Assert.assertEquals("shire_new",
|
||||||
actualNewData.getAllMetadata().get("VOLUME"));
|
actualNewData.getAllMetadata().get("VOLUME"));
|
||||||
Assert.assertEquals("bilbo_new",
|
Assert.assertEquals("bilbo_new",
|
||||||
|
@ -805,7 +804,7 @@ public class TestContainerPersistence {
|
||||||
|
|
||||||
// Verify in-memory map
|
// Verify in-memory map
|
||||||
actualNewData = containerManager.getContainerMap()
|
actualNewData = containerManager.getContainerMap()
|
||||||
.get(testContainerID).getContainer();
|
.get(testContainerID);
|
||||||
Assert.assertEquals("shire_new_1",
|
Assert.assertEquals("shire_new_1",
|
||||||
actualNewData.getAllMetadata().get("VOLUME"));
|
actualNewData.getAllMetadata().get("VOLUME"));
|
||||||
Assert.assertEquals("bilbo_new_1",
|
Assert.assertEquals("bilbo_new_1",
|
||||||
|
|
Loading…
Reference in New Issue