HDDS-864. Use strongly typed codec implementations for the tables of the OmMetadataManager. Contributed by Elek Marton.
This commit is contained in:
parent
c03024a530
commit
343aaea2d1
|
@ -28,11 +28,14 @@ public interface Codec<T> {
|
|||
|
||||
/**
|
||||
* Convert object to raw persisted format.
|
||||
* @param object The original java object. Should not be null.
|
||||
*/
|
||||
byte[] toPersistedFormat(T object);
|
||||
|
||||
/**
|
||||
* Convert object from raw persisted format.
|
||||
*
|
||||
* @param rawData Byte array from the key/value store. Should not be null.
|
||||
*/
|
||||
T fromPersistedFormat(byte[] rawData);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.utils.db;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Collection of available codecs.
|
||||
*/
|
||||
|
@ -42,6 +44,9 @@ public class CodecRegistry {
|
|||
* @return the object with the parsed field data
|
||||
*/
|
||||
public <T> T asObject(byte[] rawData, Class<T> format) {
|
||||
if (rawData == null) {
|
||||
return null;
|
||||
}
|
||||
if (valueCodecs.containsKey(format)) {
|
||||
return (T) valueCodecs.get(format).fromPersistedFormat(rawData);
|
||||
} else {
|
||||
|
@ -58,6 +63,8 @@ public class CodecRegistry {
|
|||
* @return byte array to store it ini the kv store.
|
||||
*/
|
||||
public <T> byte[] asRawData(T object) {
|
||||
Preconditions.checkNotNull(object,
|
||||
"Null value shouldn't be persisted in the database");
|
||||
Class<T> format = (Class<T>) object.getClass();
|
||||
if (valueCodecs.containsKey(format)) {
|
||||
Codec<T> codec = (Codec<T>) valueCodecs.get(format);
|
||||
|
@ -67,4 +74,16 @@ public class CodecRegistry {
|
|||
"Codec is not registered for type: " + format);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Addds codec to the internal collection.
|
||||
*
|
||||
* @param type Type of the codec source/destination object.
|
||||
* @param codec The codec itself.
|
||||
* @param <T> The type of the codec
|
||||
*/
|
||||
public <T> void addCodec(Class<T> type, Codec<T> codec) {
|
||||
valueCodecs.put(type, codec);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -77,7 +77,8 @@ public interface DBStore extends AutoCloseable {
|
|||
* @param dest - Destination Table.
|
||||
* @throws IOException on Failure
|
||||
*/
|
||||
void move(byte[] key, Table source, Table dest) throws IOException;
|
||||
<KEY, VALUE> void move(KEY key, Table<KEY, VALUE> source,
|
||||
Table<KEY, VALUE> dest) throws IOException;
|
||||
|
||||
/**
|
||||
* Moves a key from the Source Table to the destination Table and updates the
|
||||
|
@ -89,7 +90,8 @@ public interface DBStore extends AutoCloseable {
|
|||
* @param dest - Destination Table.
|
||||
* @throws IOException on Failure
|
||||
*/
|
||||
void move(byte[] key, byte[] value, Table source, Table dest)
|
||||
<KEY, VALUE> void move(KEY key, VALUE value, Table<KEY, VALUE> source,
|
||||
Table<KEY, VALUE> dest)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -105,8 +107,9 @@ public interface DBStore extends AutoCloseable {
|
|||
* @param dest - Destination Table.
|
||||
* @throws IOException on Failure
|
||||
*/
|
||||
void move(byte[] sourceKey, byte[] destKey, byte[] value,
|
||||
Table source, Table dest) throws IOException;
|
||||
<KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
|
||||
Table<KEY, VALUE> source, Table<KEY, VALUE> dest)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns an estimated count of keys in this DB.
|
||||
|
|
|
@ -56,11 +56,13 @@ public final class DBStoreBuilder {
|
|||
private Path dbPath;
|
||||
private List<String> tableNames;
|
||||
private Configuration configuration;
|
||||
private CodecRegistry registry;
|
||||
|
||||
private DBStoreBuilder(Configuration configuration) {
|
||||
tables = new HashSet<>();
|
||||
tableNames = new LinkedList<>();
|
||||
this.configuration = configuration;
|
||||
this.registry = new CodecRegistry();
|
||||
}
|
||||
|
||||
public static DBStoreBuilder newBuilder(Configuration configuration) {
|
||||
|
@ -82,6 +84,11 @@ public final class DBStoreBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public <T> DBStoreBuilder addCodec(Class<T> type, Codec<T> codec) {
|
||||
registry.addCodec(type, codec);
|
||||
return this;
|
||||
}
|
||||
|
||||
public DBStoreBuilder addTable(String tableName, ColumnFamilyOptions option)
|
||||
throws IOException {
|
||||
TableConfig tableConfig = new TableConfig(tableName, option);
|
||||
|
@ -124,7 +131,7 @@ public final class DBStoreBuilder {
|
|||
if (!dbFile.getParentFile().exists()) {
|
||||
throw new IOException("The DB destination directory should exist.");
|
||||
}
|
||||
return new RDBStore(dbFile, options, tables);
|
||||
return new RDBStore(dbFile, options, tables, registry);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,22 +19,6 @@
|
|||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.utils.RocksDBStoreMBean;
|
||||
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.rocksdb.ColumnFamilyDescriptor;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.DBOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.WriteBatch;
|
||||
import org.rocksdb.WriteOptions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -45,6 +29,22 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hdds.HddsUtils;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.metrics2.util.MBeans;
|
||||
import org.apache.hadoop.utils.RocksDBStoreMBean;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.rocksdb.ColumnFamilyDescriptor;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.DBOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
import org.rocksdb.RocksDBException;
|
||||
import org.rocksdb.WriteOptions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* RocksDB Store that supports creating Tables in DB.
|
||||
*/
|
||||
|
@ -59,13 +59,20 @@ public class RDBStore implements DBStore {
|
|||
private final Hashtable<String, ColumnFamilyHandle> handleTable;
|
||||
private ObjectName statMBeanName;
|
||||
|
||||
public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families)
|
||||
@VisibleForTesting
|
||||
public RDBStore(File dbFile, DBOptions options,
|
||||
Set<TableConfig> families) throws IOException {
|
||||
this(dbFile, options, families, new CodecRegistry());
|
||||
}
|
||||
|
||||
public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families,
|
||||
CodecRegistry registry)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
|
||||
Preconditions.checkNotNull(families);
|
||||
Preconditions.checkArgument(families.size() > 0);
|
||||
handleTable = new Hashtable<>();
|
||||
codecRegistry = new CodecRegistry();
|
||||
codecRegistry = registry;
|
||||
final List<ColumnFamilyDescriptor> columnFamilyDescriptors =
|
||||
new ArrayList<>();
|
||||
final List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
|
||||
|
@ -161,70 +168,31 @@ public class RDBStore implements DBStore {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void move(byte[] key, Table source, Table dest) throws IOException {
|
||||
RDBTable sourceTable;
|
||||
RDBTable destTable;
|
||||
if (source instanceof RDBTable) {
|
||||
sourceTable = (RDBTable) source;
|
||||
} else {
|
||||
LOG.error("Unexpected Table type. Expected RocksTable Store for Source.");
|
||||
throw new IOException("Unexpected TableStore Type in source. Expected "
|
||||
+ "RocksDBTable.");
|
||||
}
|
||||
public <KEY, VALUE> void move(KEY key, Table<KEY, VALUE> source,
|
||||
Table<KEY, VALUE> dest) throws IOException {
|
||||
try (BatchOperation batchOperation = initBatchOperation()) {
|
||||
|
||||
if (dest instanceof RDBTable) {
|
||||
destTable = (RDBTable) dest;
|
||||
} else {
|
||||
LOG.error("Unexpected Table type. Expected RocksTable Store for Dest.");
|
||||
throw new IOException("Unexpected TableStore Type in dest. Expected "
|
||||
+ "RocksDBTable.");
|
||||
}
|
||||
try (WriteBatch batch = new WriteBatch()) {
|
||||
byte[] value = sourceTable.get(key);
|
||||
batch.put(destTable.getHandle(), key, value);
|
||||
batch.delete(sourceTable.getHandle(), key);
|
||||
db.write(writeOptions, batch);
|
||||
} catch (RocksDBException rockdbException) {
|
||||
LOG.error("Move of key failed. Key:{}", DFSUtil.bytes2String(key));
|
||||
throw toIOException("Unable to move key: " + DFSUtil.bytes2String(key),
|
||||
rockdbException);
|
||||
VALUE value = source.get(key);
|
||||
dest.putWithBatch(batchOperation, key, value);
|
||||
source.deleteWithBatch(batchOperation, key);
|
||||
commitBatchOperation(batchOperation);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(byte[] key, byte[] value, Table source,
|
||||
Table dest) throws IOException {
|
||||
public <KEY, VALUE> void move(KEY key, VALUE value, Table<KEY, VALUE> source,
|
||||
Table<KEY, VALUE> dest) throws IOException {
|
||||
move(key, key, value, source, dest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(byte[] sourceKey, byte[] destKey, byte[] value, Table source,
|
||||
Table dest) throws IOException {
|
||||
RDBTable sourceTable;
|
||||
RDBTable destTable;
|
||||
if (source instanceof RDBTable) {
|
||||
sourceTable = (RDBTable) source;
|
||||
} else {
|
||||
LOG.error("Unexpected Table type. Expected RocksTable Store for Source.");
|
||||
throw new IOException("Unexpected TableStore Type in source. Expected "
|
||||
+ "RocksDBTable.");
|
||||
}
|
||||
|
||||
if (dest instanceof RDBTable) {
|
||||
destTable = (RDBTable) dest;
|
||||
} else {
|
||||
LOG.error("Unexpected Table type. Expected RocksTable Store for Dest.");
|
||||
throw new IOException("Unexpected TableStore Type in dest. Expected "
|
||||
+ "RocksDBTable.");
|
||||
}
|
||||
try (WriteBatch batch = new WriteBatch()) {
|
||||
batch.put(destTable.getHandle(), destKey, value);
|
||||
batch.delete(sourceTable.getHandle(), sourceKey);
|
||||
db.write(writeOptions, batch);
|
||||
} catch (RocksDBException rockdbException) {
|
||||
LOG.error("Move of key failed. Key:{}", DFSUtil.bytes2String(sourceKey));
|
||||
throw toIOException("Unable to move key: " +
|
||||
DFSUtil.bytes2String(sourceKey), rockdbException);
|
||||
public <KEY, VALUE> void move(KEY sourceKey, KEY destKey, VALUE value,
|
||||
Table<KEY, VALUE> source,
|
||||
Table<KEY, VALUE> dest) throws IOException {
|
||||
try (BatchOperation batchOperation = initBatchOperation()) {
|
||||
dest.putWithBatch(batchOperation, destKey, value);
|
||||
source.deleteWithBatch(batchOperation, sourceKey);
|
||||
commitBatchOperation(batchOperation);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -162,6 +162,9 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
|
|||
public TypedKeyValue seek(KEY key) {
|
||||
byte[] keyBytes = codecRegistry.asRawData(key);
|
||||
KeyValue<byte[], byte[]> result = rawIterator.seek(keyBytes);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
return new TypedKeyValue(result);
|
||||
}
|
||||
|
||||
|
|
|
@ -16,17 +16,19 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.hadoop.utils.db.Table;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* OM metadata manager interface.
|
||||
|
@ -65,14 +67,14 @@ public interface OMMetadataManager {
|
|||
*
|
||||
* @param volume - Volume name
|
||||
*/
|
||||
byte[] getVolumeKey(String volume);
|
||||
String getVolumeKey(String volume);
|
||||
|
||||
/**
|
||||
* Given a user return the corresponding DB key.
|
||||
*
|
||||
* @param user - User name
|
||||
*/
|
||||
byte[] getUserKey(String user);
|
||||
String getUserKey(String user);
|
||||
|
||||
/**
|
||||
* Given a volume and bucket, return the corresponding DB key.
|
||||
|
@ -80,7 +82,7 @@ public interface OMMetadataManager {
|
|||
* @param volume - User name
|
||||
* @param bucket - Bucket name
|
||||
*/
|
||||
byte[] getBucketKey(String volume, String bucket);
|
||||
String getBucketKey(String volume, String bucket);
|
||||
|
||||
/**
|
||||
* Given a volume, bucket and a key, return the corresponding DB key.
|
||||
|
@ -88,9 +90,11 @@ public interface OMMetadataManager {
|
|||
* @param volume - volume name
|
||||
* @param bucket - bucket name
|
||||
* @param key - key name
|
||||
* @return bytes of DB key.
|
||||
* @return DB key as String.
|
||||
*/
|
||||
byte[] getOzoneKeyBytes(String volume, String bucket, String key);
|
||||
|
||||
String getOzoneKey(String volume, String bucket, String key);
|
||||
|
||||
|
||||
/**
|
||||
* Returns the DB key name of a open key in OM metadata store. Should be
|
||||
|
@ -102,7 +106,7 @@ public interface OMMetadataManager {
|
|||
* @param id - the id for this open
|
||||
* @return bytes of DB key.
|
||||
*/
|
||||
byte[] getOpenKeyBytes(String volume, String bucket, String key, long id);
|
||||
String getOpenKey(String volume, String bucket, String key, long id);
|
||||
|
||||
/**
|
||||
* Given a volume, check if it is empty, i.e there are no buckets inside it.
|
||||
|
@ -203,42 +207,42 @@ public interface OMMetadataManager {
|
|||
*
|
||||
* @return UserTable.
|
||||
*/
|
||||
Table<byte[], byte[]> getUserTable();
|
||||
Table<String, VolumeList> getUserTable();
|
||||
|
||||
/**
|
||||
* Returns the Volume Table.
|
||||
*
|
||||
* @return VolumeTable.
|
||||
*/
|
||||
Table<byte[], byte[]> getVolumeTable();
|
||||
Table<String, OmVolumeArgs> getVolumeTable();
|
||||
|
||||
/**
|
||||
* Returns the BucketTable.
|
||||
*
|
||||
* @return BucketTable.
|
||||
*/
|
||||
Table<byte[], byte[]> getBucketTable();
|
||||
Table<String, OmBucketInfo> getBucketTable();
|
||||
|
||||
/**
|
||||
* Returns the KeyTable.
|
||||
*
|
||||
* @return KeyTable.
|
||||
*/
|
||||
Table<byte[], byte[]> getKeyTable();
|
||||
Table<String, OmKeyInfo> getKeyTable();
|
||||
|
||||
/**
|
||||
* Get Deleted Table.
|
||||
*
|
||||
* @return Deleted Table.
|
||||
*/
|
||||
Table<byte[], byte[]> getDeletedTable();
|
||||
Table<String, OmKeyInfo> getDeletedTable();
|
||||
|
||||
/**
|
||||
* Gets the OpenKeyTable.
|
||||
*
|
||||
* @return Table.
|
||||
*/
|
||||
Table<byte[], byte[]> getOpenKeyTable();
|
||||
Table<String, OmKeyInfo> getOpenKeyTable();
|
||||
|
||||
/**
|
||||
* Gets the S3Bucket to Ozone Volume/bucket mapping table.
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om.codec;
|
||||
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
|
||||
import org.apache.hadoop.utils.db.Codec;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* Codec to encode OmBucketInfo as byte array.
|
||||
*/
|
||||
public class OmBucketInfoCodec implements Codec<OmBucketInfo> {
|
||||
|
||||
@Override
|
||||
public byte[] toPersistedFormat(OmBucketInfo object) {
|
||||
Preconditions
|
||||
.checkNotNull(object, "Null object can't be converted to byte array.");
|
||||
return object.getProtobuf().toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public OmBucketInfo fromPersistedFormat(byte[] rawData) {
|
||||
Preconditions
|
||||
.checkNotNull("Null byte array can't converted to real object.");
|
||||
try {
|
||||
return OmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(rawData));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"Can't encode the the raw data from the byte array", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om.codec;
|
||||
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||
import org.apache.hadoop.utils.db.Codec;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* Codec to encode OmKeyInfo as byte array.
|
||||
*/
|
||||
public class OmKeyInfoCodec implements Codec<OmKeyInfo> {
|
||||
|
||||
@Override
|
||||
public byte[] toPersistedFormat(OmKeyInfo object) {
|
||||
Preconditions
|
||||
.checkNotNull(object, "Null object can't be converted to byte array.");
|
||||
return object.getProtobuf().toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public OmKeyInfo fromPersistedFormat(byte[] rawData) {
|
||||
Preconditions
|
||||
.checkNotNull("Null byte array can't converted to real object.");
|
||||
try {
|
||||
return OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(rawData));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"Can't encode the the raw data from the byte array", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om.codec;
|
||||
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
|
||||
import org.apache.hadoop.utils.db.Codec;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* Codec to encode OmVolumeArgsCodec as byte array.
|
||||
*/
|
||||
public class OmVolumeArgsCodec implements Codec<OmVolumeArgs> {
|
||||
|
||||
@Override
|
||||
public byte[] toPersistedFormat(OmVolumeArgs object) {
|
||||
Preconditions
|
||||
.checkNotNull(object, "Null object can't be converted to byte array.");
|
||||
return object.getProtobuf().toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public OmVolumeArgs fromPersistedFormat(byte[] rawData) {
|
||||
Preconditions
|
||||
.checkNotNull("Null byte array can't converted to real object.");
|
||||
try {
|
||||
return OmVolumeArgs.getFromProtobuf(VolumeInfo.parseFrom(rawData));
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"Can't encode the the raw data from the byte array", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om.codec;
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
|
||||
import org.apache.hadoop.utils.db.Codec;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
/**
|
||||
* Codec to encode VolumeList as byte array.
|
||||
*/
|
||||
public class VolumeListCodec implements Codec<VolumeList> {
|
||||
|
||||
@Override
|
||||
public byte[] toPersistedFormat(VolumeList object) {
|
||||
Preconditions
|
||||
.checkNotNull(object, "Null object can't be converted to byte array.");
|
||||
return object.toByteArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public VolumeList fromPersistedFormat(byte[] rawData) {
|
||||
Preconditions
|
||||
.checkNotNull("Null byte array can't converted to real object.");
|
||||
try {
|
||||
return VolumeList.parseFrom(rawData);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"Can't encode the the raw data from the byte array", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* <p>
|
||||
* Utility classes to encode/decode DTO objects to/from byte array.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Utility classes to encode/decode DTO objects to/from byte array.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.om.codec;
|
|
@ -17,14 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.om.helpers;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Args for key block. The block instance for the key requested in putKey.
|
||||
|
@ -45,7 +47,8 @@ public final class OmKeyInfo {
|
|||
|
||||
private OmKeyInfo(String volumeName, String bucketName, String keyName,
|
||||
List<OmKeyLocationInfoGroup> versions, long dataSize,
|
||||
long creationTime, long modificationTime, HddsProtos.ReplicationType type,
|
||||
long creationTime, long modificationTime,
|
||||
HddsProtos.ReplicationType type,
|
||||
HddsProtos.ReplicationFactor factor) {
|
||||
this.volumeName = volumeName;
|
||||
this.bucketName = bucketName;
|
||||
|
@ -206,7 +209,8 @@ public final class OmKeyInfo {
|
|||
private String bucketName;
|
||||
private String keyName;
|
||||
private long dataSize;
|
||||
private List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups;
|
||||
private List<OmKeyLocationInfoGroup> omKeyLocationInfoGroups =
|
||||
new ArrayList<>();
|
||||
private long creationTime;
|
||||
private long modificationTime;
|
||||
private HddsProtos.ReplicationType type;
|
||||
|
@ -248,13 +252,13 @@ public final class OmKeyInfo {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setReplicationFactor(HddsProtos.ReplicationFactor factor) {
|
||||
this.factor = factor;
|
||||
public Builder setReplicationFactor(HddsProtos.ReplicationFactor replFact) {
|
||||
this.factor = replFact;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setReplicationType(HddsProtos.ReplicationType type) {
|
||||
this.type = type;
|
||||
public Builder setReplicationType(HddsProtos.ReplicationType replType) {
|
||||
this.type = replType;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
|
|
@ -219,7 +219,8 @@ public final class OmVolumeArgs implements Auditable{
|
|||
.setQuotaInBytes(quotaInBytes)
|
||||
.addAllMetadata(metadataList)
|
||||
.addAllVolumeAcls(aclList)
|
||||
.setCreationTime(creationTime)
|
||||
.setCreationTime(
|
||||
creationTime == 0 ? System.currentTimeMillis() : creationTime)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -32,10 +32,12 @@ import org.apache.hadoop.ozone.common.BlockGroup;
|
|||
import org.apache.hadoop.ozone.client.rest.OzoneException;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.hdds.scm.server.SCMStorage;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.OzoneManagerProtocolProtos.ServicePort;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
|
||||
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||
|
@ -293,8 +295,8 @@ public class TestOzoneManager {
|
|||
OMMetadataManager metadataManager =
|
||||
cluster.getOzoneManager().getMetadataManager();
|
||||
|
||||
byte[] userKey = metadataManager.getUserKey(userName);
|
||||
byte[] volumes = metadataManager.getUserTable().get(userKey);
|
||||
String userKey = metadataManager.getUserKey(userName);
|
||||
VolumeList volumes = metadataManager.getUserTable().get(userKey);
|
||||
|
||||
//that was the last volume of the user, shouldn't be any record here
|
||||
Assert.assertNull(volumes);
|
||||
|
@ -653,7 +655,7 @@ public class TestOzoneManager {
|
|||
// Make sure the deleted key has been moved to the deleted table.
|
||||
OMMetadataManager manager = cluster.getOzoneManager().
|
||||
getMetadataManager();
|
||||
try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> iter =
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> iter =
|
||||
manager.getDeletedTable().iterator()) {
|
||||
iter.seekToFirst();
|
||||
Table.KeyValue kv = iter.next();
|
||||
|
|
|
@ -16,21 +16,21 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||
import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.iq80.leveldb.DBException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* OM bucket manager.
|
||||
*/
|
||||
|
@ -84,8 +84,8 @@ public class BucketManagerImpl implements BucketManager {
|
|||
metadataManager.getLock().acquireVolumeLock(volumeName);
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
|
||||
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
String volumeKey = metadataManager.getVolumeKey(volumeName);
|
||||
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
|
||||
//Check if the volume exists
|
||||
if (metadataManager.getVolumeTable().get(volumeKey) == null) {
|
||||
|
@ -109,7 +109,7 @@ public class BucketManagerImpl implements BucketManager {
|
|||
.setCreationTime(Time.now())
|
||||
.build();
|
||||
metadataManager.getBucketTable().put(bucketKey,
|
||||
omBucketInfo.getProtobuf().toByteArray());
|
||||
omBucketInfo);
|
||||
|
||||
LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
|
||||
} catch (IOException | DBException ex) {
|
||||
|
@ -137,15 +137,15 @@ public class BucketManagerImpl implements BucketManager {
|
|||
Preconditions.checkNotNull(bucketName);
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
byte[] value = metadataManager.getBucketTable().get(bucketKey);
|
||||
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
OmBucketInfo value = metadataManager.getBucketTable().get(bucketKey);
|
||||
if (value == null) {
|
||||
LOG.debug("bucket: {} not found in volume: {}.", bucketName,
|
||||
volumeName);
|
||||
throw new OMException("Bucket not found",
|
||||
OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
|
||||
}
|
||||
return OmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value));
|
||||
return value;
|
||||
} catch (IOException | DBException ex) {
|
||||
if (!(ex instanceof OMException)) {
|
||||
LOG.error("Exception while getting bucket info for bucket: {}",
|
||||
|
@ -170,16 +170,15 @@ public class BucketManagerImpl implements BucketManager {
|
|||
String bucketName = args.getBucketName();
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
byte[] value = metadataManager.getBucketTable().get(bucketKey);
|
||||
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
OmBucketInfo oldBucketInfo =
|
||||
metadataManager.getBucketTable().get(bucketKey);
|
||||
//Check if bucket exist
|
||||
if (value == null) {
|
||||
if (oldBucketInfo == null) {
|
||||
LOG.debug("bucket: {} not found ", bucketName);
|
||||
throw new OMException("Bucket doesn't exist",
|
||||
OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
|
||||
}
|
||||
OmBucketInfo oldBucketInfo = OmBucketInfo.getFromProtobuf(
|
||||
BucketInfo.parseFrom(value));
|
||||
OmBucketInfo.Builder bucketInfoBuilder = OmBucketInfo.newBuilder();
|
||||
bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName())
|
||||
.setBucketName(oldBucketInfo.getBucketName());
|
||||
|
@ -216,8 +215,8 @@ public class BucketManagerImpl implements BucketManager {
|
|||
}
|
||||
bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime());
|
||||
|
||||
metadataManager.getBucketTable().put(bucketKey,
|
||||
bucketInfoBuilder.build().getProtobuf().toByteArray());
|
||||
metadataManager.getBucketTable()
|
||||
.put(bucketKey, bucketInfoBuilder.build());
|
||||
} catch (IOException | DBException ex) {
|
||||
if (!(ex instanceof OMException)) {
|
||||
LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
|
||||
|
@ -265,7 +264,7 @@ public class BucketManagerImpl implements BucketManager {
|
|||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
//Check if bucket exists
|
||||
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
if (metadataManager.getBucketTable().get(bucketKey) == null) {
|
||||
LOG.debug("bucket: {} not found ", bucketName);
|
||||
throw new OMException("Bucket doesn't exist",
|
||||
|
|
|
@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -162,7 +161,7 @@ public class KeyDeletingService extends BackgroundService {
|
|||
if (result.isSuccess()) {
|
||||
// Purge key from OM DB.
|
||||
deletedTable.deleteWithBatch(writeBatch,
|
||||
DFSUtil.string2Bytes(result.getObjectKey()));
|
||||
result.getObjectKey());
|
||||
LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
|
||||
deletedCount++;
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
|||
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
|
||||
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
|
||||
|
@ -39,8 +38,6 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
|||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocationList;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
import org.apache.hadoop.utils.db.BatchOperation;
|
||||
|
@ -125,8 +122,8 @@ public class KeyManagerImpl implements KeyManager {
|
|||
|
||||
private void validateBucket(String volumeName, String bucketName)
|
||||
throws IOException {
|
||||
byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
|
||||
byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
String volumeKey = metadataManager.getVolumeKey(volumeName);
|
||||
String bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
|
||||
|
||||
//Check if the volume exists
|
||||
if (metadataManager.getVolumeTable().get(volumeKey) == null) {
|
||||
|
@ -150,18 +147,17 @@ public class KeyManagerImpl implements KeyManager {
|
|||
String bucketName = args.getBucketName();
|
||||
String keyName = args.getKeyName();
|
||||
validateBucket(volumeName, bucketName);
|
||||
byte[] openKey = metadataManager.getOpenKeyBytes(
|
||||
String openKey = metadataManager.getOpenKey(
|
||||
volumeName, bucketName, keyName, clientID);
|
||||
|
||||
byte[] keyData = metadataManager.getOpenKeyTable().get(openKey);
|
||||
if (keyData == null) {
|
||||
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
|
||||
if (keyInfo == null) {
|
||||
LOG.error("Allocate block for a key not in open status in meta store" +
|
||||
" /{}/{}/{} with ID {}", volumeName, bucketName, keyName, clientID);
|
||||
throw new OMException("Open Key not found",
|
||||
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
|
||||
}
|
||||
OmKeyInfo keyInfo =
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
|
||||
|
||||
AllocatedBlock allocatedBlock;
|
||||
try {
|
||||
allocatedBlock =
|
||||
|
@ -184,7 +180,7 @@ public class KeyManagerImpl implements KeyManager {
|
|||
keyInfo.appendNewBlocks(Collections.singletonList(info));
|
||||
keyInfo.updateModifcationTime();
|
||||
metadataManager.getOpenKeyTable().put(openKey,
|
||||
keyInfo.getProtobuf().toByteArray());
|
||||
keyInfo);
|
||||
return info;
|
||||
}
|
||||
|
||||
|
@ -214,7 +210,7 @@ public class KeyManagerImpl implements KeyManager {
|
|||
try {
|
||||
long requestedSize = Math.min(preallocateMax, args.getDataSize());
|
||||
List<OmKeyLocationInfo> locations = new ArrayList<>();
|
||||
byte[] objectKey = metadataManager.getOzoneKeyBytes(
|
||||
String objectKey = metadataManager.getOzoneKey(
|
||||
volumeName, bucketName, keyName);
|
||||
// requested size is not required but more like a optimization:
|
||||
// SCM looks at the requested, if it 0, no block will be allocated at
|
||||
|
@ -250,12 +246,10 @@ public class KeyManagerImpl implements KeyManager {
|
|||
// value, then this value is used, otherwise, we allocate a single block
|
||||
// which is the current size, if read by the client.
|
||||
long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
|
||||
byte[] value = metadataManager.getKeyTable().get(objectKey);
|
||||
OmKeyInfo keyInfo;
|
||||
OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
|
||||
long openVersion;
|
||||
if (value != null) {
|
||||
if (keyInfo != null) {
|
||||
// the key already exist, the new blocks will be added as new version
|
||||
keyInfo = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
|
||||
// when locations.size = 0, the new version will have identical blocks
|
||||
// as its previous version
|
||||
openVersion = keyInfo.addNewVersion(locations);
|
||||
|
@ -278,7 +272,7 @@ public class KeyManagerImpl implements KeyManager {
|
|||
.build();
|
||||
openVersion = 0;
|
||||
}
|
||||
byte[] openKey = metadataManager.getOpenKeyBytes(
|
||||
String openKey = metadataManager.getOpenKey(
|
||||
volumeName, bucketName, keyName, currentTime);
|
||||
if (metadataManager.getOpenKeyTable().get(openKey) != null) {
|
||||
// This should not happen. If this condition is satisfied, it means
|
||||
|
@ -293,8 +287,7 @@ public class KeyManagerImpl implements KeyManager {
|
|||
throw new OMException("Cannot allocate key. Not able to get a valid" +
|
||||
"open key id.", OMException.ResultCodes.FAILED_KEY_ALLOCATION);
|
||||
}
|
||||
metadataManager.getOpenKeyTable().put(openKey,
|
||||
keyInfo.getProtobuf().toByteArray());
|
||||
metadataManager.getOpenKeyTable().put(openKey, keyInfo);
|
||||
LOG.debug("Key {} allocated in volume {} bucket {}",
|
||||
keyName, volumeName, bucketName);
|
||||
return new OpenKeySession(currentTime, keyInfo, openVersion);
|
||||
|
@ -319,17 +312,15 @@ public class KeyManagerImpl implements KeyManager {
|
|||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
validateBucket(volumeName, bucketName);
|
||||
byte[] openKey = metadataManager.getOpenKeyBytes(volumeName, bucketName,
|
||||
String openKey = metadataManager.getOpenKey(volumeName, bucketName,
|
||||
keyName, clientID);
|
||||
byte[] objectKey = metadataManager.getOzoneKeyBytes(
|
||||
String objectKey = metadataManager.getOzoneKey(
|
||||
volumeName, bucketName, keyName);
|
||||
byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
|
||||
if (openKeyData == null) {
|
||||
OmKeyInfo keyInfo = metadataManager.getOpenKeyTable().get(openKey);
|
||||
if (keyInfo == null) {
|
||||
throw new OMException("Commit a key without corresponding entry " +
|
||||
DFSUtil.bytes2String(objectKey), ResultCodes.FAILED_KEY_NOT_FOUND);
|
||||
objectKey, ResultCodes.FAILED_KEY_NOT_FOUND);
|
||||
}
|
||||
OmKeyInfo keyInfo =
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
|
||||
keyInfo.setDataSize(args.getDataSize());
|
||||
keyInfo.setModificationTime(Time.now());
|
||||
List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
|
||||
|
@ -337,8 +328,10 @@ public class KeyManagerImpl implements KeyManager {
|
|||
|
||||
//update the block length for each block
|
||||
keyInfo.updateLocationInfoList(locationInfoList);
|
||||
metadataManager.getStore().move(openKey, objectKey,
|
||||
keyInfo.getProtobuf().toByteArray(),
|
||||
metadataManager.getStore().move(
|
||||
openKey,
|
||||
objectKey,
|
||||
keyInfo,
|
||||
metadataManager.getOpenKeyTable(),
|
||||
metadataManager.getKeyTable());
|
||||
} catch (OMException e) {
|
||||
|
@ -361,16 +354,16 @@ public class KeyManagerImpl implements KeyManager {
|
|||
String keyName = args.getKeyName();
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
byte[] keyBytes = metadataManager.getOzoneKeyBytes(
|
||||
String keyBytes = metadataManager.getOzoneKey(
|
||||
volumeName, bucketName, keyName);
|
||||
byte[] value = metadataManager.getKeyTable().get(keyBytes);
|
||||
OmKeyInfo value = metadataManager.getKeyTable().get(keyBytes);
|
||||
if (value == null) {
|
||||
LOG.debug("volume:{} bucket:{} Key:{} not found",
|
||||
volumeName, bucketName, keyName);
|
||||
throw new OMException("Key not found",
|
||||
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
|
||||
}
|
||||
return OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
|
||||
return value;
|
||||
} catch (IOException ex) {
|
||||
LOG.debug("Get key failed for volume:{} bucket:{} key:{}",
|
||||
volumeName, bucketName, keyName, ex);
|
||||
|
@ -398,9 +391,9 @@ public class KeyManagerImpl implements KeyManager {
|
|||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
// fromKeyName should exist
|
||||
byte[] fromKey = metadataManager.getOzoneKeyBytes(
|
||||
String fromKey = metadataManager.getOzoneKey(
|
||||
volumeName, bucketName, fromKeyName);
|
||||
byte[] fromKeyValue = metadataManager.getKeyTable().get(fromKey);
|
||||
OmKeyInfo fromKeyValue = metadataManager.getKeyTable().get(fromKey);
|
||||
if (fromKeyValue == null) {
|
||||
// TODO: Add support for renaming open key
|
||||
LOG.error(
|
||||
|
@ -418,9 +411,9 @@ public class KeyManagerImpl implements KeyManager {
|
|||
}
|
||||
|
||||
// toKeyName should not exist
|
||||
byte[] toKey =
|
||||
metadataManager.getOzoneKeyBytes(volumeName, bucketName, toKeyName);
|
||||
byte[] toKeyValue = metadataManager.getKeyTable().get(toKey);
|
||||
String toKey =
|
||||
metadataManager.getOzoneKey(volumeName, bucketName, toKeyName);
|
||||
OmKeyInfo toKeyValue = metadataManager.getKeyTable().get(toKey);
|
||||
if (toKeyValue != null) {
|
||||
LOG.error(
|
||||
"Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
|
||||
|
@ -430,16 +423,13 @@ public class KeyManagerImpl implements KeyManager {
|
|||
OMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
|
||||
OmKeyInfo newKeyInfo =
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue));
|
||||
newKeyInfo.setKeyName(toKeyName);
|
||||
newKeyInfo.updateModifcationTime();
|
||||
fromKeyValue.setKeyName(toKeyName);
|
||||
fromKeyValue.updateModifcationTime();
|
||||
DBStore store = metadataManager.getStore();
|
||||
try (BatchOperation batch = store.initBatchOperation()) {
|
||||
metadataManager.getKeyTable().deleteWithBatch(batch, fromKey);
|
||||
metadataManager.getKeyTable().putWithBatch(batch, toKey,
|
||||
newKeyInfo.getProtobuf().toByteArray());
|
||||
fromKeyValue);
|
||||
store.commitBatchOperation(batch);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
|
@ -460,16 +450,15 @@ public class KeyManagerImpl implements KeyManager {
|
|||
String keyName = args.getKeyName();
|
||||
metadataManager.getLock().acquireBucketLock(volumeName, bucketName);
|
||||
try {
|
||||
byte[] objectKey = metadataManager.getOzoneKeyBytes(
|
||||
String objectKey = metadataManager.getOzoneKey(
|
||||
volumeName, bucketName, keyName);
|
||||
byte[] objectValue = metadataManager.getKeyTable().get(objectKey);
|
||||
if (objectValue == null) {
|
||||
OmKeyInfo keyInfo = metadataManager.getKeyTable().get(objectKey);
|
||||
if (keyInfo == null) {
|
||||
throw new OMException("Key not found",
|
||||
OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
|
||||
} else {
|
||||
// directly delete key with no blocks from db. This key need not be
|
||||
// moved to deleted table.
|
||||
KeyInfo keyInfo = KeyInfo.parseFrom(objectValue);
|
||||
if (isKeyEmpty(keyInfo)) {
|
||||
metadataManager.getKeyTable().delete(objectKey);
|
||||
LOG.debug("Key {} deleted from OM DB", keyName);
|
||||
|
@ -491,9 +480,10 @@ public class KeyManagerImpl implements KeyManager {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isKeyEmpty(KeyInfo keyInfo) {
|
||||
for (KeyLocationList keyLocationList : keyInfo.getKeyLocationListList()) {
|
||||
if (keyLocationList.getKeyLocationsCount() != 0) {
|
||||
private boolean isKeyEmpty(OmKeyInfo keyInfo) {
|
||||
for (OmKeyLocationInfoGroup keyLocationList : keyInfo
|
||||
.getKeyLocationVersions()) {
|
||||
if (keyLocationList.getLocationList().size() != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,27 +20,24 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.hdds.client.BlockID;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.common.BlockGroup;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
|
||||
import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
|
||||
import org.apache.hadoop.ozone.om.codec.VolumeListCodec;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
|
@ -123,32 +120,32 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Table<byte[], byte[]> getUserTable() {
|
||||
public Table<String, VolumeList> getUserTable() {
|
||||
return userTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table<byte[], byte[]> getVolumeTable() {
|
||||
public Table<String, OmVolumeArgs> getVolumeTable() {
|
||||
return volumeTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table<byte[], byte[]> getBucketTable() {
|
||||
public Table<String, OmBucketInfo> getBucketTable() {
|
||||
return bucketTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table<byte[], byte[]> getKeyTable() {
|
||||
public Table<String, OmKeyInfo> getKeyTable() {
|
||||
return keyTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table<byte[], byte[]> getDeletedTable() {
|
||||
public Table<String, OmKeyInfo> getDeletedTable() {
|
||||
return deletedTable;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table<byte[], byte[]> getOpenKeyTable() {
|
||||
public Table<String, OmKeyInfo> getOpenKeyTable() {
|
||||
return openKeyTable;
|
||||
}
|
||||
|
||||
|
@ -178,6 +175,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
// db, so we need to create the store object and initialize DB.
|
||||
if (store == null) {
|
||||
File metaDir = OmUtils.getOmDbDir(configuration);
|
||||
|
||||
this.store = DBStoreBuilder.newBuilder(configuration)
|
||||
.setName(OM_DB_NAME)
|
||||
.setPath(Paths.get(metaDir.getPath()))
|
||||
|
@ -188,28 +186,39 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
.addTable(DELETED_TABLE)
|
||||
.addTable(OPEN_KEY_TABLE)
|
||||
.addTable(S3_TABLE)
|
||||
.addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
|
||||
.addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
|
||||
.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
|
||||
.addCodec(VolumeList.class, new VolumeListCodec())
|
||||
.build();
|
||||
|
||||
userTable = this.store.getTable(USER_TABLE);
|
||||
userTable =
|
||||
this.store.getTable(USER_TABLE, String.class, VolumeList.class);
|
||||
checkTableStatus(userTable, USER_TABLE);
|
||||
|
||||
volumeTable = this.store.getTable(VOLUME_TABLE);
|
||||
this.store.getTable(VOLUME_TABLE, String.class,
|
||||
String.class);
|
||||
volumeTable =
|
||||
this.store.getTable(VOLUME_TABLE, String.class, OmVolumeArgs.class);
|
||||
checkTableStatus(volumeTable, VOLUME_TABLE);
|
||||
|
||||
bucketTable = this.store.getTable(BUCKET_TABLE);
|
||||
bucketTable =
|
||||
this.store.getTable(BUCKET_TABLE, String.class, OmBucketInfo.class);
|
||||
checkTableStatus(bucketTable, BUCKET_TABLE);
|
||||
|
||||
keyTable = this.store.getTable(KEY_TABLE);
|
||||
keyTable = this.store.getTable(KEY_TABLE, String.class, OmKeyInfo.class);
|
||||
checkTableStatus(keyTable, KEY_TABLE);
|
||||
|
||||
deletedTable = this.store.getTable(DELETED_TABLE);
|
||||
deletedTable =
|
||||
this.store.getTable(DELETED_TABLE, String.class, OmKeyInfo.class);
|
||||
checkTableStatus(deletedTable, DELETED_TABLE);
|
||||
|
||||
openKeyTable = this.store.getTable(OPEN_KEY_TABLE);
|
||||
openKeyTable =
|
||||
this.store.getTable(OPEN_KEY_TABLE, String.class, OmKeyInfo.class);
|
||||
checkTableStatus(openKeyTable, OPEN_KEY_TABLE);
|
||||
|
||||
s3Table = this.store.getTable(S3_TABLE);
|
||||
checkTableStatus(s3Table, S3_TABLE);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -241,8 +250,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
* @param volume - Volume name
|
||||
*/
|
||||
@Override
|
||||
public byte[] getVolumeKey(String volume) {
|
||||
return DFSUtil.string2Bytes(OzoneConsts.OM_KEY_PREFIX + volume);
|
||||
public String getVolumeKey(String volume) {
|
||||
return OzoneConsts.OM_KEY_PREFIX + volume;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -251,8 +260,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
* @param user - User name
|
||||
*/
|
||||
@Override
|
||||
public byte[] getUserKey(String user) {
|
||||
return DFSUtil.string2Bytes(user);
|
||||
public String getUserKey(String user) {
|
||||
return user;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -262,18 +271,18 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
* @param bucket - Bucket name
|
||||
*/
|
||||
@Override
|
||||
public byte[] getBucketKey(String volume, String bucket) {
|
||||
public String getBucketKey(String volume, String bucket) {
|
||||
StringBuilder builder =
|
||||
new StringBuilder().append(OM_KEY_PREFIX).append(volume);
|
||||
|
||||
if (StringUtils.isNotBlank(bucket)) {
|
||||
builder.append(OM_KEY_PREFIX).append(bucket);
|
||||
}
|
||||
return DFSUtil.string2Bytes(builder.toString());
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getOzoneKeyBytes(String volume, String bucket, String key) {
|
||||
public String getOzoneKey(String volume, String bucket, String key) {
|
||||
StringBuilder builder = new StringBuilder()
|
||||
.append(OM_KEY_PREFIX).append(volume);
|
||||
// TODO : Throw if the Bucket is null?
|
||||
|
@ -281,15 +290,15 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
if (StringUtil.isNotBlank(key)) {
|
||||
builder.append(OM_KEY_PREFIX).append(key);
|
||||
}
|
||||
return DFSUtil.string2Bytes(builder.toString());
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getOpenKeyBytes(String volume, String bucket,
|
||||
public String getOpenKey(String volume, String bucket,
|
||||
String key, long id) {
|
||||
String openKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket +
|
||||
OM_KEY_PREFIX + key + OM_KEY_PREFIX + id;
|
||||
return DFSUtil.string2Bytes(openKey);
|
||||
return openKey;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -349,11 +358,12 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
*/
|
||||
@Override
|
||||
public boolean isVolumeEmpty(String volume) throws IOException {
|
||||
byte[] volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX);
|
||||
try (TableIterator<byte[], Table.KeyValue> bucketIter = bucketTable
|
||||
.iterator()) {
|
||||
Table.KeyValue<byte[], byte[]> kv = bucketIter.seek(volumePrefix);
|
||||
if (kv != null && startsWith(kv.getKey(), volumePrefix)) {
|
||||
String volumePrefix = getVolumeKey(volume + OM_KEY_PREFIX);
|
||||
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
|
||||
bucketIter = bucketTable.iterator()) {
|
||||
KeyValue<String, OmBucketInfo> kv = bucketIter.seek(volumePrefix);
|
||||
if (kv != null && kv.getKey().startsWith(volumePrefix)) {
|
||||
return false; // we found at least one bucket with this volume prefix.
|
||||
}
|
||||
}
|
||||
|
@ -371,10 +381,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
@Override
|
||||
public boolean isBucketEmpty(String volume, String bucket)
|
||||
throws IOException {
|
||||
byte[] keyPrefix = getBucketKey(volume, bucket + OM_KEY_PREFIX);
|
||||
try (TableIterator<byte[], Table.KeyValue> keyIter = keyTable.iterator()) {
|
||||
Table.KeyValue<byte[], byte[]> kv = keyIter.seek(keyPrefix);
|
||||
if (kv != null && startsWith(kv.getKey(), keyPrefix)) {
|
||||
String keyPrefix = getBucketKey(volume, bucket);
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
|
||||
keyTable.iterator()) {
|
||||
KeyValue<String, OmKeyInfo> kv = keyIter.seek(keyPrefix);
|
||||
if (kv != null && kv.getKey().startsWith(keyPrefix)) {
|
||||
return false; // we found at least one key with this vol/bucket prefix.
|
||||
}
|
||||
}
|
||||
|
@ -394,14 +405,13 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
|
||||
byte[] volumeNameBytes = getVolumeKey(volumeName);
|
||||
String volumeNameBytes = getVolumeKey(volumeName);
|
||||
if (volumeTable.get(volumeNameBytes) == null) {
|
||||
throw new OMException("Volume " + volumeName + " not found.",
|
||||
ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
|
||||
|
||||
byte[] startKey;
|
||||
String startKey;
|
||||
boolean skipStartKey = false;
|
||||
if (StringUtil.isNotBlank(startBucket)) {
|
||||
// if the user has specified a start key, we need to seek to that key
|
||||
|
@ -417,26 +427,26 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
startKey = getBucketKey(volumeName, bucketPrefix);
|
||||
}
|
||||
|
||||
byte[] seekPrefix;
|
||||
String seekPrefix;
|
||||
if (StringUtil.isNotBlank(bucketPrefix)) {
|
||||
seekPrefix = getBucketKey(volumeName, bucketPrefix);
|
||||
} else {
|
||||
seekPrefix = getVolumeKey(volumeName + OM_KEY_PREFIX);
|
||||
}
|
||||
int currentCount = 0;
|
||||
try (TableIterator<byte[], Table.KeyValue> bucketIter = bucketTable
|
||||
.iterator()) {
|
||||
Table.KeyValue<byte[], byte[]> kv = bucketIter.seek(startKey);
|
||||
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmBucketInfo>>
|
||||
bucketIter = bucketTable.iterator()) {
|
||||
KeyValue<String, OmBucketInfo> kv = bucketIter.seek(startKey);
|
||||
while (currentCount < maxNumOfBuckets && bucketIter.hasNext()) {
|
||||
kv = bucketIter.next();
|
||||
// Skip the Start Bucket if needed.
|
||||
if (kv != null && skipStartKey &&
|
||||
Arrays.equals(kv.getKey(), startKey)) {
|
||||
kv.getKey().equals(startKey)) {
|
||||
continue;
|
||||
}
|
||||
if (kv != null && startsWith(kv.getKey(), seekPrefix)) {
|
||||
result.add(OmBucketInfo.getFromProtobuf(
|
||||
BucketInfo.parseFrom(kv.getValue())));
|
||||
if (kv != null && kv.getKey().startsWith(seekPrefix)) {
|
||||
result.add(kv.getValue());
|
||||
currentCount++;
|
||||
} else {
|
||||
// The SeekPrefix does not match any more, we can break out of the
|
||||
|
@ -462,43 +472,42 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
ResultCodes.FAILED_BUCKET_NOT_FOUND);
|
||||
}
|
||||
|
||||
byte[] bucketNameBytes = getBucketKey(volumeName, bucketName);
|
||||
String bucketNameBytes = getBucketKey(volumeName, bucketName);
|
||||
if (getBucketTable().get(bucketNameBytes) == null) {
|
||||
throw new OMException("Bucket " + bucketName + " not found.",
|
||||
ResultCodes.FAILED_BUCKET_NOT_FOUND);
|
||||
}
|
||||
|
||||
byte[] seekKey;
|
||||
String seekKey;
|
||||
boolean skipStartKey = false;
|
||||
if (StringUtil.isNotBlank(startKey)) {
|
||||
// Seek to the specified key.
|
||||
seekKey = getOzoneKeyBytes(volumeName, bucketName, startKey);
|
||||
seekKey = getOzoneKey(volumeName, bucketName, startKey);
|
||||
skipStartKey = true;
|
||||
} else {
|
||||
// This allows us to seek directly to the first key with the right prefix.
|
||||
seekKey = getOzoneKeyBytes(volumeName, bucketName, keyPrefix);
|
||||
seekKey = getOzoneKey(volumeName, bucketName, keyPrefix);
|
||||
}
|
||||
|
||||
byte[] seekPrefix;
|
||||
String seekPrefix;
|
||||
if (StringUtil.isNotBlank(keyPrefix)) {
|
||||
seekPrefix = getOzoneKeyBytes(volumeName, bucketName, keyPrefix);
|
||||
seekPrefix = getOzoneKey(volumeName, bucketName, keyPrefix);
|
||||
} else {
|
||||
seekPrefix = getBucketKey(volumeName, bucketName + OM_KEY_PREFIX);
|
||||
}
|
||||
int currentCount = 0;
|
||||
try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> keyIter =
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
|
||||
getKeyTable()
|
||||
.iterator()) {
|
||||
Table.KeyValue<byte[], byte[]> kv = keyIter.seek(seekKey);
|
||||
KeyValue<String, OmKeyInfo> kv = keyIter.seek(seekKey);
|
||||
while (currentCount < maxKeys && keyIter.hasNext()) {
|
||||
kv = keyIter.next();
|
||||
// Skip the Start key if needed.
|
||||
if (kv != null && skipStartKey && Arrays.equals(kv.getKey(), seekKey)) {
|
||||
if (kv != null && skipStartKey && kv.getKey().equals(seekKey)) {
|
||||
continue;
|
||||
}
|
||||
if (kv != null && startsWith(kv.getKey(), seekPrefix)) {
|
||||
result.add(OmKeyInfo.getFromProtobuf(
|
||||
KeyInfo.parseFrom(kv.getValue())));
|
||||
if (kv != null && kv.getKey().startsWith(seekPrefix)) {
|
||||
result.add(kv.getValue());
|
||||
currentCount++;
|
||||
} else {
|
||||
// The SeekPrefix does not match any more, we can break out of the
|
||||
|
@ -538,8 +547,9 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
continue;
|
||||
}
|
||||
if (startKeyFound && result.size() < maxKeys) {
|
||||
byte[] volumeInfo = getVolumeTable().get(this.getVolumeKey(volumeName));
|
||||
if (volumeInfo == null) {
|
||||
OmVolumeArgs volumeArgs =
|
||||
getVolumeTable().get(this.getVolumeKey(volumeName));
|
||||
if (volumeArgs == null) {
|
||||
// Could not get volume info by given volume name,
|
||||
// since the volume name is loaded from db,
|
||||
// this probably means om db is corrupted or some entries are
|
||||
|
@ -547,8 +557,6 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
throw new OMException("Volume info not found for " + volumeName,
|
||||
ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
VolumeInfo info = VolumeInfo.parseFrom(volumeInfo);
|
||||
OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(info);
|
||||
result.add(volumeArgs);
|
||||
}
|
||||
}
|
||||
|
@ -556,49 +564,42 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
return result;
|
||||
}
|
||||
|
||||
private VolumeList getVolumesByUser(String userName)
|
||||
private VolumeList getVolumesByUser(String userNameKey)
|
||||
throws OMException {
|
||||
return getVolumesByUser(getUserKey(userName));
|
||||
}
|
||||
|
||||
private VolumeList getVolumesByUser(byte[] userNameKey)
|
||||
throws OMException {
|
||||
VolumeList volumes = null;
|
||||
try {
|
||||
byte[] volumesInBytes = getUserTable().get(userNameKey);
|
||||
if (volumesInBytes == null) {
|
||||
VolumeList volumeList = getUserTable().get(userNameKey);
|
||||
if (volumeList == null) {
|
||||
// No volume found for this user, return an empty list
|
||||
return VolumeList.newBuilder().build();
|
||||
} else {
|
||||
return volumeList;
|
||||
}
|
||||
volumes = VolumeList.parseFrom(volumesInBytes);
|
||||
} catch (IOException e) {
|
||||
throw new OMException("Unable to get volumes info by the given user, "
|
||||
+ "metadata might be corrupted", e,
|
||||
ResultCodes.FAILED_METADATA_ERROR);
|
||||
}
|
||||
return volumes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
|
||||
throws IOException {
|
||||
List<BlockGroup> keyBlocksList = Lists.newArrayList();
|
||||
try (TableIterator<byte[], ? extends KeyValue<byte[], byte[]>> keyIter =
|
||||
try (TableIterator<String, ? extends KeyValue<String, OmKeyInfo>> keyIter =
|
||||
getDeletedTable()
|
||||
.iterator()) {
|
||||
int currentCount = 0;
|
||||
while (keyIter.hasNext() && currentCount < keyCount) {
|
||||
KeyValue<byte[], byte[]> kv = keyIter.next();
|
||||
KeyValue<String, OmKeyInfo> kv = keyIter.next();
|
||||
if (kv != null) {
|
||||
OmKeyInfo info =
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(kv.getValue()));
|
||||
OmKeyInfo info = kv.getValue();
|
||||
// Get block keys as a list.
|
||||
OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
|
||||
List<BlockID> item = latest.getLocationList().stream()
|
||||
.map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
|
||||
.collect(Collectors.toList());
|
||||
BlockGroup keyBlocks = BlockGroup.newBuilder()
|
||||
.setKeyName(DFSUtil.bytes2String(kv.getKey()))
|
||||
.setKeyName(kv.getKey())
|
||||
.addAllBlockIDs(item)
|
||||
.build();
|
||||
keyBlocksList.add(keyBlocks);
|
||||
|
@ -614,27 +615,6 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
List<BlockGroup> keyBlocksList = Lists.newArrayList();
|
||||
long now = Time.now();
|
||||
// TODO: Fix the getExpiredOpenKeys, Not part of this patch.
|
||||
List<Map.Entry<byte[], byte[]>> rangeResult = Collections.emptyList();
|
||||
|
||||
for (Map.Entry<byte[], byte[]> entry : rangeResult) {
|
||||
OmKeyInfo info =
|
||||
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
|
||||
long lastModify = info.getModificationTime();
|
||||
if (now - lastModify < this.openKeyExpireThresholdMS) {
|
||||
// consider as may still be active, not hanging.
|
||||
continue;
|
||||
}
|
||||
// Get block keys as a list.
|
||||
List<BlockID> item = info.getLatestVersionLocations()
|
||||
.getBlocksLatestVersionOnly().stream()
|
||||
.map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
|
||||
.collect(Collectors.toList());
|
||||
BlockGroup keyBlocks = BlockGroup.newBuilder()
|
||||
.setKeyName(DFSUtil.bytes2String(entry.getKey()))
|
||||
.addAllBlockIDs(item)
|
||||
.build();
|
||||
keyBlocksList.add(keyBlocks);
|
||||
}
|
||||
return keyBlocksList;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,16 +19,12 @@ package org.apache.hadoop.ozone.om;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.RocksDBStore;
|
||||
import org.apache.hadoop.utils.db.BatchOperation;
|
||||
|
||||
|
@ -66,12 +62,11 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
private void addVolumeToOwnerList(String volume, String owner,
|
||||
BatchOperation batchOperation) throws IOException {
|
||||
// Get the volume list
|
||||
byte[] dbUserKey = metadataManager.getUserKey(owner);
|
||||
byte[] volumeList = metadataManager.getUserTable().get(dbUserKey);
|
||||
String dbUserKey = metadataManager.getUserKey(owner);
|
||||
VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
|
||||
List<String> prevVolList = new ArrayList<>();
|
||||
if (volumeList != null) {
|
||||
VolumeList vlist = VolumeList.parseFrom(volumeList);
|
||||
prevVolList.addAll(vlist.getVolumeNamesList());
|
||||
prevVolList.addAll(volumeList.getVolumeNamesList());
|
||||
}
|
||||
|
||||
// Check the volume count
|
||||
|
@ -85,18 +80,17 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
VolumeList newVolList = VolumeList.newBuilder()
|
||||
.addAllVolumeNames(prevVolList).build();
|
||||
metadataManager.getUserTable().putWithBatch(batchOperation,
|
||||
dbUserKey, newVolList.toByteArray());
|
||||
dbUserKey, newVolList);
|
||||
}
|
||||
|
||||
private void delVolumeFromOwnerList(String volume, String owner,
|
||||
BatchOperation batch) throws RocksDBException, IOException {
|
||||
// Get the volume list
|
||||
byte[] dbUserKey = metadataManager.getUserKey(owner);
|
||||
byte[] volumeList = metadataManager.getUserTable().get(dbUserKey);
|
||||
String dbUserKey = metadataManager.getUserKey(owner);
|
||||
VolumeList volumeList = metadataManager.getUserTable().get(dbUserKey);
|
||||
List<String> prevVolList = new ArrayList<>();
|
||||
if (volumeList != null) {
|
||||
VolumeList vlist = VolumeList.parseFrom(volumeList);
|
||||
prevVolList.addAll(vlist.getVolumeNamesList());
|
||||
prevVolList.addAll(volumeList.getVolumeNamesList());
|
||||
} else {
|
||||
LOG.debug("volume:{} not found for user:{}");
|
||||
throw new OMException(ResultCodes.FAILED_USER_NOT_FOUND);
|
||||
|
@ -110,7 +104,7 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
VolumeList newVolList = VolumeList.newBuilder()
|
||||
.addAllVolumeNames(prevVolList).build();
|
||||
metadataManager.getUserTable().putWithBatch(batch,
|
||||
dbUserKey, newVolList.toByteArray());
|
||||
dbUserKey, newVolList);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,8 +118,9 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
metadataManager.getLock().acquireUserLock(args.getOwnerName());
|
||||
metadataManager.getLock().acquireVolumeLock(args.getVolume());
|
||||
try {
|
||||
byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
|
||||
byte[] volumeInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
String dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
|
||||
OmVolumeArgs volumeInfo =
|
||||
metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
|
||||
// Check of the volume already exists
|
||||
if (volumeInfo != null) {
|
||||
|
@ -136,25 +131,8 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
try (BatchOperation batch = metadataManager.getStore()
|
||||
.initBatchOperation()) {
|
||||
// Write the vol info
|
||||
List<HddsProtos.KeyValue> metadataList = new ArrayList<>();
|
||||
for (Map.Entry<String, String> entry :
|
||||
args.getKeyValueMap().entrySet()) {
|
||||
metadataList.add(HddsProtos.KeyValue.newBuilder()
|
||||
.setKey(entry.getKey()).setValue(entry.getValue()).build());
|
||||
}
|
||||
List<OzoneAclInfo> aclList = args.getAclMap().ozoneAclGetProtobuf();
|
||||
|
||||
VolumeInfo newVolumeInfo = VolumeInfo.newBuilder()
|
||||
.setAdminName(args.getAdminName())
|
||||
.setOwnerName(args.getOwnerName())
|
||||
.setVolume(args.getVolume())
|
||||
.setQuotaInBytes(args.getQuotaInBytes())
|
||||
.addAllMetadata(metadataList)
|
||||
.addAllVolumeAcls(aclList)
|
||||
.setCreationTime(Time.now())
|
||||
.build();
|
||||
metadataManager.getVolumeTable().putWithBatch(batch,
|
||||
dbVolumeKey, newVolumeInfo.toByteArray());
|
||||
dbVolumeKey, args);
|
||||
|
||||
// Add volume to user list
|
||||
addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
|
||||
|
@ -189,17 +167,16 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
metadataManager.getLock().acquireUserLock(owner);
|
||||
metadataManager.getLock().acquireVolumeLock(volume);
|
||||
try {
|
||||
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volInfo == null) {
|
||||
String dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
OmVolumeArgs volumeArgs = metadataManager
|
||||
.getVolumeTable().get(dbVolumeKey);
|
||||
if (volumeArgs == null) {
|
||||
LOG.debug("Changing volume ownership failed for user:{} volume:{}",
|
||||
owner, volume);
|
||||
throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
|
||||
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
||||
OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
|
||||
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
|
||||
Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
|
||||
|
||||
try (BatchOperation batch = metadataManager.getStore()
|
||||
.initBatchOperation()) {
|
||||
|
@ -214,9 +191,8 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
.setCreationTime(volumeArgs.getCreationTime())
|
||||
.build();
|
||||
|
||||
VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
|
||||
metadataManager.getVolumeTable().putWithBatch(batch,
|
||||
dbVolumeKey, newVolumeInfo.toByteArray());
|
||||
dbVolumeKey, newVolumeArgs);
|
||||
metadataManager.getStore().commitBatchOperation(batch);
|
||||
}
|
||||
} catch (RocksDBException | IOException ex) {
|
||||
|
@ -248,16 +224,15 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
Preconditions.checkNotNull(volume);
|
||||
metadataManager.getLock().acquireVolumeLock(volume);
|
||||
try {
|
||||
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volInfo == null) {
|
||||
String dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
OmVolumeArgs volumeArgs =
|
||||
metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volumeArgs == null) {
|
||||
LOG.debug("volume:{} does not exist", volume);
|
||||
throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
|
||||
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
||||
OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
|
||||
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
|
||||
Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
|
||||
|
||||
OmVolumeArgs newVolumeArgs =
|
||||
OmVolumeArgs.newBuilder()
|
||||
|
@ -267,9 +242,7 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
.setQuotaInBytes(quota)
|
||||
.setCreationTime(volumeArgs.getCreationTime()).build();
|
||||
|
||||
VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
|
||||
metadataManager.getVolumeTable().put(dbVolumeKey,
|
||||
newVolumeInfo.toByteArray());
|
||||
metadataManager.getVolumeTable().put(dbVolumeKey, newVolumeArgs);
|
||||
} catch (IOException ex) {
|
||||
if (!(ex instanceof OMException)) {
|
||||
LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
|
||||
|
@ -292,16 +265,14 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
Preconditions.checkNotNull(volume);
|
||||
metadataManager.getLock().acquireVolumeLock(volume);
|
||||
try {
|
||||
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volInfo == null) {
|
||||
String dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
OmVolumeArgs volumeArgs =
|
||||
metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volumeArgs == null) {
|
||||
LOG.debug("volume:{} does not exist", volume);
|
||||
throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
|
||||
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
||||
OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
|
||||
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
|
||||
return volumeArgs;
|
||||
} catch (IOException ex) {
|
||||
if (!(ex instanceof OMException)) {
|
||||
|
@ -333,9 +304,10 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
metadataManager.getLock().acquireVolumeLock(volume);
|
||||
try {
|
||||
|
||||
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volInfo == null) {
|
||||
String dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
OmVolumeArgs volumeArgs =
|
||||
metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volumeArgs == null) {
|
||||
LOG.debug("volume:{} does not exist", volume);
|
||||
throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
|
@ -344,14 +316,12 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
LOG.debug("volume:{} is not empty", volume);
|
||||
throw new OMException(ResultCodes.FAILED_VOLUME_NOT_EMPTY);
|
||||
}
|
||||
|
||||
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
||||
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
|
||||
Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
|
||||
// delete the volume from the owner list
|
||||
// as well as delete the volume entry
|
||||
try (BatchOperation batch = metadataManager.getStore()
|
||||
.initBatchOperation()) {
|
||||
delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
|
||||
delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
|
||||
metadataManager.getVolumeTable().deleteWithBatch(batch, dbVolumeKey);
|
||||
metadataManager.getStore().commitBatchOperation(batch);
|
||||
}
|
||||
|
@ -386,16 +356,15 @@ public class VolumeManagerImpl implements VolumeManager {
|
|||
Preconditions.checkNotNull(userAcl);
|
||||
metadataManager.getLock().acquireVolumeLock(volume);
|
||||
try {
|
||||
byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
byte[] volInfo = metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volInfo == null) {
|
||||
String dbVolumeKey = metadataManager.getVolumeKey(volume);
|
||||
OmVolumeArgs volumeArgs =
|
||||
metadataManager.getVolumeTable().get(dbVolumeKey);
|
||||
if (volumeArgs == null) {
|
||||
LOG.debug("volume:{} does not exist", volume);
|
||||
throw new OMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
|
||||
}
|
||||
|
||||
VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
|
||||
OmVolumeArgs volumeArgs = OmVolumeArgs.getFromProtobuf(volumeInfo);
|
||||
Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
|
||||
Preconditions.checkState(volume.equals(volumeArgs.getVolume()));
|
||||
return volumeArgs.getAclMap().hasAccess(userAcl);
|
||||
} catch (IOException ex) {
|
||||
if (!(ex instanceof OMException)) {
|
||||
|
|
|
@ -16,15 +16,24 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
import org.apache.hadoop.hdds.server.ServerUtils;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.OzoneAcl;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -33,11 +42,6 @@ import org.junit.rules.TemporaryFolder;
|
|||
import org.junit.runner.RunWith;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Tests BucketManagerImpl, mocks OMMetadataManager for testing.
|
||||
*/
|
||||
|
@ -62,10 +66,16 @@ public class TestBucketManagerImpl {
|
|||
private OmMetadataManagerImpl createSampleVol() throws IOException {
|
||||
OzoneConfiguration conf = createNewTestPath();
|
||||
OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
|
||||
byte[] volumeKey = metaMgr.getVolumeKey("sampleVol");
|
||||
String volumeKey = metaMgr.getVolumeKey("sampleVol");
|
||||
// This is a simple hack for testing, we just test if the volume via a
|
||||
// null check, do not parse the value part. So just write some dummy value.
|
||||
metaMgr.getVolumeTable().put(volumeKey, volumeKey);
|
||||
OmVolumeArgs args =
|
||||
OmVolumeArgs.newBuilder()
|
||||
.setVolume("sampleVol")
|
||||
.setAdminName("bilbo")
|
||||
.setOwnerName("bilbo")
|
||||
.build();
|
||||
metaMgr.getVolumeTable().put(volumeKey, args);
|
||||
return metaMgr;
|
||||
}
|
||||
|
||||
|
@ -344,12 +354,22 @@ public class TestBucketManagerImpl {
|
|||
.build();
|
||||
bucketManager.createBucket(bucketInfo);
|
||||
//Create keys in bucket
|
||||
metaMgr.getKeyTable().put(DFSUtil.string2Bytes("/sampleVol/bucketOne" +
|
||||
"/key_one"),
|
||||
DFSUtil.string2Bytes("value_one"));
|
||||
metaMgr.getKeyTable().put(DFSUtil.string2Bytes("/sampleVol/bucketOne" +
|
||||
"/key_two"),
|
||||
DFSUtil.string2Bytes("value_two"));
|
||||
metaMgr.getKeyTable().put("/sampleVol/bucketOne/key_one",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_one")
|
||||
.setReplicationFactor(ReplicationFactor.ONE)
|
||||
.setReplicationType(ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
metaMgr.getKeyTable().put("/sampleVol/bucketOne/key_two",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_two")
|
||||
.setReplicationFactor(ReplicationFactor.ONE)
|
||||
.setReplicationType(ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
try {
|
||||
bucketManager.deleteBucket("sampleVol", "bucketOne");
|
||||
} catch (OMException omEx) {
|
||||
|
|
|
@ -19,18 +19,6 @@
|
|||
|
||||
package org.apache.hadoop.ozone.om;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.server.ServerUtils;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.utils.db.DBConfigFromFile;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -38,10 +26,22 @@ import java.util.UUID;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys
|
||||
.HDDS_CONTAINER_REPORT_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.server.ServerUtils;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.utils.db.DBConfigFromFile;
|
||||
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
/**
|
||||
* Test Key Deleting Service.
|
||||
|
@ -166,18 +166,25 @@ public class TestKeyDeletingService {
|
|||
RandomStringUtils.randomAlphanumeric(5));
|
||||
String keyName = String.format("key%s",
|
||||
RandomStringUtils.randomAlphanumeric(5));
|
||||
byte[] volumeBytes =
|
||||
String volumeBytes =
|
||||
keyManager.getMetadataManager().getVolumeKey(volumeName);
|
||||
byte[] bucketBytes =
|
||||
String bucketBytes =
|
||||
keyManager.getMetadataManager().getBucketKey(volumeName, bucketName);
|
||||
// cheat here, just create a volume and bucket entry so that we can
|
||||
// create the keys, we put the same data for key and value since the
|
||||
// system does not decode the object
|
||||
keyManager.getMetadataManager().getVolumeTable().put(volumeBytes,
|
||||
volumeBytes);
|
||||
OmVolumeArgs.newBuilder()
|
||||
.setOwnerName("o")
|
||||
.setAdminName("a")
|
||||
.setVolume(volumeName)
|
||||
.build());
|
||||
|
||||
keyManager.getMetadataManager().getBucketTable().put(bucketBytes,
|
||||
bucketBytes);
|
||||
OmBucketInfo.newBuilder()
|
||||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.build());
|
||||
|
||||
OmKeyArgs arg =
|
||||
new OmKeyArgs.Builder()
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.apache.hadoop.ozone.om;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
|
||||
|
@ -29,14 +29,22 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
|
|||
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
|
||||
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec;
|
||||
import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
|
||||
import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
|
||||
import org.apache.hadoop.ozone.om.exceptions.OMException;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.db.CodecRegistry;
|
||||
import org.apache.hadoop.utils.db.RDBStore;
|
||||
import org.apache.hadoop.utils.db.Table;
|
||||
import org.apache.hadoop.utils.db.TableConfig;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -48,8 +56,6 @@ import org.rocksdb.RocksDB;
|
|||
import org.rocksdb.Statistics;
|
||||
import org.rocksdb.StatsLevel;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
|
||||
/**
|
||||
* Test class for @{@link KeyManagerImpl}.
|
||||
* */
|
||||
|
@ -64,7 +70,9 @@ public class TestKeyManagerImpl {
|
|||
private static final String BUCKET_NAME = "bucket1";
|
||||
private static final String VOLUME_NAME = "vol1";
|
||||
private static RDBStore rdbStore = null;
|
||||
private static Table rdbTable = null;
|
||||
private static Table<String, OmKeyInfo> keyTable = null;
|
||||
private static Table<String, OmBucketInfo> bucketTable = null;
|
||||
private static Table<String, OmVolumeArgs> volumeTable = null;
|
||||
private static DBOptions options = null;
|
||||
private KeyInfo keyData;
|
||||
@Rule
|
||||
|
@ -88,17 +96,17 @@ public class TestKeyManagerImpl {
|
|||
new SCMException("ChillModePrecheck failed for allocateBlock",
|
||||
ResultCodes.CHILL_MODE_EXCEPTION));
|
||||
setupRocksDb();
|
||||
Mockito.when(metadataManager.getVolumeTable()).thenReturn(rdbTable);
|
||||
Mockito.when(metadataManager.getBucketTable()).thenReturn(rdbTable);
|
||||
Mockito.when(metadataManager.getOpenKeyTable()).thenReturn(rdbTable);
|
||||
Mockito.when(metadataManager.getVolumeTable()).thenReturn(volumeTable);
|
||||
Mockito.when(metadataManager.getBucketTable()).thenReturn(bucketTable);
|
||||
Mockito.when(metadataManager.getOpenKeyTable()).thenReturn(keyTable);
|
||||
Mockito.when(metadataManager.getLock())
|
||||
.thenReturn(new OzoneManagerLock(conf));
|
||||
Mockito.when(metadataManager.getVolumeKey(VOLUME_NAME))
|
||||
.thenReturn(VOLUME_NAME.getBytes(UTF_8));
|
||||
.thenReturn(VOLUME_NAME);
|
||||
Mockito.when(metadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME))
|
||||
.thenReturn(BUCKET_NAME.getBytes(UTF_8));
|
||||
Mockito.when(metadataManager.getOpenKeyBytes(VOLUME_NAME, BUCKET_NAME,
|
||||
KEY_NAME, 1)).thenReturn(KEY_NAME.getBytes(UTF_8));
|
||||
.thenReturn(BUCKET_NAME);
|
||||
Mockito.when(metadataManager.getOpenKey(VOLUME_NAME, BUCKET_NAME,
|
||||
KEY_NAME, 1)).thenReturn(KEY_NAME);
|
||||
}
|
||||
|
||||
private void setupRocksDb() throws Exception {
|
||||
|
@ -113,7 +121,7 @@ public class TestKeyManagerImpl {
|
|||
Set<TableConfig> configSet = new HashSet<>();
|
||||
for (String name : Arrays
|
||||
.asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
|
||||
"testTable")) {
|
||||
"testKeyTable", "testBucketTable", "testVolumeTable")) {
|
||||
TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
|
||||
configSet.add(newConfig);
|
||||
}
|
||||
|
@ -128,13 +136,39 @@ public class TestKeyManagerImpl {
|
|||
.setModificationTime(Time.now())
|
||||
.build();
|
||||
|
||||
rdbStore = new RDBStore(folder.newFolder(), options, configSet);
|
||||
rdbTable = rdbStore.getTable("testTable");
|
||||
rdbTable.put(VOLUME_NAME.getBytes(UTF_8),
|
||||
RandomStringUtils.random(10).getBytes(UTF_8));
|
||||
rdbTable.put(BUCKET_NAME.getBytes(UTF_8),
|
||||
RandomStringUtils.random(10).getBytes(UTF_8));
|
||||
rdbTable.put(KEY_NAME.getBytes(UTF_8), keyData.toByteArray());
|
||||
CodecRegistry registry = new CodecRegistry();
|
||||
registry.addCodec(OmKeyInfo.class, new OmKeyInfoCodec());
|
||||
registry.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec());
|
||||
registry.addCodec(OmBucketInfo.class, new OmBucketInfoCodec());
|
||||
rdbStore = new RDBStore(folder.newFolder(), options, configSet, registry);
|
||||
|
||||
keyTable =
|
||||
rdbStore.getTable("testKeyTable", String.class, OmKeyInfo.class);
|
||||
|
||||
bucketTable =
|
||||
rdbStore.getTable("testBucketTable", String.class, OmBucketInfo.class);
|
||||
|
||||
volumeTable =
|
||||
rdbStore.getTable("testVolumeTable", String.class, OmVolumeArgs.class);
|
||||
|
||||
volumeTable.put(VOLUME_NAME, OmVolumeArgs.newBuilder()
|
||||
.setAdminName("a")
|
||||
.setOwnerName("o")
|
||||
.setVolume(VOLUME_NAME)
|
||||
.build());
|
||||
|
||||
bucketTable.put(BUCKET_NAME,
|
||||
new OmBucketInfo.Builder().setBucketName(BUCKET_NAME)
|
||||
.setVolumeName(VOLUME_NAME).build());
|
||||
|
||||
keyTable.put(KEY_NAME, new OmKeyInfo.Builder()
|
||||
.setVolumeName(VOLUME_NAME)
|
||||
.setBucketName(BUCKET_NAME)
|
||||
.setKeyName(KEY_NAME)
|
||||
.setReplicationType(ReplicationType.STAND_ALONE)
|
||||
.setReplicationFactor(ReplicationFactor.THREE)
|
||||
.build());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue