diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
index 59004c6e6e4..3c8df38deca 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBStore.java
@@ -184,4 +184,13 @@ void move(KEY sourceKey, KEY destKey, VALUE value,
    * @return codec registry.
    */
   CodecRegistry getCodecRegistry();
+
+  /**
+   * Get data written to DB since a specific sequence number.
+   * @param sequenceNumber sequence number already known to the caller; only
+   *                       updates written strictly after it are returned.
+   * @return wrapper holding the serialized write batches read from the WAL
+   *         and the latest sequence number seen.
+   * @throws SequenceNumberNotFoundException if the WAL no longer contains
+   *         the updates immediately following the requested sequence number
+   *         (i.e. they may already have been flushed to SSTs).
+   */
+  DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
+      throws SequenceNumberNotFoundException;
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
new file mode 100644
index 00000000000..54ebc7ca6c0
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/DBUpdatesWrapper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Wrapper class to hold DB data read from the RocksDB log file.
+ */
+public class DBUpdatesWrapper {
+
+  // Serialized WriteBatch payloads, in the order they were read from the WAL.
+  // Typed List<byte[]> (not raw List) so callers can consume elements without
+  // an unchecked cast.
+  private List<byte[]> dataList = new ArrayList<>();
+  // Highest sequence number seen so far; -1 until the first batch is added.
+  private long currentSequenceNumber = -1;
+
+  /**
+   * Add one serialized write batch and advance the tracked sequence number.
+   * @param data serialized RocksDB WriteBatch.
+   * @param sequenceNumber sequence number of the batch.
+   */
+  public void addWriteBatch(byte[] data, long sequenceNumber) {
+    dataList.add(data);
+    if (currentSequenceNumber < sequenceNumber) {
+      currentSequenceNumber = sequenceNumber;
+    }
+  }
+
+  /**
+   * @return the accumulated write batches.
+   */
+  public List<byte[]> getData() {
+    return dataList;
+  }
+
+  /**
+   * @return the highest sequence number added, or -1 if none.
+   */
+  public long getCurrentSequenceNumber() {
+    return currentSequenceNumber;
+  }
+}
+
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
index 23c03f1884b..4182687ceb7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
@@ -46,6 +46,7 @@
 import org.rocksdb.FlushOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.TransactionLogIterator;
 import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -327,6 +328,51 @@ public CodecRegistry getCodecRegistry() {
     return codecRegistry;
   }
 
+  /**
+   * Read all write batches appended to the WAL after the given sequence
+   * number. On RocksDBException the error is logged and whatever was read
+   * so far (possibly nothing) is returned.
+   */
+  @Override
+  public DBUpdatesWrapper getUpdatesSince(long sequenceNumber)
+      throws SequenceNumberNotFoundException {
+
+    DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
+    // TransactionLogIterator wraps a native RocksDB handle; close it via
+    // try-with-resources so native memory is not leaked.
+    try (TransactionLogIterator transactionLogIterator =
+        db.getUpdatesSince(sequenceNumber)) {
+
+      // Only the first record needs to be checked if its seq number <
+      // ( 1 + passed_in_sequence_number). For example, if seqNumber passed
+      // in is 100, then we can read from the WAL ONLY if the first sequence
+      // number is <= 101. If it is 102, then 101 may already be flushed to
+      // SST. If it is 99, we can skip 99 and 100, and then read from 101.
+
+      boolean checkValidStartingSeqNumber = true;
+
+      while (transactionLogIterator.isValid()) {
+        TransactionLogIterator.BatchResult result =
+            transactionLogIterator.getBatch();
+        long currSequenceNumber = result.sequenceNumber();
+        if (checkValidStartingSeqNumber &&
+            currSequenceNumber > 1 + sequenceNumber) {
+          throw new SequenceNumberNotFoundException("Unable to read data from"
+              + " RocksDB wal to get delta updates. It may have already been"
+              + " flushed to SSTs.");
+        }
+        // If the above condition was not satisfied, then it is OK to reset
+        // the flag.
+        checkValidStartingSeqNumber = false;
+        if (currSequenceNumber <= sequenceNumber) {
+          // Already known to the caller; skip.
+          transactionLogIterator.next();
+          continue;
+        }
+        dbUpdatesWrapper.addWriteBatch(result.writeBatch().data(),
+            result.sequenceNumber());
+        transactionLogIterator.next();
+      }
+    } catch (RocksDBException e) {
+      // Best effort: log and return whatever was collected before the error.
+      LOG.error("Unable to get delta updates since sequenceNumber {} ",
+          sequenceNumber, e);
+    }
+    return dbUpdatesWrapper;
+  }
+
   @VisibleForTesting
   public RocksDB getDb() {
     return db;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/SequenceNumberNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/SequenceNumberNotFoundException.java
new file mode 100644
index 00000000000..13c040b5c2e
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/SequenceNumberNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.utils.db;
+
+import java.io.IOException;
+
+/**
+ * Thrown if RocksDB is unable to find requested data from WAL file.
+ */
+public class SequenceNumberNotFoundException extends IOException {
+
+  public SequenceNumberNotFoundException() {
+    super();
+  }
+
+  public SequenceNumberNotFoundException(String message) {
+    super(message);
+  }
+
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java
index 286221a2d48..0ac0dc82745 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/utils/db/TestRDBStore.java
@@ -325,4 +325,25 @@ public void testRocksDBKeyMayExistApi() throws Exception {
     }
   }
 
+  @Test
+  public void testGetDBUpdatesSince() throws Exception {
+
+    try (RDBStore newStore =
+        new RDBStore(folder.newFolder(), options, configSet)) {
+
+      try (Table firstTable = newStore.getTable(families.get(1))) {
+        firstTable.put(StringUtils.getBytesUtf16("Key1"), StringUtils
+            .getBytesUtf16("Value1"));
+        firstTable.put(StringUtils.getBytesUtf16("Key2"), StringUtils
+            .getBytesUtf16("Value2"));
+      }
+      // Two puts => two WAL records. assertEquals reports expected vs actual
+      // on failure, unlike assertTrue on a boolean comparison.
+      Assert.assertEquals(2, newStore.getDb().getLatestSequenceNumber());
+
+      DBUpdatesWrapper dbUpdatesSince = newStore.getUpdatesSince(0);
+      Assert.assertEquals(2, dbUpdatesSince.getData().size());
+    }
+  }
+
 }
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java index 25bfc29e9dc..bcc7ac5995b 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java @@ -199,6 +199,7 @@ public static boolean isReadOnly( case LookupFile: case ListStatus: case GetAcl: + case DBUpdates: return true; case CreateVolume: case SetVolumeProperty: diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto index 5dd2b5506d4..68e9f267abe 100644 --- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto +++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto @@ -73,6 +73,7 @@ enum Type { ListMultiPartUploadParts = 50; ServiceList = 51; + DBUpdates = 53; GetDelegationToken = 61; RenewDelegationToken = 62; @@ -136,6 +137,7 @@ message OMRequest { optional MultipartUploadListPartsRequest listMultipartUploadPartsRequest = 50; optional ServiceListRequest serviceListRequest = 51; + optional DBUpdatesRequest dbUpdatesRequest = 53; optional hadoop.common.GetDelegationTokenRequestProto getDelegationTokenRequest = 61; optional hadoop.common.RenewDelegationTokenRequestProto renewDelegationTokenRequest= 62; @@ -202,6 +204,7 @@ message OMResponse { optional MultipartUploadListPartsResponse listMultipartUploadPartsResponse = 50; optional ServiceListResponse ServiceListResponse = 51; + optional DBUpdatesResponse dbUpdatesResponse = 52; optional GetDelegationTokenResponseProto getDelegationTokenResponse = 61; optional RenewDelegationTokenResponseProto renewDelegationTokenResponse = 62; @@ -836,11 +839,20 @@ message AllocateBlockResponse { message ServiceListRequest { } +message DBUpdatesRequest { + required uint64 sequenceNumber = 1; +} + message ServiceListResponse { repeated ServiceInfo serviceInfo = 2; } +message DBUpdatesResponse { + required uint64 sequenceNumber = 1; + repeated bytes data = 
2; +} + message ServicePort { enum Type { RPC = 1; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java index 9845b62c420..3b99c6f9efc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; import org.apache.hadoop.ozone.om.helpers.OpenKeySession; import org.apache.hadoop.ozone.om.helpers.ServiceInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType; @@ -81,6 +82,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.db.DBUpdatesWrapper; import org.apache.hadoop.utils.db.RDBStore; import org.apache.hadoop.utils.db.Table; import org.apache.hadoop.utils.db.Table.KeyValue; @@ -1395,8 +1397,41 @@ public void testDBKeyMayExist() throws Exception { RDBStore rdbStore = (RDBStore) cluster.getOzoneManager() .getMetadataManager().getStore(); RocksDB db = rdbStore.getDb(); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + OmKeyInfo keyInfo = getNewOmKeyInfo(); + OmKeyInfoCodec omKeyInfoCodec = new OmKeyInfoCodec(); + + db.put(StringUtils.getBytesUtf16("OMKey1"), + omKeyInfoCodec.toPersistedFormat(keyInfo)); + + StringBuilder sb = new StringBuilder(); + Assert.assertTrue(db.keyMayExist(StringUtils.getBytesUtf16("OMKey1"), + sb)); + Assert.assertTrue(sb.length() > 0); + } + + + @Test + 
public void testGetOMDBUpdates() throws IOException { + + DBUpdatesRequest dbUpdatesRequest = + DBUpdatesRequest.newBuilder().setSequenceNumber(0).build(); + + DBUpdatesWrapper dbUpdates = + cluster.getOzoneManager().getDBUpdates(dbUpdatesRequest); + Assert.assertTrue(dbUpdates.getData().isEmpty()); + + //Write data to OM. + OmKeyInfo keyInfo = getNewOmKeyInfo(); + Assert.assertNotNull(keyInfo); + dbUpdates = + cluster.getOzoneManager().getDBUpdates(dbUpdatesRequest); + Assert.assertFalse(dbUpdates.getData().isEmpty()); + + } + + private OmKeyInfo getNewOmKeyInfo() throws IOException { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder() .setVolume("vol1") .setAdminName("bilbo") @@ -1423,16 +1458,6 @@ public void testDBKeyMayExist() throws Exception { .build(); OpenKeySession keySession = cluster.getOzoneManager().getKeyManager() .openKey(keyArgs); - OmKeyInfo keyInfo = keySession.getKeyInfo(); - OmKeyInfoCodec omKeyInfoCodec = new OmKeyInfoCodec(); - - db.put(StringUtils.getBytesUtf16("OMKey1"), - omKeyInfoCodec.toPersistedFormat(keyInfo)); - - StringBuilder sb = new StringBuilder(); - Assert.assertTrue(db.keyMayExist(StringUtils.getBytesUtf16("OMKey1"), - sb)); - Assert.assertTrue(sb.length() > 0); + return keySession.getKeyInfo(); } - } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index e2f1ceeaabe..84e1781645c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -81,6 +81,7 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue; import org.apache.hadoop.ozone.om.protocol.OzoneManagerServerProtocol; import org.apache.hadoop.ozone.om.snapshot.OzoneManagerSnapshotProvider; +import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos .KeyArgs; import org.apache.hadoop.ozone.security.OzoneSecurityException; @@ -144,6 +145,8 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.utils.RetriableTask; +import org.apache.hadoop.utils.db.DBUpdatesWrapper; +import org.apache.hadoop.utils.db.SequenceNumberNotFoundException; import org.apache.hadoop.utils.db.DBCheckpoint; import org.apache.hadoop.utils.db.DBStore; import org.apache.ratis.server.protocol.TermIndex; @@ -3346,4 +3349,18 @@ public boolean isLeader() { public boolean isRatisEnabled() { return isRatisEnabled; } + + /** + * Get DB updates since a specific sequence number. + * @param dbUpdatesRequest request that encapsulates a sequence number. + * @return Wrapper containing the updates. + * @throws SequenceNumberNotFoundException if db is unable to read the data. 
+   */
+  public DBUpdatesWrapper getDBUpdates(
+      DBUpdatesRequest dbUpdatesRequest)
+      throws SequenceNumberNotFoundException {
+    return metadataManager.getStore()
+        .getUpdatesSince(dbUpdatesRequest.getSequenceNumber());
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index cd0c4834faa..53ab6fc703a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -122,6 +122,9 @@
 import org.apache.hadoop.security.token.Token;
 
 import com.google.common.collect.Lists;
+
+import org.apache.hadoop.utils.db.DBUpdatesWrapper;
+import org.apache.hadoop.utils.db.SequenceNumberNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -298,6 +301,11 @@ public OMResponse handle(OMRequest request) {
           request.getServiceListRequest());
       responseBuilder.setServiceListResponse(serviceListResponse);
       break;
+    case DBUpdates:
+      DBUpdatesResponse dbUpdatesResponse = getOMDBUpdates(
+          request.getDbUpdatesRequest());
+      responseBuilder.setDbUpdatesResponse(dbUpdatesResponse);
+      break;
     case GetDelegationToken:
       GetDelegationTokenResponseProto getDtResp = getDelegationToken(
           request.getGetDelegationTokenRequest());
@@ -377,6 +385,21 @@ public OMResponse handle(OMRequest request) {
     return responseBuilder.build();
   }
 
+  /**
+   * Package the DB updates read from the OM's RocksDB WAL into a
+   * DBUpdatesResponse.
+   * @param dbUpdatesRequest request carrying the caller's sequence number.
+   * @return response with one bytes entry per write batch.
+   * @throws SequenceNumberNotFoundException if the store cannot serve the
+   *         requested starting point.
+   */
+  private DBUpdatesResponse getOMDBUpdates(
+      DBUpdatesRequest dbUpdatesRequest)
+      throws SequenceNumberNotFoundException {
+
+    DBUpdatesResponse.Builder builder = DBUpdatesResponse
+        .newBuilder();
+    DBUpdatesWrapper dbUpdatesWrapper =
+        impl.getDBUpdates(dbUpdatesRequest);
+    // Use addData, not setData(i, ...): the repeated field starts empty, so
+    // setData would throw IndexOutOfBoundsException on the first element.
+    for (int i = 0; i < dbUpdatesWrapper.getData().size(); i++) {
+      builder.addData(
+          OMPBHelper.getByteString(dbUpdatesWrapper.getData().get(i)));
+    }
+    // sequenceNumber is a 'required' proto field; leaving it unset makes
+    // build() throw UninitializedMessageException.
+    builder.setSequenceNumber(dbUpdatesWrapper.getCurrentSequenceNumber());
+    return builder.build();
+  }
+
   private GetAclResponse getAcl(GetAclRequest req) throws IOException {
     List acls = new ArrayList<>();