HDDS-1401. Static ContainerCache in Datanodes can result in overwrite of container db. Contributed by Mukul Kumar Singh. (#708)
This commit is contained in:
parent 32722d2661
commit df01469141
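Context for the diff below: ContainerCache is a JVM-wide singleton LRU map. Before this patch it keyed open MetadataStore handles by numeric container ID, so two container replicas that share an ID but live under different paths (for example, replicas on the datanodes of a MiniOzoneCluster, which all run in one JVM) collapse onto the same cache slot, and one replica's db handle silently overwrites the other's. The fix keys the cache by the absolute path of the container db file, which is unique per replica. A minimal sketch of the collision, using a plain HashMap, a toy handle type, and illustrative paths rather than the real ContainerCache API:

// Sketch only: toy types and paths standing in for MetadataStore/ContainerCache.
import java.util.HashMap;
import java.util.Map;

public class CacheKeySketch {
  static final class DbHandle {
    final String path;
    DbHandle(String path) { this.path = path; }
  }

  public static void main(String[] args) {
    // Old keying: container ID. A second replica of container 5 in the
    // same JVM replaces the first replica's handle in the shared map.
    Map<Long, DbHandle> byId = new HashMap<>();
    byId.put(5L, new DbHandle("/data/disk1/hdds/5/metadata/container.db"));
    byId.put(5L, new DbHandle("/data/disk2/hdds/5/metadata/container.db"));
    System.out.println(byId.size());   // prints 1: one handle was lost

    // New keying: absolute db path, unique per replica.
    Map<String, DbHandle> byPath = new HashMap<>();
    byPath.put("/data/disk1/hdds/5/metadata/container.db",
        new DbHandle("/data/disk1/hdds/5/metadata/container.db"));
    byPath.put("/data/disk2/hdds/5/metadata/container.db",
        new DbHandle("/data/disk2/hdds/5/metadata/container.db"));
    System.out.println(byPath.size()); // prints 2: both handles survive
  }
}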
@@ -69,15 +69,15 @@ public final class ContainerCache extends LRUMap {
   /**
    * Closes a db instance.
    *
-   * @param containerID - ID of the container to be closed.
+   * @param containerPath - path of the container db to be closed.
    * @param db - db instance to close.
    */
-  private void closeDB(long containerID, MetadataStore db) {
+  private void closeDB(String containerPath, MetadataStore db) {
     if (db != null) {
       try {
         db.close();
-      } catch (IOException e) {
-        LOG.error("Error closing DB. Container: " + containerID, e);
+      } catch (Exception e) {
+        LOG.error("Error closing DB. Container: " + containerPath, e);
       }
     }
   }
@@ -93,7 +93,7 @@ public final class ContainerCache extends LRUMap {
     while (iterator.hasNext()) {
       iterator.next();
       MetadataStore db = (MetadataStore) iterator.getValue();
-      closeDB(((Number)iterator.getKey()).longValue(), db);
+      closeDB((String)iterator.getKey(), db);
     }
     // reset the cache
     cache.clear();
@@ -107,14 +107,18 @@ public final class ContainerCache extends LRUMap {
    */
   @Override
   protected boolean removeLRU(LinkEntry entry) {
+    MetadataStore db = (MetadataStore) entry.getValue();
+    String dbFile = (String)entry.getKey();
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore) entry.getValue();
-      closeDB(((Number)entry.getKey()).longValue(), db);
+      closeDB(dbFile, db);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Eviction for db:{} failed", dbFile, e);
+      return false;
     } finally {
       lock.unlock();
     }
-    return true;
   }

   /**
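The removeLRU rework above does more than rename the key: the close now happens inside the try block while the lock is held, and a failed close returns false. In commons-collections' LRUMap, the boolean returned by removeLRU decides whether the least-recently-used entry is actually discarded, so returning false on a failed close keeps the entry (and its still-open handle) in the map instead of dropping a handle that was never released. A minimal sketch of that veto mechanism, assuming commons-collections 3.x (the raw-typed LRUMap that ContainerCache extends) and a generic Closeable value:

import java.io.Closeable;
import org.apache.commons.collections.map.LRUMap;

// Sketch: evict an entry only if its underlying resource closes cleanly.
// Returning false from removeLRU vetoes the eviction, so the map retains
// the entry rather than forgetting a resource that is still open.
public class ClosingLruMap extends LRUMap {
  public ClosingLruMap(int maxSize) {
    super(maxSize);
  }

  @Override
  protected boolean removeLRU(LinkEntry entry) {
    try {
      ((Closeable) entry.getValue()).close();
      return true;   // closed cleanly: allow the eviction
    } catch (Exception e) {
      return false;  // close failed: keep the entry cached
    }
  }
}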
@@ -133,7 +137,7 @@ public final class ContainerCache extends LRUMap {
         "Container ID cannot be negative.");
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore) this.get(containerID);
+      MetadataStore db = (MetadataStore) this.get(containerDBPath);

       if (db == null) {
         db = MetadataStoreBuilder.newBuilder()
@@ -142,7 +146,7 @@ public final class ContainerCache extends LRUMap {
             .setConf(conf)
             .setDBType(containerDBType)
             .build();
-        this.put(containerID, db);
+        this.put(containerDBPath, db);
       }
       return db;
     } catch (Exception e) {
@@ -157,16 +161,14 @@ public final class ContainerCache extends LRUMap {
   /**
    * Remove a DB handler from cache.
    *
-   * @param containerID - ID of the container.
+   * @param containerPath - path of the container db file.
    */
-  public void removeDB(long containerID) {
-    Preconditions.checkState(containerID >= 0,
-        "Container ID cannot be negative.");
+  public void removeDB(String containerPath) {
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore)this.get(containerID);
-      closeDB(containerID, db);
-      this.remove(containerID);
+      MetadataStore db = (MetadataStore)this.get(containerPath);
+      closeDB(containerPath, db);
+      this.remove(containerPath);
     } finally {
       lock.unlock();
     }
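With the hunks above, every entry point of ContainerCache (shutdown, eviction, getDB, removeDB) is keyed by the db path, and the ID-based precondition checks are dropped from removeDB since a path key has no notion of negativity. Callers therefore have to resolve the path from container metadata first, which is exactly what the BlockUtils change below does via container.getDbFile().getAbsolutePath().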
@@ -95,7 +95,7 @@ public final class BlockUtils {
     Preconditions.checkNotNull(container);
     ContainerCache cache = ContainerCache.getInstance(conf);
     Preconditions.checkNotNull(cache);
-    cache.removeDB(container.getContainerID());
+    cache.removeDB(container.getDbFile().getAbsolutePath());
   }

   /**
@@ -160,7 +160,7 @@ public class BlockManagerImpl implements BlockManager {
     }
     byte[] kData = db.get(Longs.toByteArray(blockID.getLocalID()));
     if (kData == null) {
-      throw new StorageContainerException("Unable to find the block.",
+      throw new StorageContainerException("Unable to find the block." + blockID,
           NO_SUCH_BLOCK);
     }
     ContainerProtos.BlockData blockData =
@@ -195,7 +195,7 @@ public class TestKeyValueContainer {
     for (int i = 0; i < numberOfKeysToWrite; i++) {
       metadataStore.put(("test" + i).getBytes(UTF_8), "test".getBytes(UTF_8));
     }
-    metadataStore.close();
+    BlockUtils.removeDB(keyValueContainerData, conf);

     Map<String, String> metadata = new HashMap<>();
     metadata.put("key1", "value1");
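The test change above is a consequence of the cache being static: calling metadataStore.close() directly would leave a stale, already-closed handle behind in the shared ContainerCache, to be handed back by the next getDB call. Releasing the db through BlockUtils.removeDB closes the handle and evicts the cache entry in one step, so a subsequent open gets a fresh handle.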
@@ -189,8 +189,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
         } catch (IOException e) {
-          LOG.error("pipeline creation failed type:{} factor:{}", type,
-              factor, e);
+          LOG.error("Pipeline creation failed for type:{} factor:{}",
+              type, factor, e);
           break;
         }
       } else {
@@ -255,7 +255,7 @@ public class TestContainerPersistence {
         "Container cannot be deleted because it is not empty.");
     container2.delete();
     Assert.assertTrue(containerSet.getContainerMapCopy()
-        .containsKey(testContainerID1));
+        .containsKey(testContainerID2));
   }

   @Test
@@ -34,16 +34,21 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.MetadataStore;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
@@ -221,13 +226,25 @@ public class TestCloseContainerByPipeline {
     List<DatanodeDetails> datanodes = pipeline.getNodes();
     Assert.assertEquals(3, datanodes.size());

+    List<MetadataStore> metadataStores = new ArrayList<>(datanodes.size());
     for (DatanodeDetails details : datanodes) {
       Assert.assertFalse(isContainerClosed(cluster, containerID, details));
       //send the order to close the container
       cluster.getStorageContainerManager().getScmNodeManager()
           .addDatanodeCommand(details.getUuid(),
               new CloseContainerCommand(containerID, pipeline.getId()));
+      int index = cluster.getHddsDatanodeIndex(details);
+      Container dnContainer = cluster.getHddsDatanodes().get(index)
+          .getDatanodeStateMachine().getContainer().getContainerSet()
+          .getContainer(containerID);
+      metadataStores.add(BlockUtils.getDB((KeyValueContainerData) dnContainer
+          .getContainerData(), conf));
     }

+    // There should be as many rocks db as the number of datanodes in pipeline.
+    Assert.assertEquals(datanodes.size(),
+        metadataStores.stream().distinct().count());
+
     // Make sure that it is CLOSED
     for (DatanodeDetails datanodeDetails : datanodes) {
       GenericTestUtils.waitFor(
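This new assertion is the regression test for the bug: all datanodes of a MiniOzoneCluster share one JVM, and therefore one static ContainerCache, so under the old container-ID keying the three replicas of the container would have resolved to a single shared MetadataStore handle. With path-based keys, metadataStores.stream().distinct().count() must equal the number of datanodes in the pipeline.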