HDDS-1291. Set OmKeyArgs#refreshPipeline flag properly to avoid reading from stale pipeline. Contributed by Xiaoyu Yao. (#639)
This commit is contained in:
parent
f854a89190
commit
dea6f2a065
|
@ -664,6 +664,7 @@ public class RpcClient implements ClientProtocol {
|
|||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
|
||||
LengthInputStream lengthInputStream =
|
||||
|
@ -739,6 +740,7 @@ public class RpcClient implements ClientProtocol {
|
|||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
|
||||
|
||||
|
|
|
@ -107,6 +107,7 @@ public class TestStorageContainerManagerHelper {
|
|||
.setVolumeName(volume)
|
||||
.setBucketName(bucket)
|
||||
.setKeyName(key)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo location = cluster.getOzoneManager()
|
||||
.lookupKey(arg);
|
||||
|
|
|
@ -117,6 +117,7 @@ public class TestBCSID {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
|
||||
setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName("ratis")
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
List<OmKeyLocationInfo> keyLocationInfos =
|
||||
|
|
|
@ -132,6 +132,7 @@ public class TestCloseContainerHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
waitForContainerClose(key);
|
||||
|
@ -165,6 +166,7 @@ public class TestCloseContainerHandlingByClient {
|
|||
.setBucketName(bucketName)
|
||||
.setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
waitForContainerClose(key);
|
||||
|
@ -198,6 +200,7 @@ public class TestCloseContainerHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
waitForContainerClose(key);
|
||||
|
@ -257,6 +260,7 @@ public class TestCloseContainerHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
waitForContainerClose(key);
|
||||
|
@ -300,6 +304,7 @@ public class TestCloseContainerHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
waitForContainerClose(key);
|
||||
|
@ -418,6 +423,7 @@ public class TestCloseContainerHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
|
||||
setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
|
||||
|
@ -451,6 +457,7 @@ public class TestCloseContainerHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
waitForContainerClose(key);
|
||||
|
|
|
@ -150,6 +150,7 @@ public class TestFailureHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
Assert.assertEquals(data.length, keyInfo.getDataSize());
|
||||
|
@ -192,6 +193,7 @@ public class TestFailureHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
Assert.assertEquals(2 * data.getBytes().length, keyInfo.getDataSize());
|
||||
|
@ -238,6 +240,7 @@ public class TestFailureHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
Assert.assertEquals(3 * data.getBytes().length, keyInfo.getDataSize());
|
||||
|
@ -278,6 +281,7 @@ public class TestFailureHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
|
||||
|
@ -338,6 +342,7 @@ public class TestFailureHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
|
||||
|
@ -398,6 +403,7 @@ public class TestFailureHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
|
||||
|
@ -460,6 +466,7 @@ public class TestFailureHandlingByClient {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
|
||||
|
||||
|
|
|
@ -203,6 +203,7 @@ public class TestOzoneAtRestEncryption extends TestOzoneRpcClient {
|
|||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
HddsProtos.ReplicationType replicationType =
|
||||
HddsProtos.ReplicationType.valueOf(type.toString());
|
||||
|
|
|
@ -547,6 +547,7 @@ public abstract class TestOzoneRpcClientAbstract {
|
|||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
HddsProtos.ReplicationType replicationType =
|
||||
HddsProtos.ReplicationType.valueOf(type.toString());
|
||||
|
@ -619,7 +620,7 @@ public abstract class TestOzoneRpcClientAbstract {
|
|||
out.close();
|
||||
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
|
||||
builder.setVolumeName(volumeName).setBucketName(bucketName)
|
||||
.setKeyName(keyName);
|
||||
.setKeyName(keyName).setRefreshPipeline(true);
|
||||
OmKeyInfo keyInfo = ozoneManager.lookupKey(builder.build());
|
||||
|
||||
List<OmKeyLocationInfo> locationInfoList =
|
||||
|
@ -891,7 +892,7 @@ public abstract class TestOzoneRpcClientAbstract {
|
|||
// First, confirm the key info from the client matches the info in OM.
|
||||
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
|
||||
builder.setVolumeName(volumeName).setBucketName(bucketName)
|
||||
.setKeyName(keyName);
|
||||
.setKeyName(keyName).setRefreshPipeline(true);
|
||||
OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()).
|
||||
getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0);
|
||||
long containerID = keyInfo.getContainerID();
|
||||
|
|
|
@ -144,7 +144,7 @@ public class TestReadRetries {
|
|||
// First, confirm the key info from the client matches the info in OM.
|
||||
OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
|
||||
builder.setVolumeName(volumeName).setBucketName(bucketName)
|
||||
.setKeyName(keyName);
|
||||
.setKeyName(keyName).setRefreshPipeline(true);
|
||||
OmKeyLocationInfo keyInfo = ozoneManager.lookupKey(builder.build()).
|
||||
getKeyLocationVersions().get(0).getBlocksLatestVersionOnly().get(0);
|
||||
long containerID = keyInfo.getContainerID();
|
||||
|
|
|
@ -218,6 +218,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
|
|||
.setVolumeName(volumeName)
|
||||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
HddsProtos.ReplicationType replicationType =
|
||||
HddsProtos.ReplicationType.valueOf(type.toString());
|
||||
|
|
|
@ -143,7 +143,9 @@ public class TestBlockDeletion {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
|
||||
.setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
|
||||
.setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).build();
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
|
||||
om.lookupKey(keyArgs).getKeyLocationVersions();
|
||||
|
||||
|
|
|
@ -103,7 +103,7 @@ public class TestCloseContainerByPipeline {
|
|||
new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
|
||||
.setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
|
||||
.setKeyName("standalone").build();
|
||||
.setKeyName("standalone").setRefreshPipeline(true).build();
|
||||
OmKeyLocationInfo omKeyLocationInfo =
|
||||
cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
|
||||
.get(0).getBlocksLatestVersionOnly().get(0);
|
||||
|
@ -157,7 +157,9 @@ public class TestCloseContainerByPipeline {
|
|||
new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
|
||||
.setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
|
||||
.setKeyName("standalone").build();
|
||||
.setKeyName("standalone")
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
OmKeyLocationInfo omKeyLocationInfo =
|
||||
cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
|
||||
|
@ -205,7 +207,7 @@ public class TestCloseContainerByPipeline {
|
|||
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName("test").
|
||||
setBucketName("test").setType(HddsProtos.ReplicationType.RATIS)
|
||||
.setFactor(HddsProtos.ReplicationFactor.THREE).setDataSize(1024)
|
||||
.setKeyName("ratis").build();
|
||||
.setKeyName("ratis").setRefreshPipeline(true).build();
|
||||
|
||||
OmKeyLocationInfo omKeyLocationInfo =
|
||||
cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
|
||||
|
|
|
@ -76,7 +76,9 @@ public class TestCloseContainerHandler {
|
|||
new OmKeyArgs.Builder().setVolumeName("test").setBucketName("test")
|
||||
.setType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(1024)
|
||||
.setKeyName("test").build();
|
||||
.setKeyName("test")
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
OmKeyLocationInfo omKeyLocationInfo =
|
||||
cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
|
||||
|
|
|
@ -234,7 +234,9 @@ public class TestDeleteContainerHandler {
|
|||
.setBucketName(bucketName)
|
||||
.setType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE)
|
||||
.setKeyName(keyName).build();
|
||||
.setKeyName(keyName)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
OmKeyLocationInfo omKeyLocationInfo =
|
||||
cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions()
|
||||
|
|
|
@ -105,6 +105,7 @@ public class TestContainerReportWithKeys {
|
|||
.setKeyName(keyName)
|
||||
.setType(HddsProtos.ReplicationType.STAND_ALONE)
|
||||
.setFactor(HddsProtos.ReplicationFactor.ONE).setDataSize(keySize)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
|
||||
|
|
|
@ -115,6 +115,7 @@ public class TestOmBlockVersioning {
|
|||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setDataSize(1000)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
// 1st update, version 0
|
||||
|
@ -214,6 +215,7 @@ public class TestOmBlockVersioning {
|
|||
.setBucketName(bucketName)
|
||||
.setKeyName(keyName)
|
||||
.setDataSize(1000)
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
String dataString = RandomStringUtils.randomAlphabetic(100);
|
||||
|
|
|
@ -479,6 +479,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
.setBucketName(args.getBucketName())
|
||||
.setKeyName(args.getKeyName())
|
||||
.setDataSize(args.getSize())
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = ozoneManagerClient.lookupKey(keyArgs);
|
||||
return KeyInputStream.getFromOmKeyInfo(
|
||||
|
@ -513,6 +514,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
.setVolumeName(args.getVolumeName())
|
||||
.setBucketName(args.getBucketName())
|
||||
.setKeyName(args.getKeyName())
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
|
||||
OmKeyInfo omKeyInfo = ozoneManagerClient.lookupKey(keyArgs);
|
||||
|
@ -534,6 +536,7 @@ public final class DistributedStorageHandler implements StorageHandler {
|
|||
.setVolumeName(args.getVolumeName())
|
||||
.setBucketName(args.getBucketName())
|
||||
.setKeyName(args.getKeyName())
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo omKeyInfo = ozoneManagerClient.lookupKey(keyArgs);
|
||||
List<KeyLocation> keyLocations = new ArrayList<>();
|
||||
|
|
|
@ -500,6 +500,7 @@ public class OzoneManagerRequestHandler implements RequestHandler {
|
|||
.setVolumeName(keyArgs.getVolumeName())
|
||||
.setBucketName(keyArgs.getBucketName())
|
||||
.setKeyName(keyArgs.getKeyName())
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs);
|
||||
resp.setKeyInfo(keyInfo.getProtobuf());
|
||||
|
@ -516,6 +517,7 @@ public class OzoneManagerRequestHandler implements RequestHandler {
|
|||
.setVolumeName(keyArgs.getVolumeName())
|
||||
.setBucketName(keyArgs.getBucketName())
|
||||
.setKeyName(keyArgs.getKeyName())
|
||||
.setRefreshPipeline(true)
|
||||
.build();
|
||||
impl.renameKey(omKeyArgs, request.getToKeyName());
|
||||
|
||||
|
|
Loading…
Reference in New Issue