HDDS-1233. Create an Ozone Manager Service provider for Recon. Contributed by Aravindan Vijayan.
This commit is contained in:
parent
506502bb83
commit
60cdd4cac1
|
@ -57,6 +57,7 @@ public final class DBStoreBuilder {
|
|||
private List<String> tableNames;
|
||||
private Configuration configuration;
|
||||
private CodecRegistry registry;
|
||||
private boolean readOnly = false;
|
||||
|
||||
private DBStoreBuilder(Configuration configuration) {
|
||||
tables = new HashSet<>();
|
||||
|
@ -113,6 +114,11 @@ public final class DBStoreBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
public DBStoreBuilder setReadOnly(boolean rdOnly) {
|
||||
readOnly = rdOnly;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a DBStore instance and returns that.
|
||||
*
|
||||
|
@ -131,7 +137,7 @@ public final class DBStoreBuilder {
|
|||
if (!dbFile.getParentFile().exists()) {
|
||||
throw new IOException("The DB destination directory should exist.");
|
||||
}
|
||||
return new RDBStore(dbFile, options, tables, registry);
|
||||
return new RDBStore(dbFile, options, tables, registry, readOnly);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,13 +19,11 @@
|
|||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.rocksdb.Checkpoint;
|
||||
import org.rocksdb.RocksDB;
|
||||
|
@ -99,48 +97,4 @@ public class RDBCheckpointManager {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
static class RocksDBCheckpoint implements DBCheckpoint {
|
||||
|
||||
private Path checkpointLocation;
|
||||
private long checkpointTimestamp;
|
||||
private long latestSequenceNumber;
|
||||
private long checkpointCreationTimeTaken;
|
||||
|
||||
RocksDBCheckpoint(Path checkpointLocation,
|
||||
long snapshotTimestamp,
|
||||
long latestSequenceNumber,
|
||||
long checkpointCreationTimeTaken) {
|
||||
this.checkpointLocation = checkpointLocation;
|
||||
this.checkpointTimestamp = snapshotTimestamp;
|
||||
this.latestSequenceNumber = latestSequenceNumber;
|
||||
this.checkpointCreationTimeTaken = checkpointCreationTimeTaken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getCheckpointLocation() {
|
||||
return this.checkpointLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCheckpointTimestamp() {
|
||||
return this.checkpointTimestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLatestSequenceNumber() {
|
||||
return this.latestSequenceNumber;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long checkpointCreationTimeTaken() {
|
||||
return checkpointCreationTimeTaken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupCheckpoint() throws IOException {
|
||||
LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
|
||||
FileUtils.deleteDirectory(checkpointLocation.toFile());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,24 +55,24 @@ import org.slf4j.LoggerFactory;
|
|||
public class RDBStore implements DBStore {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RDBStore.class);
|
||||
private final RocksDB db;
|
||||
private final File dbLocation;
|
||||
private RocksDB db;
|
||||
private File dbLocation;
|
||||
private final WriteOptions writeOptions;
|
||||
private final DBOptions dbOptions;
|
||||
private final CodecRegistry codecRegistry;
|
||||
private final Hashtable<String, ColumnFamilyHandle> handleTable;
|
||||
private ObjectName statMBeanName;
|
||||
private RDBCheckpointManager checkPointManager;
|
||||
private final String checkpointsParentDir;
|
||||
private String checkpointsParentDir;
|
||||
|
||||
@VisibleForTesting
|
||||
public RDBStore(File dbFile, DBOptions options,
|
||||
Set<TableConfig> families) throws IOException {
|
||||
this(dbFile, options, families, new CodecRegistry());
|
||||
this(dbFile, options, families, new CodecRegistry(), false);
|
||||
}
|
||||
|
||||
public RDBStore(File dbFile, DBOptions options, Set<TableConfig> families,
|
||||
CodecRegistry registry)
|
||||
CodecRegistry registry, boolean readOnly)
|
||||
throws IOException {
|
||||
Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
|
||||
Preconditions.checkNotNull(families);
|
||||
|
@ -93,8 +93,13 @@ public class RDBStore implements DBStore {
|
|||
writeOptions = new WriteOptions();
|
||||
|
||||
try {
|
||||
if (readOnly) {
|
||||
db = RocksDB.openReadOnly(dbOptions, dbLocation.getAbsolutePath(),
|
||||
columnFamilyDescriptors, columnFamilyHandles);
|
||||
} else {
|
||||
db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath(),
|
||||
columnFamilyDescriptors, columnFamilyHandles);
|
||||
}
|
||||
|
||||
for (int x = 0; x < columnFamilyHandles.size(); x++) {
|
||||
handleTable.put(
|
||||
|
@ -278,4 +283,10 @@ public class RDBStore implements DBStore {
|
|||
return checkPointManager.createCheckpoint(checkpointsParentDir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current DB Location.
|
||||
*/
|
||||
public File getDbLocation() {
|
||||
return dbLocation;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.utils.db;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Class to hold information and location of a RocksDB Checkpoint.
|
||||
*/
|
||||
public class RocksDBCheckpoint implements DBCheckpoint {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(RocksDBCheckpoint.class);
|
||||
|
||||
private Path checkpointLocation;
|
||||
private long checkpointTimestamp = System.currentTimeMillis();
|
||||
private long latestSequenceNumber = -1;
|
||||
private long checkpointCreationTimeTaken = 0L;
|
||||
|
||||
public RocksDBCheckpoint(Path checkpointLocation) {
|
||||
this.checkpointLocation = checkpointLocation;
|
||||
}
|
||||
|
||||
public RocksDBCheckpoint(Path checkpointLocation,
|
||||
long snapshotTimestamp,
|
||||
long latestSequenceNumber,
|
||||
long checkpointCreationTimeTaken) {
|
||||
this.checkpointLocation = checkpointLocation;
|
||||
this.checkpointTimestamp = snapshotTimestamp;
|
||||
this.latestSequenceNumber = latestSequenceNumber;
|
||||
this.checkpointCreationTimeTaken = checkpointCreationTimeTaken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getCheckpointLocation() {
|
||||
return this.checkpointLocation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getCheckpointTimestamp() {
|
||||
return this.checkpointTimestamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLatestSequenceNumber() {
|
||||
return this.latestSequenceNumber;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long checkpointCreationTimeTaken() {
|
||||
return checkpointCreationTimeTaken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanupCheckpoint() throws IOException {
|
||||
LOG.debug("Cleaning up checkpoint at " + checkpointLocation.toString());
|
||||
FileUtils.deleteDirectory(checkpointLocation.toFile());
|
||||
}
|
||||
}
|
|
@ -2258,7 +2258,7 @@
|
|||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.recon.db.dirs</name>
|
||||
<name>ozone.recon.db.dir</name>
|
||||
<value/>
|
||||
<tag>OZONE, RECON, STORAGE, PERFORMANCE</tag>
|
||||
<description>
|
||||
|
@ -2273,7 +2273,88 @@
|
|||
<name>ozone.scm.network.topology.schema.file</name>
|
||||
<value>network-topology-default.xm</value>
|
||||
<tag>OZONE, MANAGEMENT</tag>
|
||||
<description>The schema file defines the ozone network topology.
|
||||
<description>
|
||||
The schema file defines the ozone network topology
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.recon.container.db.impl</name>
|
||||
<value>RocksDB</value>
|
||||
<tag>OZONE, RECON, STORAGE</tag>
|
||||
<description>
|
||||
Ozone Recon container DB store implementation.Supported value is either
|
||||
LevelDB or RocksDB.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>ozone.recon.om.db.dir</name>
|
||||
<value/>
|
||||
<tag>OZONE, RECON, STORAGE</tag>
|
||||
<description>
|
||||
Directory where the Recon Server stores its OM snapshot DB. This should
|
||||
be specified as a single directory. If the directory does not
|
||||
exist then the Recon will attempt to create it.
|
||||
If undefined, then the Recon will log a warning and fallback to
|
||||
ozone.metadata.dirs.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>recon.om.connection.request.timeout</name>
|
||||
<value>5000</value>
|
||||
<tag>OZONE, RECON, OM</tag>
|
||||
<description>
|
||||
Connection request timeout in milliseconds for HTTP call made by Recon to
|
||||
request OM DB snapshot.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>recon.om.connection.timeout</name>
|
||||
<value>5s</value>
|
||||
<tag>OZONE, RECON, OM</tag>
|
||||
<description>
|
||||
Connection timeout for HTTP call in milliseconds made by Recon to request
|
||||
OM snapshot.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>recon.om.socket.timeout</name>
|
||||
<value>5s</value>
|
||||
<tag>OZONE, RECON, OM</tag>
|
||||
<description>
|
||||
Socket timeout in milliseconds for HTTP call made by Recon to request
|
||||
OM snapshot.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>recon.om.socket.timeout</name>
|
||||
<value>5s</value>
|
||||
<tag>OZONE, RECON, OM</tag>
|
||||
<description>
|
||||
Socket timeout for HTTP call made by Recon to request OM snapshot.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>recon.om.snapshot.task.initial.delay</name>
|
||||
<value>1m</value>
|
||||
<tag>OZONE, RECON, OM</tag>
|
||||
<description>
|
||||
Initial delay in MINUTES by Recon to request OM DB Snapshot.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>recon.om.snapshot.task.interval.delay</name>
|
||||
<value>10m</value>
|
||||
<tag>OZONE, RECON, OM</tag>
|
||||
<description>
|
||||
Interval in MINUTES by Recon to request OM DB Snapshot.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>recon.om.snapshot.task.flush.param</name>
|
||||
<value>false</value>
|
||||
<tag>OZONE, RECON, OM</tag>
|
||||
<description>
|
||||
Request to flush the OM DB before taking checkpoint snapshot.
|
||||
</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.utils.db;
|
|||
|
||||
import javax.management.MBeanServer;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
@ -289,4 +290,46 @@ public class TestRDBStore {
|
|||
checkpoint.getCheckpointLocation()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadOnlyRocksDB() throws Exception {
|
||||
File dbFile = folder.newFolder();
|
||||
byte[] key = "Key1".getBytes();
|
||||
byte[] value = "Value1".getBytes();
|
||||
|
||||
//Create Rdb and write some data into it.
|
||||
RDBStore newStore = new RDBStore(dbFile, options, configSet);
|
||||
Assert.assertNotNull("DB Store cannot be null", newStore);
|
||||
Table firstTable = newStore.getTable(families.get(0));
|
||||
Assert.assertNotNull("Table cannot be null", firstTable);
|
||||
firstTable.put(key, value);
|
||||
|
||||
RocksDBCheckpoint checkpoint = (RocksDBCheckpoint) newStore.getCheckpoint(
|
||||
true);
|
||||
|
||||
//Create Read Only DB from snapshot of first DB.
|
||||
RDBStore snapshotStore = new RDBStore(checkpoint.getCheckpointLocation()
|
||||
.toFile(), options, configSet, new CodecRegistry(), true);
|
||||
|
||||
Assert.assertNotNull("DB Store cannot be null", newStore);
|
||||
|
||||
//Verify read is allowed.
|
||||
firstTable = snapshotStore.getTable(families.get(0));
|
||||
Assert.assertNotNull("Table cannot be null", firstTable);
|
||||
Assert.assertTrue(Arrays.equals(((byte[])firstTable.get(key)), value));
|
||||
|
||||
//Verify write is not allowed.
|
||||
byte[] key2 =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] value2 =
|
||||
RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8);
|
||||
try {
|
||||
firstTable.put(key2, value2);
|
||||
Assert.fail();
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.contains("Not supported operation in read only mode"));
|
||||
}
|
||||
checkpoint.cleanupCheckpoint();
|
||||
}
|
||||
}
|
|
@ -125,8 +125,7 @@ public final class ServerUtils {
|
|||
* @return
|
||||
*/
|
||||
public static File getScmDbDir(Configuration conf) {
|
||||
|
||||
File metadataDir = getDirWithFallBackToOzoneMetadata(conf, ScmConfigKeys
|
||||
File metadataDir = getDirectoryFromConfig(conf, ScmConfigKeys
|
||||
.OZONE_SCM_DB_DIRS, "SCM");
|
||||
if (metadataDir != null) {
|
||||
return metadataDir;
|
||||
|
@ -138,7 +137,15 @@ public final class ServerUtils {
|
|||
return getOzoneMetaDirPath(conf);
|
||||
}
|
||||
|
||||
public static File getDirWithFallBackToOzoneMetadata(Configuration conf,
|
||||
/**
|
||||
* Utility method to get value of a given key that corresponds to a DB
|
||||
* directory.
|
||||
* @param conf configuration bag
|
||||
* @param key Key to test
|
||||
* @param componentName Which component's key is this
|
||||
* @return File created from the value of the key in conf.
|
||||
*/
|
||||
public static File getDirectoryFromConfig(Configuration conf,
|
||||
String key,
|
||||
String componentName) {
|
||||
final Collection<String> metadirs = conf.getTrimmedStringCollection(key);
|
||||
|
@ -155,7 +162,7 @@ public final class ServerUtils {
|
|||
if (!dbDirPath.exists() && !dbDirPath.mkdirs()) {
|
||||
throw new IllegalArgumentException("Unable to create directory " +
|
||||
dbDirPath + " specified in configuration setting " +
|
||||
ScmConfigKeys.OZONE_SCM_DB_DIRS);
|
||||
componentName);
|
||||
}
|
||||
return dbDirPath;
|
||||
}
|
||||
|
|
|
@ -132,6 +132,15 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
start(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* For subclass overriding.
|
||||
*/
|
||||
protected OmMetadataManagerImpl() {
|
||||
this.lock = new OzoneManagerLock(new OzoneConfiguration());
|
||||
this.openKeyExpireThresholdMS =
|
||||
OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS_DEFAULT;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Table<String, VolumeList> getUserTable() {
|
||||
return userTable;
|
||||
|
@ -198,10 +207,17 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
if (store == null) {
|
||||
File metaDir = OmUtils.getOmDbDir(configuration);
|
||||
|
||||
this.store = DBStoreBuilder.newBuilder(configuration)
|
||||
DBStoreBuilder dbStoreBuilder = DBStoreBuilder.newBuilder(configuration)
|
||||
.setName(OM_DB_NAME)
|
||||
.setPath(Paths.get(metaDir.getPath()))
|
||||
.addTable(USER_TABLE)
|
||||
.setPath(Paths.get(metaDir.getPath()));
|
||||
this.store = addOMTablesAndCodecs(dbStoreBuilder).build();
|
||||
initializeOmTables();
|
||||
}
|
||||
}
|
||||
|
||||
protected DBStoreBuilder addOMTablesAndCodecs(DBStoreBuilder builder) {
|
||||
|
||||
return builder.addTable(USER_TABLE)
|
||||
.addTable(VOLUME_TABLE)
|
||||
.addTable(BUCKET_TABLE)
|
||||
.addTable(KEY_TABLE)
|
||||
|
@ -209,16 +225,22 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
.addTable(OPEN_KEY_TABLE)
|
||||
.addTable(S3_TABLE)
|
||||
.addTable(MULTIPARTINFO_TABLE)
|
||||
.addTable(S3_SECRET_TABLE)
|
||||
.addTable(DELEGATION_TOKEN_TABLE)
|
||||
.addTable(S3_SECRET_TABLE)
|
||||
.addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
|
||||
.addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
|
||||
.addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
|
||||
.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
|
||||
.addCodec(VolumeList.class, new VolumeListCodec())
|
||||
.addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec())
|
||||
.build();
|
||||
.addCodec(OmMultipartKeyInfo.class, new OmMultipartKeyInfoCodec());
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize OM Tables.
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
protected void initializeOmTables() throws IOException {
|
||||
userTable =
|
||||
this.store.getTable(USER_TABLE, String.class, VolumeList.class);
|
||||
checkTableStatus(userTable, USER_TABLE);
|
||||
|
@ -246,18 +268,17 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
s3Table = this.store.getTable(S3_TABLE);
|
||||
checkTableStatus(s3Table, S3_TABLE);
|
||||
|
||||
dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
|
||||
OzoneTokenIdentifier.class, Long.class);
|
||||
checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE);
|
||||
|
||||
multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
|
||||
String.class, OmMultipartKeyInfo.class);
|
||||
checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE);
|
||||
|
||||
dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
|
||||
OzoneTokenIdentifier.class, Long.class);
|
||||
checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE);
|
||||
|
||||
s3SecretTable = this.store.getTable(S3_SECRET_TABLE);
|
||||
checkTableStatus(s3SecretTable, S3_SECRET_TABLE);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop metadata manager.
|
||||
|
@ -683,4 +704,14 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
|
|||
public Table<byte[], byte[]> getS3SecretTable() {
|
||||
return s3SecretTable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update store used by subclass.
|
||||
*
|
||||
* @param store DB store.
|
||||
*/
|
||||
protected void setStore(DBStore store) {
|
||||
this.store = store;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -28,6 +28,10 @@
|
|||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-ozone-common</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-ozone-ozone-manager</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.inject</groupId>
|
||||
<artifactId>guice</artifactId>
|
||||
|
@ -57,9 +61,27 @@
|
|||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<version>1.10.19</version>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<version>2.8.9</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.powermock</groupId>
|
||||
<artifactId>powermock-module-junit4</artifactId>
|
||||
<version>1.7.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.powermock</groupId>
|
||||
<artifactId>powermock-api-mockito2</artifactId>
|
||||
<version>1.7.4</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -31,4 +31,7 @@ public final class ReconConstants {
|
|||
|
||||
public static final String RECON_CONTAINER_DB = "recon-" +
|
||||
CONTAINER_DB_SUFFIX;
|
||||
|
||||
public static final String RECON_OM_SNAPSHOT_DB =
|
||||
"om.snapshot.db";
|
||||
}
|
||||
|
|
|
@ -18,9 +18,13 @@
|
|||
package org.apache.hadoop.ozone.recon;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.ReconContainerDBProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.ContainerDBServiceProvider;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.ContainerDBServiceProviderImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
|
@ -34,10 +38,14 @@ public class ReconControllerModule extends AbstractModule {
|
|||
protected void configure() {
|
||||
bind(OzoneConfiguration.class).toProvider(OzoneConfigurationProvider.class);
|
||||
bind(ReconHttpServer.class).in(Singleton.class);
|
||||
bind(MetadataStore.class).toProvider(ReconContainerDBProvider.class);
|
||||
bind(MetadataStore.class)
|
||||
.toProvider(ReconContainerDBProvider.class).in(Singleton.class);
|
||||
bind(ReconOMMetadataManager.class)
|
||||
.to(ReconOmMetadataManagerImpl.class).in(Singleton.class);
|
||||
bind(ContainerDBServiceProvider.class)
|
||||
.to(ContainerDBServiceProviderImpl.class);
|
||||
.to(ContainerDBServiceProviderImpl.class).in(Singleton.class);
|
||||
bind(OzoneManagerServiceProvider.class)
|
||||
.to(OzoneManagerServiceProviderImpl.class).in(Singleton.class);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.recon;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
|
@ -50,7 +52,40 @@ public final class ReconServerConfigKeys {
|
|||
"ozone.recon.container.db.cache.size.mb";
|
||||
public static final int OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT = 128;
|
||||
|
||||
public static final String OZONE_RECON_DB_DIRS = "ozone.recon.db.dirs";
|
||||
public static final String OZONE_RECON_DB_DIR = "ozone.recon.db.dir";
|
||||
|
||||
public static final String OZONE_RECON_OM_SNAPSHOT_DB_DIR =
|
||||
"ozone.recon.om.db.dir";
|
||||
|
||||
public static final String RECON_OM_SOCKET_TIMEOUT =
|
||||
"recon.om.socket.timeout";
|
||||
public static final String RECON_OM_SOCKET_TIMEOUT_DEFAULT = "5s";
|
||||
|
||||
public static final String RECON_OM_CONNECTION_TIMEOUT =
|
||||
"recon.om.connection.timeout";
|
||||
public static final String RECON_OM_CONNECTION_TIMEOUT_DEFAULT = "5s";
|
||||
|
||||
public static final String RECON_OM_CONNECTION_REQUEST_TIMEOUT =
|
||||
"recon.om.connection.request.timeout";
|
||||
public static final String RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT = "5s";
|
||||
|
||||
public static final String RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY =
|
||||
"recon.om.snapshot.task.initial.delay";
|
||||
public static final String
|
||||
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT = "1m";
|
||||
|
||||
public static final String OZONE_RECON_CONTAINER_DB_STORE_IMPL =
|
||||
"ozone.recon.container.db.impl";
|
||||
public static final String OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT =
|
||||
OZONE_METADATA_STORE_IMPL_ROCKSDB;
|
||||
|
||||
public static final String RECON_OM_SNAPSHOT_TASK_INTERVAL =
|
||||
"recon.om.snapshot.task.interval.delay";
|
||||
public static final String RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT
|
||||
= "10m";
|
||||
|
||||
public static final String RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM =
|
||||
"recon.om.snapshot.task.flush.param";
|
||||
|
||||
/**
|
||||
* Private constructor for utility class.
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_CREATED;
|
||||
import static java.net.HttpURLConnection.HTTP_OK;
|
||||
import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
|
||||
import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
|
||||
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Recon Utility class.
|
||||
*/
|
||||
public final class ReconUtils {
|
||||
|
||||
private final static int WRITE_BUFFER = 1048576; //1MB
|
||||
|
||||
private ReconUtils() {
|
||||
}
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
ReconUtils.class);
|
||||
|
||||
/**
|
||||
* Get configured Recon DB directory value based on config. If not present,
|
||||
* fallback to om.metadata.dirs
|
||||
*
|
||||
* @param conf configuration bag
|
||||
* @param dirConfigKey key to check
|
||||
* @return Return File based on configured or fallback value.
|
||||
*/
|
||||
public static File getReconDbDir(Configuration conf, String dirConfigKey) {
|
||||
|
||||
File metadataDir = getDirectoryFromConfig(conf, dirConfigKey,
|
||||
"Recon");
|
||||
if (metadataDir != null) {
|
||||
return metadataDir;
|
||||
}
|
||||
|
||||
LOG.warn("{} is not configured. We recommend adding this setting. " +
|
||||
"Falling back to {} instead.",
|
||||
dirConfigKey, HddsConfigKeys.OZONE_METADATA_DIRS);
|
||||
return getOzoneMetaDirPath(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Untar DB snapshot tar file to recon OM snapshot directory.
|
||||
*
|
||||
* @param tarFile source tar file
|
||||
* @param destPath destination path to untar to.
|
||||
* @throws IOException ioException
|
||||
*/
|
||||
public static void untarCheckpointFile(File tarFile, Path destPath)
|
||||
throws IOException {
|
||||
|
||||
FileInputStream fileInputStream = null;
|
||||
BufferedInputStream buffIn = null;
|
||||
GzipCompressorInputStream gzIn = null;
|
||||
try {
|
||||
fileInputStream = new FileInputStream(tarFile);
|
||||
buffIn = new BufferedInputStream(fileInputStream);
|
||||
gzIn = new GzipCompressorInputStream(buffIn);
|
||||
|
||||
//Create Destination directory if it does not exist.
|
||||
if (!destPath.toFile().exists()) {
|
||||
boolean success = destPath.toFile().mkdirs();
|
||||
if (!success) {
|
||||
throw new IOException("Unable to create Destination directory.");
|
||||
}
|
||||
}
|
||||
|
||||
try (TarArchiveInputStream tarInStream =
|
||||
new TarArchiveInputStream(gzIn)) {
|
||||
TarArchiveEntry entry = null;
|
||||
|
||||
while ((entry = (TarArchiveEntry) tarInStream.getNextEntry()) != null) {
|
||||
//If directory, create a directory.
|
||||
if (entry.isDirectory()) {
|
||||
File f = new File(Paths.get(destPath.toString(),
|
||||
entry.getName()).toString());
|
||||
boolean success = f.mkdirs();
|
||||
if (!success) {
|
||||
LOG.error("Unable to create directory found in tar.");
|
||||
}
|
||||
} else {
|
||||
//Write contents of file in archive to a new file.
|
||||
int count;
|
||||
byte[] data = new byte[WRITE_BUFFER];
|
||||
|
||||
FileOutputStream fos = new FileOutputStream(
|
||||
Paths.get(destPath.toString(), entry.getName()).toString());
|
||||
try (BufferedOutputStream dest =
|
||||
new BufferedOutputStream(fos, WRITE_BUFFER)) {
|
||||
while ((count =
|
||||
tarInStream.read(data, 0, WRITE_BUFFER)) != -1) {
|
||||
dest.write(data, 0, count);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeStream(gzIn);
|
||||
IOUtils.closeStream(buffIn);
|
||||
IOUtils.closeStream(fileInputStream);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make HTTP GET call on the URL and return inputstream to the response.
|
||||
* @param httpClient HttpClient to use.
|
||||
* @param url url to call
|
||||
* @return Inputstream to the response of the HTTP call.
|
||||
* @throws IOException While reading the response.
|
||||
*/
|
||||
public static InputStream makeHttpCall(CloseableHttpClient httpClient,
|
||||
String url)
|
||||
throws IOException {
|
||||
|
||||
HttpGet httpGet = new HttpGet(url);
|
||||
HttpResponse response = httpClient.execute(httpGet);
|
||||
int errorCode = response.getStatusLine().getStatusCode();
|
||||
HttpEntity entity = response.getEntity();
|
||||
|
||||
if ((errorCode == HTTP_OK) || (errorCode == HTTP_CREATED)) {
|
||||
return entity.getContent();
|
||||
}
|
||||
|
||||
if (entity != null) {
|
||||
throw new IOException("Unexpected exception when trying to reach Ozone " +
|
||||
"Manager, " + EntityUtils.toString(entity));
|
||||
} else {
|
||||
throw new IOException("Unexpected null in http payload," +
|
||||
" while processing request");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.recovery;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
|
||||
/**
|
||||
* Interface for the OM Metadata Manager + DB store maintained by
|
||||
* Recon.
|
||||
*/
|
||||
public interface ReconOMMetadataManager extends OMMetadataManager {
|
||||
|
||||
/**
|
||||
* Refresh the DB instance to point to a new location. Get rid of the old
|
||||
* DB instance.
|
||||
* @param dbLocation New location of the OM Snapshot DB.
|
||||
*/
|
||||
void updateOmDB(File dbLocation) throws IOException;
|
||||
}
|
|
@ -0,0 +1,99 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.recovery;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.inject.Inject;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||
import org.apache.hadoop.utils.db.DBStore;
|
||||
import org.apache.hadoop.utils.db.DBStoreBuilder;
|
||||
import org.apache.hadoop.utils.db.RDBStore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Recon's implementation of the OM Metadata manager. By extending and
|
||||
* relying on the OmMetadataManagerImpl, we can make sure all changes made to
|
||||
* schema in OM will be automatically picked up by Recon.
|
||||
*/
|
||||
public class ReconOmMetadataManagerImpl extends OmMetadataManagerImpl
|
||||
implements ReconOMMetadataManager {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ReconOmMetadataManagerImpl.class);
|
||||
|
||||
@Inject
|
||||
private OzoneConfiguration ozoneConfiguration;
|
||||
|
||||
@Inject
|
||||
public ReconOmMetadataManagerImpl(OzoneConfiguration configuration) {
|
||||
this.ozoneConfiguration = configuration;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(OzoneConfiguration configuration) throws IOException {
|
||||
LOG.info("Starting ReconOMMetadataManagerImpl");
|
||||
}
|
||||
|
||||
/**
|
||||
* Replace existing DB instance with new one.
|
||||
*
|
||||
* @param dbFile new DB file location.
|
||||
*/
|
||||
private void initializeNewRdbStore(File dbFile) throws IOException {
|
||||
try {
|
||||
DBStoreBuilder dbStoreBuilder =
|
||||
DBStoreBuilder.newBuilder(ozoneConfiguration)
|
||||
.setReadOnly(true)
|
||||
.setName(dbFile.getName())
|
||||
.setPath(dbFile.toPath().getParent());
|
||||
addOMTablesAndCodecs(dbStoreBuilder);
|
||||
DBStore newStore = dbStoreBuilder.build();
|
||||
setStore(newStore);
|
||||
LOG.info("Created new OM DB snapshot at {}.",
|
||||
dbFile.getAbsolutePath());
|
||||
} catch (IOException ioEx) {
|
||||
LOG.error("Unable to initialize Recon OM DB snapshot store.",
|
||||
ioEx);
|
||||
}
|
||||
if (getStore() != null) {
|
||||
initializeOmTables();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateOmDB(File newDbLocation) throws IOException {
|
||||
if (getStore() != null) {
|
||||
RDBStore rdbStore = (RDBStore) getStore();
|
||||
File oldDBLocation = rdbStore.getDbLocation();
|
||||
if (oldDBLocation.exists()) {
|
||||
LOG.info("Cleaning up old OM snapshot db at {}.",
|
||||
oldDBLocation.getAbsolutePath());
|
||||
FileUtils.deleteQuietly(oldDBLocation);
|
||||
}
|
||||
}
|
||||
initializeNewRdbStore(newDbLocation);
|
||||
}
|
||||
|
||||
}
|
|
@ -18,8 +18,23 @@ package org.apache.hadoop.ozone.recon.spi;
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
|
||||
/**
|
||||
* Interface to access OM endpoints.
|
||||
*/
|
||||
public interface OzoneManagerServiceProvider {
|
||||
|
||||
/**
|
||||
* Start taking OM Snapshots.
|
||||
*/
|
||||
void start() throws IOException;
|
||||
|
||||
/**
|
||||
* Return instance of OM Metadata manager.
|
||||
* @return OM metadata manager instance.
|
||||
*/
|
||||
OMMetadataManager getOMMetadataManagerInstance();
|
||||
}
|
||||
|
|
|
@ -18,20 +18,18 @@
|
|||
|
||||
package org.apache.hadoop.ozone.recon.spi;
|
||||
|
||||
import static org.apache.hadoop.ozone.recon.ReconConstants.
|
||||
RECON_CONTAINER_DB;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.
|
||||
OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.
|
||||
OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.
|
||||
OZONE_RECON_DB_DIRS;
|
||||
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_CONTAINER_DB;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_STORE_IMPL;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
|
||||
import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.server.ServerUtils;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.apache.hadoop.utils.MetadataStoreBuilder;
|
||||
|
@ -41,6 +39,7 @@ import org.slf4j.LoggerFactory;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.ProvisionException;
|
||||
|
||||
/**
|
||||
* Provider for the Recon container DB (Metadata store).
|
||||
|
@ -57,21 +56,29 @@ public class ReconContainerDBProvider implements
|
|||
|
||||
@Override
|
||||
public MetadataStore get() {
|
||||
File metaDir = ServerUtils.getDirWithFallBackToOzoneMetadata(configuration,
|
||||
OZONE_RECON_DB_DIRS, "Recon");
|
||||
File containerDBPath = new File(metaDir, RECON_CONTAINER_DB);
|
||||
File metaDir = getReconDbDir(configuration, OZONE_RECON_DB_DIR);
|
||||
File containerDBPath = new File(metaDir,
|
||||
RECON_CONTAINER_DB);
|
||||
int cacheSize = configuration.getInt(OZONE_RECON_CONTAINER_DB_CACHE_SIZE_MB,
|
||||
OZONE_RECON_CONTAINER_DB_CACHE_SIZE_DEFAULT);
|
||||
|
||||
String dbType = configuration.get(OZONE_RECON_CONTAINER_DB_STORE_IMPL,
|
||||
OZONE_RECON_CONTAINER_DB_STORE_IMPL_DEFAULT);
|
||||
MetadataStore metadataStore = null;
|
||||
try {
|
||||
return MetadataStoreBuilder.newBuilder()
|
||||
metadataStore = MetadataStoreBuilder.newBuilder()
|
||||
.setConf(configuration)
|
||||
.setDBType(dbType)
|
||||
.setDbFile(containerDBPath)
|
||||
.setCacheSize(cacheSize * OzoneConsts.MB)
|
||||
.build();
|
||||
} catch (IOException ioEx) {
|
||||
LOG.error("Unable to initialize Recon container metadata store.", ioEx);
|
||||
}
|
||||
return null;
|
||||
if (metadataStore == null) {
|
||||
throw new ProvisionException("Unable to provide instance of Metadata " +
|
||||
"store.");
|
||||
}
|
||||
return metadataStore;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,211 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
|
||||
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH;
|
||||
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_OM_SNAPSHOT_DB;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_CONNECTION_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SOCKET_TIMEOUT_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.recon.ReconUtils.getReconDbDir;
|
||||
import static org.apache.hadoop.ozone.recon.ReconUtils.makeHttpCall;
|
||||
import static org.apache.hadoop.ozone.recon.ReconUtils.untarCheckpointFile;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.inject.Inject;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.om.OMConfigKeys;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.utils.db.DBCheckpoint;
|
||||
import org.apache.hadoop.utils.db.RocksDBCheckpoint;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Implementation of the OzoneManager Service provider.
|
||||
*/
|
||||
@Singleton
|
||||
public class OzoneManagerServiceProviderImpl
|
||||
implements OzoneManagerServiceProvider {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(OzoneManagerServiceProviderImpl.class);
|
||||
|
||||
private ScheduledExecutorService executorService;
|
||||
private final String dbCheckpointEndPoint = "/dbCheckpoint";
|
||||
private final CloseableHttpClient httpClient;
|
||||
private File omSnapshotDBParentDir = null;
|
||||
private String omDBSnapshotUrl;
|
||||
|
||||
@Inject
|
||||
private OzoneConfiguration configuration;
|
||||
|
||||
@Inject
|
||||
private ReconOMMetadataManager omMetadataManager;
|
||||
|
||||
@Inject
|
||||
public OzoneManagerServiceProviderImpl(OzoneConfiguration configuration) {
|
||||
executorService = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
String ozoneManagerHttpAddress = configuration.get(OMConfigKeys
|
||||
.OZONE_OM_HTTP_ADDRESS_KEY);
|
||||
|
||||
String ozoneManagerHttpsAddress = configuration.get(OMConfigKeys
|
||||
.OZONE_OM_HTTPS_ADDRESS_KEY);
|
||||
|
||||
omSnapshotDBParentDir = getReconDbDir(configuration,
|
||||
OZONE_RECON_OM_SNAPSHOT_DB_DIR);
|
||||
|
||||
boolean ozoneSecurityEnabled = configuration.getBoolean(
|
||||
OZONE_SECURITY_ENABLED_KEY, false);
|
||||
|
||||
int socketTimeout = (int) configuration.getTimeDuration(
|
||||
RECON_OM_SOCKET_TIMEOUT, RECON_OM_SOCKET_TIMEOUT_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
int connectionTimeout = (int) configuration.getTimeDuration(
|
||||
RECON_OM_CONNECTION_TIMEOUT,
|
||||
RECON_OM_CONNECTION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
int connectionRequestTimeout = (int)configuration.getTimeDuration(
|
||||
RECON_OM_CONNECTION_REQUEST_TIMEOUT,
|
||||
RECON_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
|
||||
RequestConfig config = RequestConfig.custom()
|
||||
.setConnectTimeout(socketTimeout)
|
||||
.setConnectionRequestTimeout(connectionTimeout)
|
||||
.setSocketTimeout(connectionRequestTimeout).build();
|
||||
|
||||
httpClient = HttpClientBuilder
|
||||
.create()
|
||||
.setDefaultRequestConfig(config)
|
||||
.build();
|
||||
|
||||
omDBSnapshotUrl = "http://" + ozoneManagerHttpAddress +
|
||||
dbCheckpointEndPoint;
|
||||
|
||||
if (ozoneSecurityEnabled) {
|
||||
omDBSnapshotUrl = "https://" + ozoneManagerHttpsAddress +
|
||||
dbCheckpointEndPoint;
|
||||
}
|
||||
|
||||
boolean flushParam = configuration.getBoolean(
|
||||
RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM, false);
|
||||
|
||||
if (flushParam) {
|
||||
omDBSnapshotUrl += "?" + OZONE_DB_CHECKPOINT_REQUEST_FLUSH + "=true";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws IOException {
|
||||
|
||||
//Schedule a task to periodically obtain the DB snapshot from OM and
|
||||
//update the in house OM metadata managed DB instance.
|
||||
long initialDelay = configuration.getTimeDuration(
|
||||
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
|
||||
RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
long interval = configuration.getTimeDuration(
|
||||
RECON_OM_SNAPSHOT_TASK_INTERVAL,
|
||||
RECON_OM_SNAPSHOT_TASK_INTERVAL_DEFAULT,
|
||||
TimeUnit.MILLISECONDS);
|
||||
|
||||
LOG.info("Starting thread to get OM DB Snapshot.");
|
||||
executorService.scheduleAtFixedRate(() -> {
|
||||
DBCheckpoint dbSnapshot = getOzoneManagerDBSnapshot();
|
||||
if (dbSnapshot != null && dbSnapshot.getCheckpointLocation() != null) {
|
||||
try {
|
||||
omMetadataManager.updateOmDB(dbSnapshot.getCheckpointLocation()
|
||||
.toFile());
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to refresh Recon OM DB Snapshot. ", e);
|
||||
}
|
||||
} else {
|
||||
LOG.error("Null snapshot got from OM, {}",
|
||||
dbSnapshot.getCheckpointLocation());
|
||||
}
|
||||
}, initialDelay, interval, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public OMMetadataManager getOMMetadataManagerInstance() {
|
||||
return omMetadataManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to obtain current OM DB Snapshot.
|
||||
* @return DBCheckpoint instance.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected DBCheckpoint getOzoneManagerDBSnapshot() {
|
||||
String snapshotFileName = RECON_OM_SNAPSHOT_DB + "_" + System
|
||||
.currentTimeMillis();
|
||||
File targetFile = new File(omSnapshotDBParentDir, snapshotFileName +
|
||||
".tar.gz");
|
||||
try {
|
||||
try (InputStream inputStream = makeHttpCall(httpClient,
|
||||
omDBSnapshotUrl)) {
|
||||
FileUtils.copyInputStreamToFile(inputStream, targetFile);
|
||||
}
|
||||
|
||||
//Untar the checkpoint file.
|
||||
Path untarredDbDir = Paths.get(omSnapshotDBParentDir.getAbsolutePath(),
|
||||
snapshotFileName);
|
||||
untarCheckpointFile(targetFile, untarredDbDir);
|
||||
FileUtils.deleteQuietly(targetFile);
|
||||
|
||||
//TODO Create Checkpoint based on OM DB type.
|
||||
// Currently, OM DB type is not configurable. Hence, defaulting to
|
||||
// RocksDB.
|
||||
return new RocksDBCheckpoint(untarredDbDir);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to obtain Ozone Manager DB Snapshot. ", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,135 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.StatusLine;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
/**
|
||||
* Test Recon Utility methods.
|
||||
*/
|
||||
public class TestReconUtils {
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void testGetReconDbDir() throws Exception {
|
||||
|
||||
String filePath = folder.getRoot().getAbsolutePath();
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set("TEST_DB_DIR", filePath);
|
||||
|
||||
File file = ReconUtils.getReconDbDir(configuration,
|
||||
"TEST_DB_DIR");
|
||||
Assert.assertEquals(file.getAbsolutePath(), filePath);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUntarCheckpointFile() throws Exception {
|
||||
|
||||
File newDir = folder.newFolder();
|
||||
|
||||
File file1 = Paths.get(newDir.getAbsolutePath(), "file1")
|
||||
.toFile();
|
||||
String str = "File1 Contents";
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(
|
||||
file1.getAbsolutePath()));
|
||||
writer.write(str);
|
||||
writer.close();
|
||||
|
||||
File file2 = Paths.get(newDir.getAbsolutePath(), "file2")
|
||||
.toFile();
|
||||
str = "File2 Contents";
|
||||
writer = new BufferedWriter(new FileWriter(file2.getAbsolutePath()));
|
||||
writer.write(str);
|
||||
writer.close();
|
||||
|
||||
//Create test tar file.
|
||||
File tarFile = OmUtils.createTarFile(newDir.toPath());
|
||||
File outputDir = folder.newFolder();
|
||||
ReconUtils.untarCheckpointFile(tarFile, outputDir.toPath());
|
||||
|
||||
assertTrue(outputDir.isDirectory());
|
||||
assertTrue(outputDir.listFiles().length == 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMakeHttpCall() throws Exception {
|
||||
|
||||
CloseableHttpClient httpClientMock = mock(CloseableHttpClient.class);
|
||||
String url = "http://localhost:9874/dbCheckpoint";
|
||||
|
||||
CloseableHttpResponse httpResponseMock = mock(CloseableHttpResponse.class);
|
||||
when(httpClientMock.execute(any(HttpGet.class)))
|
||||
.thenReturn(httpResponseMock);
|
||||
|
||||
StatusLine statusLineMock = mock(StatusLine.class);
|
||||
when(statusLineMock.getStatusCode()).thenReturn(200);
|
||||
when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
|
||||
|
||||
HttpEntity httpEntityMock = mock(HttpEntity.class);
|
||||
when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
|
||||
File file1 = Paths.get(folder.getRoot().getPath(), "file1")
|
||||
.toFile();
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(
|
||||
file1.getAbsolutePath()));
|
||||
writer.write("File 1 Contents");
|
||||
writer.close();
|
||||
InputStream fileInputStream = new FileInputStream(file1);
|
||||
|
||||
when(httpEntityMock.getContent()).thenReturn(new InputStream() {
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
return fileInputStream.read();
|
||||
}
|
||||
});
|
||||
|
||||
InputStream inputStream = ReconUtils.makeHttpCall(httpClientMock, url);
|
||||
String contents = IOUtils.toString(inputStream, Charset.defaultCharset());
|
||||
|
||||
assertEquals(contents, "File 1 Contents");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.recovery;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.ozone.om.BucketManager;
|
||||
import org.apache.hadoop.ozone.om.BucketManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.utils.db.DBCheckpoint;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
/**
|
||||
* Test Recon OM Metadata Manager implementation.
|
||||
*/
|
||||
public class TestReconOmMetadataManagerImpl {
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Test
|
||||
public void testUpdateOmDB() throws Exception {
|
||||
|
||||
//Create a new OM Metadata Manager instance + DB.
|
||||
File omDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration omConfiguration = new OzoneConfiguration();
|
||||
omConfiguration.set(OZONE_OM_DB_DIRS,
|
||||
omDbDir.getAbsolutePath());
|
||||
OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(
|
||||
omConfiguration);
|
||||
|
||||
//Create a volume + bucket + 2 keys.
|
||||
String volumeKey = omMetadataManager.getVolumeKey("sampleVol");
|
||||
OmVolumeArgs args =
|
||||
OmVolumeArgs.newBuilder()
|
||||
.setVolume("sampleVol")
|
||||
.setAdminName("TestUser")
|
||||
.setOwnerName("TestUser")
|
||||
.build();
|
||||
omMetadataManager.getVolumeTable().put(volumeKey, args);
|
||||
|
||||
BucketManager bucketManager = new BucketManagerImpl(omMetadataManager);
|
||||
OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
|
||||
.setVolumeName("sampleVol")
|
||||
.setBucketName("bucketOne")
|
||||
.build();
|
||||
bucketManager.createBucket(bucketInfo);
|
||||
|
||||
omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_one",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_one")
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_two",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_two")
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
|
||||
//Make sure OM Metadata reflects the keys that were inserted.
|
||||
Assert.assertNotNull(omMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_one"));
|
||||
Assert.assertNotNull(omMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_two"));
|
||||
|
||||
//Take checkpoint of OM DB.
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
Assert.assertNotNull(checkpoint.getCheckpointLocation());
|
||||
|
||||
//Create new Recon OM Metadata manager instance.
|
||||
File reconOmDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir
|
||||
.getAbsolutePath());
|
||||
ReconOMMetadataManager reconOMMetadataManager =
|
||||
new ReconOmMetadataManagerImpl(configuration);
|
||||
reconOMMetadataManager.start(configuration);
|
||||
|
||||
//Before accepting a snapshot, the metadata should have null tables.
|
||||
Assert.assertNull(reconOMMetadataManager.getBucketTable());
|
||||
|
||||
//Update Recon OM DB with the OM DB checkpoint location.
|
||||
reconOMMetadataManager.updateOmDB(
|
||||
checkpoint.getCheckpointLocation().toFile());
|
||||
|
||||
//Now, the tables should have been initialized.
|
||||
Assert.assertNotNull(reconOMMetadataManager.getBucketTable());
|
||||
|
||||
//Verify Keys inserted in OM DB are available in Recon OM DB.
|
||||
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_one"));
|
||||
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_two"));
|
||||
|
||||
//Verify that we cannot write data to Recon OM DB (Read Only)
|
||||
try {
|
||||
reconOMMetadataManager.getKeyTable().put(
|
||||
"/sampleVol/bucketOne/fail_key", new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("fail_key")
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
Assert.fail();
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e.getMessage()
|
||||
.contains("Not supported operation in read only mode"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,21 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Package for recon server - OM service specific tests.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.recon.recovery;
|
|
@ -36,8 +36,6 @@ import org.junit.Before;
|
|||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
|
@ -46,7 +44,6 @@ import com.google.inject.Injector;
|
|||
/**
|
||||
* Unit Tests for ContainerDBServiceProviderImpl.
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TestContainerDBServiceProviderImpl {
|
||||
|
||||
@Rule
|
||||
|
|
|
@ -0,0 +1,275 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ozone.recon.spi.impl;
|
||||
|
||||
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY;
|
||||
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_INTERVAL;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||
import org.apache.hadoop.ozone.OmUtils;
|
||||
import org.apache.hadoop.ozone.om.BucketManager;
|
||||
import org.apache.hadoop.ozone.om.BucketManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.OMMetadataManager;
|
||||
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
|
||||
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
|
||||
import org.apache.hadoop.ozone.recon.ReconUtils;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
|
||||
import org.apache.hadoop.ozone.recon.recovery.ReconOmMetadataManagerImpl;
|
||||
import org.apache.hadoop.ozone.recon.spi.OzoneManagerServiceProvider;
|
||||
import org.apache.hadoop.utils.db.DBCheckpoint;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.powermock.api.mockito.PowerMockito;
|
||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||
import org.powermock.modules.junit4.PowerMockRunner;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
|
||||
/**
|
||||
* Class to test Ozone Manager Service Provider Implementation.
|
||||
*/
|
||||
@RunWith(PowerMockRunner.class)
|
||||
@PowerMockIgnore({"javax.management.*", "javax.net.ssl.*"})
|
||||
@PrepareForTest(ReconUtils.class)
|
||||
public class TestOzoneManagerServiceProviderImpl {
|
||||
|
||||
private OMMetadataManager omMetadataManager;
|
||||
private ReconOMMetadataManager reconOMMetadataManager;
|
||||
private Injector injector;
|
||||
private OzoneManagerServiceProviderImpl ozoneManagerServiceProvider;
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
initializeNewOmMetadataManager();
|
||||
injector = Guice.createInjector(new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
try {
|
||||
initializeNewOmMetadataManager();
|
||||
bind(OzoneConfiguration.class).toInstance(
|
||||
getTestOzoneConfiguration());
|
||||
reconOMMetadataManager = getTestMetadataManager();
|
||||
bind(ReconOMMetadataManager.class).toInstance(reconOMMetadataManager);
|
||||
ozoneManagerServiceProvider = new OzoneManagerServiceProviderImpl(
|
||||
getTestOzoneConfiguration());
|
||||
bind(OzoneManagerServiceProvider.class)
|
||||
.toInstance(ozoneManagerServiceProvider);
|
||||
} catch (IOException e) {
|
||||
Assert.fail();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testStart() throws Exception {
|
||||
|
||||
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_one"));
|
||||
Assert.assertNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_two"));
|
||||
|
||||
writeDataToOm();
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
File tarFile = OmUtils.createTarFile(checkpoint.getCheckpointLocation());
|
||||
InputStream inputStream = new FileInputStream(tarFile);
|
||||
PowerMockito.stub(PowerMockito.method(ReconUtils.class,
|
||||
"makeHttpCall",
|
||||
CloseableHttpClient.class, String.class))
|
||||
.toReturn(inputStream);
|
||||
|
||||
ozoneManagerServiceProvider.start();
|
||||
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
|
||||
|
||||
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_one"));
|
||||
Assert.assertNotNull(reconOMMetadataManager.getKeyTable()
|
||||
.get("/sampleVol/bucketOne/key_two"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetOMMetadataManagerInstance() throws Exception {
|
||||
OMMetadataManager omMetaMgr = ozoneManagerServiceProvider
|
||||
.getOMMetadataManagerInstance();
|
||||
assertNotNull(omMetaMgr);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetOzoneManagerDBSnapshot() throws Exception {
|
||||
|
||||
File reconOmSnapshotDbDir = temporaryFolder.newFolder();
|
||||
|
||||
File checkpointDir = Paths.get(reconOmSnapshotDbDir.getAbsolutePath(),
|
||||
"testGetOzoneManagerDBSnapshot").toFile();
|
||||
checkpointDir.mkdir();
|
||||
|
||||
File file1 = Paths.get(checkpointDir.getAbsolutePath(), "file1")
|
||||
.toFile();
|
||||
String str = "File1 Contents";
|
||||
BufferedWriter writer = new BufferedWriter(new FileWriter(
|
||||
file1.getAbsolutePath()));
|
||||
writer.write(str);
|
||||
writer.close();
|
||||
|
||||
File file2 = Paths.get(checkpointDir.getAbsolutePath(), "file2")
|
||||
.toFile();
|
||||
str = "File2 Contents";
|
||||
writer = new BufferedWriter(new FileWriter(file2.getAbsolutePath()));
|
||||
writer.write(str);
|
||||
writer.close();
|
||||
|
||||
//Create test tar file.
|
||||
File tarFile = OmUtils.createTarFile(checkpointDir.toPath());
|
||||
|
||||
InputStream fileInputStream = new FileInputStream(tarFile);
|
||||
PowerMockito.stub(PowerMockito.method(ReconUtils.class,
|
||||
"makeHttpCall",
|
||||
CloseableHttpClient.class, String.class))
|
||||
.toReturn(fileInputStream);
|
||||
|
||||
DBCheckpoint checkpoint = ozoneManagerServiceProvider
|
||||
.getOzoneManagerDBSnapshot();
|
||||
assertNotNull(checkpoint);
|
||||
assertTrue(checkpoint.getCheckpointLocation().toFile().isDirectory());
|
||||
assertTrue(checkpoint.getCheckpointLocation().toFile()
|
||||
.listFiles().length == 2);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Test OzoneConfiguration instance.
|
||||
* @return OzoneConfiguration
|
||||
* @throws IOException ioEx.
|
||||
*/
|
||||
private OzoneConfiguration getTestOzoneConfiguration() throws IOException {
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(RECON_OM_SNAPSHOT_TASK_INITIAL_DELAY,
|
||||
"0m");
|
||||
configuration.set(RECON_OM_SNAPSHOT_TASK_INTERVAL, "1m");
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
|
||||
temporaryFolder.newFolder().getAbsolutePath());
|
||||
return configuration;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new OM Metadata manager instance.
|
||||
* @throws IOException ioEx
|
||||
*/
|
||||
private void initializeNewOmMetadataManager() throws IOException {
|
||||
File omDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration omConfiguration = new OzoneConfiguration();
|
||||
omConfiguration.set(OZONE_OM_DB_DIRS,
|
||||
omDbDir.getAbsolutePath());
|
||||
omMetadataManager = new OmMetadataManagerImpl(omConfiguration);
|
||||
|
||||
String volumeKey = omMetadataManager.getVolumeKey("sampleVol");
|
||||
OmVolumeArgs args =
|
||||
OmVolumeArgs.newBuilder()
|
||||
.setVolume("sampleVol")
|
||||
.setAdminName("TestUser")
|
||||
.setOwnerName("TestUser")
|
||||
.build();
|
||||
omMetadataManager.getVolumeTable().put(volumeKey, args);
|
||||
|
||||
BucketManager bucketManager = new BucketManagerImpl(omMetadataManager);
|
||||
OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
|
||||
.setVolumeName("sampleVol")
|
||||
.setBucketName("bucketOne")
|
||||
.build();
|
||||
bucketManager.createBucket(bucketInfo);
|
||||
|
||||
omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_one",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_one")
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get an instance of Recon OM Metadata manager.
|
||||
* @return ReconOMMetadataManager
|
||||
* @throws IOException when creating the RocksDB instance.
|
||||
*/
|
||||
private ReconOMMetadataManager getTestMetadataManager() throws IOException {
|
||||
|
||||
DBCheckpoint checkpoint = omMetadataManager.getStore()
|
||||
.getCheckpoint(true);
|
||||
assertNotNull(checkpoint.getCheckpointLocation());
|
||||
|
||||
File reconOmDbDir = temporaryFolder.newFolder();
|
||||
OzoneConfiguration configuration = new OzoneConfiguration();
|
||||
configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR, reconOmDbDir
|
||||
.getAbsolutePath());
|
||||
|
||||
ReconOMMetadataManager reconOMMetaMgr =
|
||||
new ReconOmMetadataManagerImpl(configuration);
|
||||
reconOMMetaMgr.start(configuration);
|
||||
|
||||
reconOMMetaMgr.updateOmDB(
|
||||
checkpoint.getCheckpointLocation().toFile());
|
||||
return reconOMMetaMgr;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a key to OM instance.
|
||||
* @throws IOException while writing.
|
||||
*/
|
||||
private void writeDataToOm() throws IOException {
|
||||
omMetadataManager.getKeyTable().put("/sampleVol/bucketOne/key_two",
|
||||
new OmKeyInfo.Builder()
|
||||
.setBucketName("bucketOne")
|
||||
.setVolumeName("sampleVol")
|
||||
.setKeyName("key_two")
|
||||
.setReplicationFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setReplicationType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.build());
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue