diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java new file mode 100644 index 00000000000..a68b94a9813 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.utils.db.BatchOperation; + +import org.apache.ratis.util.ExitUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements DoubleBuffer implementation of OMClientResponse's. In + * DoubleBuffer it has 2 buffers one is currentBuffer and other is + * readyBuffer. The current OM requests will be always added to currentBuffer. + * Flush thread will be running in background, it check's if currentBuffer has + * any entries, it swaps the buffer and creates a batch and commit to DB. + * Adding OM request to doubleBuffer and swap of buffer are synchronized + * methods. + * + */ +public class OzoneManagerDoubleBuffer { + + private static final Logger LOG = + LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class.getName()); + + // Taken unbounded queue, if sync thread is taking too long time, we + // might end up taking huge memory to add entries to the buffer. + // TODO: We can avoid this using unbounded queue and use queue with + // capacity, if queue is full we can wait for sync to be completed to + // add entries. But in this also we might block rpc handlers, as we + // clear entries after sync. Or we can come up with a good approach to + // solve this. + private Queue> currentBuffer; + private Queue> readyBuffer; + + private Daemon daemon; + private final OMMetadataManager omMetadataManager; + private final AtomicLong flushedTransactionCount = new AtomicLong(0); + private final AtomicLong flushIterations = new AtomicLong(0); + private volatile boolean isRunning; + + + public OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager) { + this.currentBuffer = new ConcurrentLinkedQueue<>(); + this.readyBuffer = new ConcurrentLinkedQueue<>(); + this.omMetadataManager = omMetadataManager; + + isRunning = true; + // Daemon thread which runs in back ground and flushes transactions to DB. + daemon = new Daemon(this::flushTransactions); + daemon.setName("OMDoubleBufferFlushThread"); + daemon.start(); + + } + + /** + * Runs in a background thread and batches the transaction in currentBuffer + * and commit to DB. + */ + private void flushTransactions() { + while(isRunning) { + try { + if (canFlush()) { + setReadyBuffer(); + final BatchOperation batchOperation = omMetadataManager.getStore() + .initBatchOperation(); + + readyBuffer.iterator().forEachRemaining((entry) -> { + try { + entry.getResponse().addToDBBatch(omMetadataManager, + batchOperation); + } catch (IOException ex) { + // During Adding to RocksDB batch entry got an exception. + // We should terminate the OM. + terminate(ex); + } + }); + + omMetadataManager.getStore().commitBatchOperation(batchOperation); + int flushedTransactionsSize = readyBuffer.size(); + flushedTransactionCount.addAndGet(flushedTransactionsSize); + flushIterations.incrementAndGet(); + + LOG.debug("Sync Iteration {} flushed transactions in this " + + "iteration{}", flushIterations.get(), + flushedTransactionsSize); + readyBuffer.clear(); + // TODO: update the last updated index in OzoneManagerStateMachine. + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + if (isRunning) { + final String message = "OMDoubleBuffer flush thread " + + Thread.currentThread().getName() + " encountered Interrupted " + + "exception while running"; + ExitUtils.terminate(1, message, ex, LOG); + } else { + LOG.info("OMDoubleBuffer flush thread " + + Thread.currentThread().getName() + " is interrupted and will " + + "exit. {}", Thread.currentThread().getName()); + } + } catch (IOException ex) { + terminate(ex); + } catch (Throwable t) { + final String s = "OMDoubleBuffer flush thread" + + Thread.currentThread().getName() + "encountered Throwable error"; + ExitUtils.terminate(2, s, t, LOG); + } + } + } + + /** + * Stop OM DoubleBuffer flush thread. + */ + public synchronized void stop() { + if (isRunning) { + LOG.info("Stopping OMDoubleBuffer flush thread"); + isRunning = false; + daemon.interrupt(); + } else { + LOG.info("OMDoubleBuffer flush thread is not running."); + } + + } + + private void terminate(IOException ex) { + String message = "During flush to DB encountered error in " + + "OMDoubleBuffer flush thread " + Thread.currentThread().getName(); + ExitUtils.terminate(1, message, ex, LOG); + } + + /** + * Returns the flushed transaction count to OM DB. + * @return flushedTransactionCount + */ + public long getFlushedTransactionCount() { + return flushedTransactionCount.get(); + } + + /** + * Returns total number of flush iterations run by sync thread. + * @return flushIterations + */ + public long getFlushIterations() { + return flushIterations.get(); + } + + /** + * Add OmResponseBufferEntry to buffer. + * @param response + * @param transactionIndex + */ + public synchronized void add(OMClientResponse response, + long transactionIndex) { + currentBuffer.add(new DoubleBufferEntry<>(transactionIndex, response)); + notify(); + } + + /** + * Check can we flush transactions or not. This method wait's until + * currentBuffer size is greater than zero, once currentBuffer size is + * greater than zero it gets notify signal, and it returns true + * indicating that we are ready to flush. + * + * @return boolean + */ + private synchronized boolean canFlush() throws InterruptedException { + // When transactions are added to buffer it notifies, then we check if + // currentBuffer size once and return from this method. + while (currentBuffer.size() == 0) { + wait(Long.MAX_VALUE); + } + return true; + } + + /** + * Prepares the readyBuffer which is used by sync thread to flush + * transactions to OM DB. This method swaps the currentBuffer and readyBuffer. + */ + private synchronized void setReadyBuffer() { + Queue> temp = currentBuffer; + currentBuffer = readyBuffer; + readyBuffer = temp; + } + +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java new file mode 100644 index 00000000000..cd4c5ae8b25 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/DoubleBufferEntry.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis.helpers; + +import org.apache.hadoop.ozone.om.response.OMClientResponse; + +/** + * Entry in OzoneManagerDouble Buffer. + * @param + */ +public class DoubleBufferEntry { + + private long trxLogIndex; + private Response response; + + public DoubleBufferEntry(long trxLogIndex, Response response) { + this.trxLogIndex = trxLogIndex; + this.response = response; + } + + public long getTrxLogIndex() { + return trxLogIndex; + } + + public Response getResponse() { + return response; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/package-info.java new file mode 100644 index 00000000000..b12a324d681 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/helpers/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +/** + * package which contains helper classes for each OM request response. + */ +package org.apache.hadoop.ozone.om.ratis.helpers; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketCreateResponse.java new file mode 100644 index 00000000000..7e222edb8ff --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketCreateResponse.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for CreateBucket request. + */ +public final class OMBucketCreateResponse implements OMClientResponse { + + private final OmBucketInfo omBucketInfo; + + public OMBucketCreateResponse(OmBucketInfo omBucketInfo) { + this.omBucketInfo = omBucketInfo; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbBucketKey = + omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(), + omBucketInfo.getBucketName()); + omMetadataManager.getBucketTable().putWithBatch(batchOperation, dbBucketKey, + omBucketInfo); + } + + public OmBucketInfo getOmBucketInfo() { + return omBucketInfo; + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketDeleteResponse.java new file mode 100644 index 00000000000..fd3842db7e6 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMBucketDeleteResponse.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for DeleteBucket request. + */ +public final class OMBucketDeleteResponse implements OMClientResponse { + + private String volumeName; + private String bucketName; + + public OMBucketDeleteResponse( + String volumeName, String bucketName) { + this.volumeName = volumeName; + this.bucketName = bucketName; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbBucketKey = + omMetadataManager.getBucketKey(volumeName, bucketName); + omMetadataManager.getBucketTable().deleteWithBatch(batchOperation, + dbBucketKey); + } + + public String getVolumeName() { + return volumeName; + } + + public String getBucketName() { + return bucketName; + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java new file mode 100644 index 00000000000..2603421a59a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.utils.db.BatchOperation; + +import java.io.IOException; + +/** + * Interface for OM Responses, each OM response should implement this interface. + */ +public interface OMClientResponse { + + /** + * Implement logic to add the response to batch. + * @param omMetadataManager + * @param batchOperation + * @throws IOException + */ + default void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + throw new NotImplementedException("Not implemented, Each OM Response " + + "should implement this method"); + } + +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java new file mode 100644 index 00000000000..857f03aa01a --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeCreateResponse.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for CreateBucket request. + */ +public class OMVolumeCreateResponse implements OMClientResponse { + + private OzoneManagerProtocolProtos.VolumeList volumeList; + private OmVolumeArgs omVolumeArgs; + + public OMVolumeCreateResponse(OmVolumeArgs omVolumeArgs, + OzoneManagerProtocolProtos.VolumeList volumeList) { + this.omVolumeArgs = omVolumeArgs; + this.volumeList = volumeList; + } + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + + String dbVolumeKey = + omMetadataManager.getVolumeKey(omVolumeArgs.getVolume()); + String dbUserKey = + omMetadataManager.getUserKey(omVolumeArgs.getOwnerName()); + + omMetadataManager.getVolumeTable().putWithBatch(batchOperation, dbVolumeKey, + omVolumeArgs); + omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey, + volumeList); + } + + public OzoneManagerProtocolProtos.VolumeList getVolumeList() { + return volumeList; + } + + public OmVolumeArgs getOmVolumeArgs() { + return omVolumeArgs; + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java new file mode 100644 index 00000000000..02663cb8887 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMVolumeDeleteResponse.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + +import java.io.IOException; + +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.utils.db.BatchOperation; + +/** + * Response for CreateVolume request. + */ +public class OMVolumeDeleteResponse implements OMClientResponse { + private String volume; + private String owner; + private OzoneManagerProtocolProtos.VolumeList updatedVolumeList; + + public OMVolumeDeleteResponse(String volume, String owner, + OzoneManagerProtocolProtos.VolumeList updatedVolumeList) { + this.volume = volume; + this.owner = owner; + this.updatedVolumeList = updatedVolumeList; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbUserKey = omMetadataManager.getUserKey(owner); + OzoneManagerProtocolProtos.VolumeList volumeList = + updatedVolumeList; + if (updatedVolumeList.getVolumeNamesList().size() == 0) { + omMetadataManager.getUserTable().deleteWithBatch(batchOperation, + dbUserKey); + } else { + omMetadataManager.getUserTable().putWithBatch(batchOperation, dbUserKey, + volumeList); + } + omMetadataManager.getVolumeTable().deleteWithBatch(batchOperation, + omMetadataManager.getVolumeKey(volume)); + } +} + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/package-info.java new file mode 100644 index 00000000000..d66cac7c021 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/package-info.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.response; + + +/** + * This package contains classes for the OM Responses. + */ \ No newline at end of file diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java new file mode 100644 index 00000000000..a0162e8aa81 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithDummyResponse.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.response.OMClientResponse; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.utils.db.BatchOperation; + +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; + +/** + * This class tests OzoneManagerDoubleBuffer implementation with + * dummy response class. + */ +public class TestOzoneManagerDoubleBufferWithDummyResponse { + + private OMMetadataManager omMetadataManager; + private OzoneManagerDoubleBuffer doubleBuffer; + private AtomicLong trxId = new AtomicLong(0); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setup() throws IOException { + OzoneConfiguration configuration = new OzoneConfiguration(); + configuration.set(OZONE_METADATA_DIRS, + folder.newFolder().getAbsolutePath()); + omMetadataManager = + new OmMetadataManagerImpl(configuration); + doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager); + } + + @After + public void stop() { + doubleBuffer.stop(); + } + + /** + * This tests add's 100 bucket creation responses to doubleBuffer, and + * check OM DB bucket table has 100 entries or not. In addition checks + * flushed transaction count is matching with expected count or not. + * @throws Exception + */ + @Test(timeout = 300_000) + public void testDoubleBufferWithDummyResponse() throws Exception { + String volumeName = UUID.randomUUID().toString(); + int bucketCount = 100; + for (int i=0; i < bucketCount; i++) { + doubleBuffer.add(createDummyBucketResponse(volumeName, + UUID.randomUUID().toString()), trxId.incrementAndGet()); + } + GenericTestUtils.waitFor(() -> + doubleBuffer.getFlushedTransactionCount() == bucketCount, 100, + 60000); + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == (bucketCount)); + Assert.assertTrue(doubleBuffer.getFlushIterations() > 0); + } + + /** + * Create DummyBucketCreate response. + * @param volumeName + * @param bucketName + * @return OMDummyCreateBucketResponse + */ + private OMDummyCreateBucketResponse createDummyBucketResponse( + String volumeName, String bucketName) { + OmBucketInfo omBucketInfo = + OmBucketInfo.newBuilder().setVolumeName(volumeName) + .setBucketName(bucketName).setCreationTime(Time.now()).build(); + return new OMDummyCreateBucketResponse(omBucketInfo); + } + + + /** + * DummyCreatedBucket Response class used in testing. + */ + public static class OMDummyCreateBucketResponse implements OMClientResponse { + private final OmBucketInfo omBucketInfo; + + public OMDummyCreateBucketResponse(OmBucketInfo omBucketInfo) { + this.omBucketInfo = omBucketInfo; + } + + @Override + public void addToDBBatch(OMMetadataManager omMetadataManager, + BatchOperation batchOperation) throws IOException { + String dbBucketKey = + omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(), + omBucketInfo.getBucketName()); + omMetadataManager.getBucketTable().putWithBatch(batchOperation, + dbBucketKey, omBucketInfo); + } + + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java new file mode 100644 index 00000000000..1926b656297 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBufferWithOMResponse.java @@ -0,0 +1,409 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.ratis; + +import java.io.IOException; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; +import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs; +import org.apache.hadoop.ozone.om.response.OMVolumeCreateResponse; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList; +import org.apache.hadoop.ozone.om.response.OMBucketCreateResponse; +import org.apache.hadoop.ozone.om.response.OMBucketDeleteResponse; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.Time; + +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; +import static org.junit.Assert.fail; + +/** + * This class tests OzoneManagerDouble Buffer with actual OMResponse classes. + */ +public class TestOzoneManagerDoubleBufferWithOMResponse { + + private OMMetadataManager omMetadataManager; + private OzoneManagerDoubleBuffer doubleBuffer; + private AtomicLong trxId = new AtomicLong(0); + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void setup() throws IOException { + OzoneConfiguration configuration = new OzoneConfiguration(); + configuration.set(OZONE_METADATA_DIRS, + folder.newFolder().getAbsolutePath()); + omMetadataManager = + new OmMetadataManagerImpl(configuration); + doubleBuffer = new OzoneManagerDoubleBuffer(omMetadataManager); + } + + @After + public void stop() { + doubleBuffer.stop(); + } + + /** + * This tests OzoneManagerDoubleBuffer implementation. It calls + * testDoubleBuffer with number of iterations to do transactions and + * number of buckets to be created in each iteration. It then + * verifies OM DB entries count is matching with total number of + * transactions or not. + * @throws Exception + */ + @Test(timeout = 300_000) + public void testDoubleBuffer() throws Exception { + // This test checks whether count in tables are correct or not. + testDoubleBuffer(1, 10); + testDoubleBuffer(10, 100); + testDoubleBuffer(100, 100); + testDoubleBuffer(1000, 1000); + } + + /** + * This test first creates a volume, and then does a mix of transactions + * like create/delete buckets and add them to double buffer. Then it + * verifies OM DB entries are matching with actual responses added to + * double buffer or not. + * @throws Exception + */ + @Test + public void testDoubleBufferWithMixOfTransactions() throws Exception { + // This test checks count, data in table is correct or not. + Queue< OMBucketCreateResponse > bucketQueue = + new ConcurrentLinkedQueue<>(); + Queue< OMBucketDeleteResponse > deleteBucketQueue = + new ConcurrentLinkedQueue<>(); + + String volumeName = UUID.randomUUID().toString(); + OMVolumeCreateResponse omVolumeCreateResponse = createVolume(volumeName); + doubleBuffer.add(omVolumeCreateResponse, trxId.incrementAndGet()); + + + int bucketCount = 10; + + doMixTransactions(volumeName, 10, deleteBucketQueue, bucketQueue); + + // As for every 2 transactions of create bucket we add deleted bucket. + final int deleteCount = 5; + + // We are doing +1 for volume transaction. + GenericTestUtils.waitFor(() -> + doubleBuffer.getFlushedTransactionCount() == + (bucketCount + deleteCount + 1), 100, 120000); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getVolumeTable()) == 1); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == 5); + + // Now after this in our DB we should have 5 buckets and one volume + + checkVolume(volumeName, omVolumeCreateResponse); + + checkCreateBuckets(bucketQueue); + + checkDeletedBuckets(deleteBucketQueue); + } + + /** + * This test first creates a volume, and then does a mix of transactions + * like create/delete buckets in parallel and add to double buffer. Then it + * verifies OM DB entries are matching with actual responses added to + * double buffer or not. + * @throws Exception + */ + @Test + public void testDoubleBufferWithMixOfTransactionsParallel() throws Exception { + // This test checks count, data in table is correct or not. + + Queue< OMBucketCreateResponse > bucketQueue = + new ConcurrentLinkedQueue<>(); + Queue< OMBucketDeleteResponse > deleteBucketQueue = + new ConcurrentLinkedQueue<>(); + + String volumeName1 = UUID.randomUUID().toString(); + OMVolumeCreateResponse omVolumeCreateResponse1 = + createVolume(volumeName1); + + String volumeName2 = UUID.randomUUID().toString(); + OMVolumeCreateResponse omVolumeCreateResponse2 = + createVolume(volumeName2); + + doubleBuffer.add(omVolumeCreateResponse1, trxId.incrementAndGet()); + + doubleBuffer.add(omVolumeCreateResponse2, trxId.incrementAndGet()); + + Daemon daemon1 = new Daemon(() -> doMixTransactions(volumeName1, 10, + deleteBucketQueue, bucketQueue)); + Daemon daemon2 = new Daemon(() -> doMixTransactions(volumeName2, 10, + deleteBucketQueue, bucketQueue)); + + daemon1.start(); + daemon2.start(); + + int bucketCount = 20; + + // As for every 2 transactions of create bucket we add deleted bucket. + final int deleteCount = 10; + + // We are doing +1 for volume transaction. + GenericTestUtils.waitFor(() -> doubleBuffer.getFlushedTransactionCount() + == (bucketCount + deleteCount + 2), 100, 120000); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getVolumeTable()) == 2); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == 10); + + // Now after this in our DB we should have 5 buckets and one volume + + + checkVolume(volumeName1, omVolumeCreateResponse1); + checkVolume(volumeName2, omVolumeCreateResponse2); + + checkCreateBuckets(bucketQueue); + + checkDeletedBuckets(deleteBucketQueue); + } + + /** + * This method add's a mix of createBucket/DeleteBucket responses to double + * buffer. Total number of responses added is specified by bucketCount. + * @param volumeName + * @param bucketCount + * @param deleteBucketQueue + * @param bucketQueue + */ + private void doMixTransactions(String volumeName, int bucketCount, + Queue deleteBucketQueue, + Queue bucketQueue) { + for (int i=0; i < bucketCount; i++) { + String bucketName = UUID.randomUUID().toString(); + OMBucketCreateResponse omBucketCreateResponse = createBucket(volumeName, + bucketName); + doubleBuffer.add(omBucketCreateResponse, trxId.incrementAndGet()); + // For every 2 transactions have a deleted bucket. + if (i % 2 == 0) { + OMBucketDeleteResponse omBucketDeleteResponse = + deleteBucket(volumeName, bucketName); + doubleBuffer.add(omBucketDeleteResponse, trxId.incrementAndGet()); + deleteBucketQueue.add(omBucketDeleteResponse); + } else { + bucketQueue.add(omBucketCreateResponse); + } + } + } + + /** + * Verifies volume table data is matching with actual response added to + * double buffer. + * @param volumeName + * @param omVolumeCreateResponse + * @throws Exception + */ + private void checkVolume(String volumeName, + OMVolumeCreateResponse omVolumeCreateResponse) throws Exception { + OmVolumeArgs tableVolumeArgs = omMetadataManager.getVolumeTable().get( + omMetadataManager.getVolumeKey(volumeName)); + Assert.assertTrue(tableVolumeArgs != null); + + OmVolumeArgs omVolumeArgs = omVolumeCreateResponse.getOmVolumeArgs(); + + Assert.assertEquals(omVolumeArgs.getVolume(), tableVolumeArgs.getVolume()); + Assert.assertEquals(omVolumeArgs.getAdminName(), + tableVolumeArgs.getAdminName()); + Assert.assertEquals(omVolumeArgs.getOwnerName(), + tableVolumeArgs.getOwnerName()); + Assert.assertEquals(omVolumeArgs.getCreationTime(), + tableVolumeArgs.getCreationTime()); + } + + /** + * Verifies bucket table data is matching with actual response added to + * double buffer. + * @param bucketQueue + */ + private void checkCreateBuckets(Queue bucketQueue) { + bucketQueue.forEach((omBucketCreateResponse) -> { + OmBucketInfo omBucketInfo = omBucketCreateResponse.getOmBucketInfo(); + String bucket = omBucketInfo.getBucketName(); + OmBucketInfo tableBucketInfo = null; + try { + tableBucketInfo = + omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey(omBucketInfo.getVolumeName(), + bucket)); + } catch (IOException ex) { + fail("testDoubleBufferWithMixOfTransactions failed"); + } + Assert.assertNotNull(tableBucketInfo); + + Assert.assertEquals(omBucketInfo.getVolumeName(), + tableBucketInfo.getVolumeName()); + Assert.assertEquals(omBucketInfo.getBucketName(), + tableBucketInfo.getBucketName()); + Assert.assertEquals(omBucketInfo.getCreationTime(), + tableBucketInfo.getCreationTime()); + }); + } + + /** + * Verifies deleted bucket responses added to double buffer are actually + * removed from the OM DB or not. + * @param deleteBucketQueue + */ + private void checkDeletedBuckets(Queue + deleteBucketQueue) { + deleteBucketQueue.forEach((omBucketDeleteResponse -> { + try { + Assert.assertNull(omMetadataManager.getBucketTable().get( + omMetadataManager.getBucketKey( + omBucketDeleteResponse.getVolumeName(), + omBucketDeleteResponse.getBucketName()))); + } catch (IOException ex) { + fail("testDoubleBufferWithMixOfTransactions failed"); + } + })); + } + + /** + * Create bucketCount number of createBucket responses for each iteration. + * All these iterations are run in parallel. Then verify OM DB has correct + * number of entries or not. + * @param iterations + * @param bucketCount + * @throws Exception + */ + public void testDoubleBuffer(int iterations, int bucketCount) + throws Exception { + try { + // Calling setup and stop here because this method is called from a + // single test multiple times. + setup(); + for (int i = 0; i < iterations; i++) { + Daemon d1 = new Daemon(() -> + doTransactions(UUID.randomUUID().toString(), bucketCount)); + d1.start(); + } + + // We are doing +1 for volume transaction. + GenericTestUtils.waitFor(() -> + doubleBuffer.getFlushedTransactionCount() == + (bucketCount + 1) * iterations, 100, + 120000); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getVolumeTable()) == iterations); + + Assert.assertTrue(omMetadataManager.countRowsInTable( + omMetadataManager.getBucketTable()) == (bucketCount) * iterations); + + Assert.assertTrue(doubleBuffer.getFlushIterations() > 0); + } finally { + stop(); + } + } + + /** + * This method adds bucketCount number of createBucket responses to double + * buffer. + * @param volumeName + * @param bucketCount + */ + public void doTransactions(String volumeName, int bucketCount) { + doubleBuffer.add(createVolume(volumeName), trxId.incrementAndGet()); + for (int i=0; i< bucketCount; i++) { + doubleBuffer.add(createBucket(volumeName, UUID.randomUUID().toString()), + trxId.incrementAndGet()); + // For every 100 buckets creation adding 100ms delay + + if (i % 100 == 0) { + try { + Thread.sleep(100); + } catch (Exception ex) { + + } + } + } + } + + /** + * Create OMVolumeCreateResponse for specified volume. + * @param volumeName + * @return OMVolumeCreateResponse + */ + private OMVolumeCreateResponse createVolume(String volumeName) { + OmVolumeArgs omVolumeArgs = + OmVolumeArgs.newBuilder() + .setAdminName(UUID.randomUUID().toString()) + .setOwnerName(UUID.randomUUID().toString()) + .setVolume(volumeName) + .setCreationTime(Time.now()).build(); + + VolumeList volumeList = VolumeList.newBuilder() + .addVolumeNames(volumeName).build(); + return new OMVolumeCreateResponse(omVolumeArgs, volumeList); + } + + /** + * Create OMBucketCreateResponse for specified volume and bucket. + * @param volumeName + * @param bucketName + * @return OMBucketCreateResponse + */ + private OMBucketCreateResponse createBucket(String volumeName, + String bucketName) { + OmBucketInfo omBucketInfo = + OmBucketInfo.newBuilder().setVolumeName(volumeName) + .setBucketName(bucketName).setCreationTime(Time.now()).build(); + return new OMBucketCreateResponse(omBucketInfo); + } + + /** + * Create OMBucketDeleteResponse for specified volume and bucket. + * @param volumeName + * @param bucketName + * @return OMBucketDeleteResponse + */ + private OMBucketDeleteResponse deleteBucket(String volumeName, + String bucketName) { + return new OMBucketDeleteResponse(volumeName, bucketName); + } + + +} +