HDFS-12519. Ozone: Lease Manager framework. Contributed by Nandakumar.

This commit is contained in:
Nandakumar 2017-10-12 19:12:02 +05:30 committed by Owen O'Malley
parent ebc4d4ebc3
commit 8881309002
11 changed files with 1149 additions and 0 deletions

View File

@ -0,0 +1,189 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
import org.apache.hadoop.util.Time;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
/**
* This class represents the lease created on a resource. Callback can be
* registered on the lease which will be executed in case of timeout.
*
* @param <T> Resource type for which the lease can be associated
*/
public class Lease<T> {
/**
* The resource for which this lease is created.
*/
private final T resource;
private final long creationTime;
/**
* Lease lifetime in milliseconds.
*/
private volatile long leaseTimeout;
private boolean expired;
/**
* Functions to be called in case of timeout.
*/
private List<Callable<Void>> callbacks;
/**
* Creates a lease on the specified resource with given timeout.
*
* @param resource
* Resource for which the lease has to be created
* @param timeout
* Lease lifetime in milliseconds
*/
public Lease(T resource, long timeout) {
this.resource = resource;
this.leaseTimeout = timeout;
this.callbacks = Collections.synchronizedList(new ArrayList<>());
this.creationTime = Time.monotonicNow();
this.expired = false;
}
/**
* Returns true if the lease has expired, else false.
*
* @return true if expired, else false
*/
public boolean hasExpired() {
return expired;
}
/**
* Registers a callback which will be executed in case of timeout. Callbacks
* are executed in a separate Thread.
*
* @param callback
* The Callable which has to be executed
* @throws LeaseExpiredException
* If the lease has already timed out
*/
public void registerCallBack(Callable<Void> callback)
throws LeaseExpiredException {
if(hasExpired()) {
throw new LeaseExpiredException("Resource: " + resource);
}
callbacks.add(callback);
}
/**
* Returns the time elapsed since the creation of lease.
*
* @return elapsed time in milliseconds
* @throws LeaseExpiredException
* If the lease has already timed out
*/
public long getElapsedTime() throws LeaseExpiredException {
if(hasExpired()) {
throw new LeaseExpiredException("Resource: " + resource);
}
return Time.monotonicNow() - creationTime;
}
/**
* Returns the time available before timeout.
*
* @return remaining time in milliseconds
* @throws LeaseExpiredException
* If the lease has already timed out
*/
public long getRemainingTime() throws LeaseExpiredException {
if(hasExpired()) {
throw new LeaseExpiredException("Resource: " + resource);
}
return leaseTimeout - getElapsedTime();
}
/**
* Returns total lease lifetime.
*
* @return total lifetime of lease in milliseconds
* @throws LeaseExpiredException
* If the lease has already timed out
*/
public long getLeaseLifeTime() throws LeaseExpiredException {
if(hasExpired()) {
throw new LeaseExpiredException("Resource: " + resource);
}
return leaseTimeout;
}
/**
* Renews the lease timeout period.
*
* @param timeout
* Time to be added to the lease in milliseconds
* @throws LeaseExpiredException
* If the lease has already timed out
*/
public void renew(long timeout) throws LeaseExpiredException {
if(hasExpired()) {
throw new LeaseExpiredException("Resource: " + resource);
}
leaseTimeout += timeout;
}
@Override
public int hashCode() {
return resource.hashCode();
}
@Override
public boolean equals(Object obj) {
if(obj instanceof Lease) {
return resource.equals(((Lease) obj).resource);
}
return false;
}
@Override
public String toString() {
return "Lease<" + resource.toString() + ">";
}
/**
* Returns the callbacks to be executed for the lease in case of timeout.
*
* @return callbacks to be executed
*/
List<Callable<Void>> getCallbacks() {
return callbacks;
}
/**
* Expires/Invalidates the lease.
*/
void invalidate() {
callbacks = null;
expired = true;
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
/**
* This exception represents that there is already a lease acquired on the
* same resource.
*/
public class LeaseAlreadyExistException extends LeaseException {
/**
* Constructs an {@code LeaseAlreadyExistException} with {@code null}
* as its error detail message.
*/
public LeaseAlreadyExistException() {
super();
}
/**
* Constructs an {@code LeaseAlreadyExistException} with the specified
* detail message.
*
* @param message
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public LeaseAlreadyExistException(String message) {
super(message);
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.Callable;
/**
* This class is responsible for executing the callbacks of a lease in case of
* timeout.
*/
public class LeaseCallbackExecutor<T> implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(Lease.class);
private final T resource;
private final List<Callable<Void>> callbacks;
/**
* Constructs LeaseCallbackExecutor instance with list of callbacks.
*
* @param resource
* The resource for which the callbacks are executed
* @param callbacks
* Callbacks to be executed by this executor
*/
public LeaseCallbackExecutor(T resource, List<Callable<Void>> callbacks) {
this.resource = resource;
this.callbacks = callbacks;
}
@Override
public void run() {
if(LOG.isDebugEnabled()) {
LOG.debug("Executing callbacks for lease on {}", resource);
}
for(Callable<Void> callback : callbacks) {
try {
callback.call();
} catch (Exception e) {
LOG.warn("Exception while executing callback for lease on {}",
resource, e);
}
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
/**
* This exception represents all lease related exceptions.
*/
public class LeaseException extends Exception {
/**
* Constructs an {@code LeaseException} with {@code null}
* as its error detail message.
*/
public LeaseException() {
super();
}
/**
* Constructs an {@code LeaseException} with the specified
* detail message.
*
* @param message
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public LeaseException(String message) {
super(message);
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
/**
* This exception represents that the lease that is being accessed has expired.
*/
public class LeaseExpiredException extends LeaseException {
/**
* Constructs an {@code LeaseExpiredException} with {@code null}
* as its error detail message.
*/
public LeaseExpiredException() {
super();
}
/**
* Constructs an {@code LeaseExpiredException} with the specified
* detail message.
*
* @param message
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public LeaseExpiredException(String message) {
super(message);
}
}

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* LeaseManager is someone who can provide you leases based on your
* requirement. If you want to return the lease back before it expires,
* you can give it back to Lease Manager. He is the one responsible for
* the lifecycle of leases. The resource for which lease is created
* should have proper {@code equals} method implementation, resource
* equality is checked while the lease is created.
*
* @param <T> Type of leases that this lease manager can create
*/
public class LeaseManager<T> {
private static final Logger LOG =
LoggerFactory.getLogger(LeaseManager.class);
private final long defaultTimeout;
private Map<T, Lease<T>> activeLeases;
private LeaseMonitor leaseMonitor;
private Thread leaseMonitorThread;
private boolean isRunning;
/**
* Creates an instance of lease manager.
*
* @param defaultTimeout
* Default timeout value to be used for lease creation.
*/
public LeaseManager(long defaultTimeout) {
this.defaultTimeout = defaultTimeout;
}
/**
* Starts the lease manager service.
*/
public void start() {
LOG.debug("Starting LeaseManager service");
activeLeases = new ConcurrentHashMap<>();
leaseMonitor = new LeaseMonitor();
leaseMonitorThread = new Thread(leaseMonitor);
leaseMonitorThread.setName("LeaseManager#LeaseMonitor");
leaseMonitorThread.setDaemon(true);
leaseMonitorThread.setUncaughtExceptionHandler((thread, throwable) -> {
// Let us just restart this thread after logging an error.
// if this thread is not running we cannot handle Lease expiry.
LOG.error("LeaseMonitor thread encountered an error. Thread: {}",
thread.toString(), throwable);
leaseMonitorThread.start();
});
LOG.debug("Starting LeaseManager#LeaseMonitor Thread");
leaseMonitorThread.start();
isRunning = true;
}
/**
* Returns a lease for the specified resource with default timeout.
*
* @param resource
* Resource for which lease has to be created
* @throws LeaseAlreadyExistException
* If there is already a lease on the resource
*/
public synchronized Lease<T> acquire(T resource)
throws LeaseAlreadyExistException {
return acquire(resource, defaultTimeout);
}
/**
* Returns a lease for the specified resource with the timeout provided.
*
* @param resource
* Resource for which lease has to be created
* @param timeout
* The timeout in milliseconds which has to be set on the lease
* @throws LeaseAlreadyExistException
* If there is already a lease on the resource
*/
public synchronized Lease<T> acquire(T resource, long timeout)
throws LeaseAlreadyExistException {
checkStatus();
if(LOG.isDebugEnabled()) {
LOG.debug("Acquiring lease on {} for {} milliseconds", resource, timeout);
}
if(activeLeases.containsKey(resource)) {
throw new LeaseAlreadyExistException("Resource: " + resource);
}
Lease<T> lease = new Lease<>(resource, timeout);
activeLeases.put(resource, lease);
leaseMonitorThread.interrupt();
return lease;
}
/**
* Returns a lease associated with the specified resource.
*
* @param resource
* Resource for which the lease has to be returned
* @throws LeaseNotFoundException
* If there is no active lease on the resource
*/
public Lease<T> get(T resource) throws LeaseNotFoundException {
checkStatus();
Lease<T> lease = activeLeases.get(resource);
if(lease != null) {
return lease;
}
throw new LeaseNotFoundException("Resource: " + resource);
}
/**
* Releases the lease associated with the specified resource.
*
* @param resource
* The for which the lease has to be released
* @throws LeaseNotFoundException
* If there is no active lease on the resource
*/
public synchronized void release(T resource)
throws LeaseNotFoundException {
checkStatus();
if(LOG.isDebugEnabled()) {
LOG.debug("Releasing lease on {}", resource);
}
Lease<T> lease = activeLeases.remove(resource);
if(lease == null) {
throw new LeaseNotFoundException("Resource: " + resource);
}
lease.invalidate();
}
/**
* Shuts down the LeaseManager and releases the resources. All the active
* {@link Lease} will be released (callbacks on leases will not be
* executed).
*/
public void shutdown() {
checkStatus();
LOG.debug("Shutting down LeaseManager service");
leaseMonitor.disable();
leaseMonitorThread.interrupt();
for(T resource : activeLeases.keySet()) {
try {
release(resource);
} catch(LeaseNotFoundException ex) {
//Ignore the exception, someone might have released the lease
}
}
isRunning = false;
}
/**
* Throws {@link LeaseManagerNotRunningException} if the service is not
* running.
*/
private void checkStatus() {
if(!isRunning) {
throw new LeaseManagerNotRunningException("LeaseManager not running.");
}
}
/**
* Monitors the leases and expires them based on the timeout, also
* responsible for executing the callbacks of expired leases.
*/
private final class LeaseMonitor implements Runnable {
private boolean monitor = true;
private ExecutorService executorService;
private LeaseMonitor() {
this.monitor = true;
this.executorService = Executors.newCachedThreadPool();
}
@Override
public void run() {
while(monitor) {
LOG.debug("LeaseMonitor: checking for lease expiry");
long sleepTime = Long.MAX_VALUE;
for (T resource : activeLeases.keySet()) {
try {
Lease<T> lease = get(resource);
long remainingTime = lease.getRemainingTime();
if (remainingTime <= 0) {
//Lease has timed out
List<Callable<Void>> leaseCallbacks = lease.getCallbacks();
release(resource);
executorService.execute(
new LeaseCallbackExecutor(resource, leaseCallbacks));
} else {
sleepTime = remainingTime > sleepTime ?
sleepTime : remainingTime;
}
} catch (LeaseNotFoundException | LeaseExpiredException ex) {
//Ignore the exception, someone might have released the lease
}
}
try {
if(!Thread.interrupted()) {
Thread.sleep(sleepTime);
}
} catch (InterruptedException ignored) {
// This means a new lease is added to activeLeases.
}
}
}
/**
* Disables lease monitor, next interrupt call on the thread
* will stop lease monitor.
*/
public void disable() {
monitor = false;
}
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
/**
* This exception represents that there LeaseManager service is not running.
*/
public class LeaseManagerNotRunningException extends RuntimeException {
/**
* Constructs an {@code LeaseManagerNotRunningException} with {@code null}
* as its error detail message.
*/
public LeaseManagerNotRunningException() {
super();
}
/**
* Constructs an {@code LeaseManagerNotRunningException} with the specified
* detail message.
*
* @param message
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public LeaseManagerNotRunningException(String message) {
super(message);
}
}

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
/**
* This exception represents that the lease that is being accessed does not
* exist.
*/
public class LeaseNotFoundException extends LeaseException {
/**
* Constructs an {@code LeaseNotFoundException} with {@code null}
* as its error detail message.
*/
public LeaseNotFoundException() {
super();
}
/**
* Constructs an {@code LeaseNotFoundException} with the specified
* detail message.
*
* @param message
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public LeaseNotFoundException(String message) {
super(message);
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* A generic lease management API which can be used if a service
* needs any kind of lease management.
*/
package org.apache.hadoop.ozone.lease;
/*
This package contains lease management related classes.
*/

View File

@ -0,0 +1,374 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* A generic lease management API which can be used if a service
* needs any kind of lease management.
*/
package org.apache.hadoop.ozone.lease;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.HashMap;
import java.util.Map;
/**
* Test class to check functionality and consistency of LeaseManager.
*/
public class TestLeaseManager {
@Rule
public ExpectedException exception = ExpectedException.none();
/**
* Dummy resource on which leases can be acquired.
*/
private final class DummyResource {
private final String name;
private DummyResource(String name) {
this.name = name;
}
@Override
public int hashCode() {
return name.hashCode();
}
@Override
public boolean equals(Object obj) {
if(obj instanceof DummyResource) {
return name.equals(((DummyResource) obj).name);
}
return false;
}
}
@Test
public void testLeaseAcquireAndRelease() throws LeaseException {
//It is assumed that the test case execution won't take more than 5 seconds,
//if it takes more time increase the defaultTimeout value of LeaseManager.
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
DummyResource resourceTwo = new DummyResource("two");
DummyResource resourceThree = new DummyResource("three");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
Assert.assertEquals(leaseOne, manager.get(resourceOne));
Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
Assert.assertEquals(leaseThree, manager.get(resourceThree));
Assert.assertFalse(leaseOne.hasExpired());
Assert.assertFalse(leaseTwo.hasExpired());
Assert.assertFalse(leaseThree.hasExpired());
//The below releases should not throw LeaseNotFoundException.
manager.release(resourceOne);
manager.release(resourceTwo);
manager.release(resourceThree);
Assert.assertTrue(leaseOne.hasExpired());
Assert.assertTrue(leaseTwo.hasExpired());
Assert.assertTrue(leaseThree.hasExpired());
manager.shutdown();
}
@Test
public void testLeaseAlreadyExist() throws LeaseException {
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
DummyResource resourceTwo = new DummyResource("two");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
Assert.assertEquals(leaseOne, manager.get(resourceOne));
Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
exception.expect(LeaseAlreadyExistException.class);
exception.expectMessage("Resource: " + resourceOne);
manager.acquire(resourceOne);
manager.release(resourceOne);
manager.release(resourceTwo);
manager.shutdown();
}
@Test
public void testLeaseNotFound() throws LeaseException, InterruptedException {
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
DummyResource resourceTwo = new DummyResource("two");
DummyResource resourceThree = new DummyResource("three");
//Case 1: lease was never acquired.
exception.expect(LeaseNotFoundException.class);
exception.expectMessage("Resource: " + resourceOne);
manager.get(resourceOne);
//Case 2: lease is acquired and released.
Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
Assert.assertFalse(leaseTwo.hasExpired());
manager.release(resourceTwo);
Assert.assertTrue(leaseTwo.hasExpired());
exception.expect(LeaseNotFoundException.class);
exception.expectMessage("Resource: " + resourceTwo);
manager.get(resourceTwo);
//Case 3: lease acquired and timed out.
Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
Assert.assertEquals(leaseThree, manager.get(resourceThree));
Assert.assertFalse(leaseThree.hasExpired());
long sleepTime = leaseThree.getRemainingTime() + 5;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
//even in case of interrupt we have to wait till lease times out.
Thread.sleep(sleepTime);
}
Assert.assertTrue(leaseThree.hasExpired());
exception.expect(LeaseNotFoundException.class);
exception.expectMessage("Resource: " + resourceThree);
manager.get(resourceThree);
manager.shutdown();
}
@Test
public void testCustomLeaseTimeout() throws LeaseException {
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
DummyResource resourceTwo = new DummyResource("two");
DummyResource resourceThree = new DummyResource("three");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo, 10000);
Lease<DummyResource> leaseThree = manager.acquire(resourceThree, 50000);
Assert.assertEquals(leaseOne, manager.get(resourceOne));
Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
Assert.assertEquals(leaseThree, manager.get(resourceThree));
Assert.assertFalse(leaseOne.hasExpired());
Assert.assertFalse(leaseTwo.hasExpired());
Assert.assertFalse(leaseThree.hasExpired());
Assert.assertEquals(5000, leaseOne.getLeaseLifeTime());
Assert.assertEquals(10000, leaseTwo.getLeaseLifeTime());
Assert.assertEquals(50000, leaseThree.getLeaseLifeTime());
// Releasing of leases is done in shutdown, so don't have to worry about
// lease release
manager.shutdown();
}
@Test
public void testLeaseCallback() throws LeaseException, InterruptedException {
Map<DummyResource, String> leaseStatus = new HashMap<>();
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
leaseStatus.put(resourceOne, "lease in use");
leaseOne.registerCallBack(() -> {
leaseStatus.put(resourceOne, "lease expired");
return null;
});
// wait for lease to expire
long sleepTime = leaseOne.getRemainingTime() + 5;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
//even in case of interrupt we have to wait till lease times out.
Thread.sleep(sleepTime);
}
Assert.assertTrue(leaseOne.hasExpired());
exception.expect(LeaseNotFoundException.class);
exception.expectMessage("Resource: " + resourceOne);
manager.get(resourceOne);
// check if callback has been executed
Assert.assertEquals("lease expired", leaseStatus.get(resourceOne));
}
@Test
public void testCallbackExecutionInCaseOfLeaseRelease()
throws LeaseException, InterruptedException {
// Callbacks should not be executed in case of lease release
Map<DummyResource, String> leaseStatus = new HashMap<>();
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
leaseStatus.put(resourceOne, "lease in use");
leaseOne.registerCallBack(() -> {
leaseStatus.put(resourceOne, "lease expired");
return null;
});
leaseStatus.put(resourceOne, "lease released");
manager.release(resourceOne);
Assert.assertTrue(leaseOne.hasExpired());
exception.expect(LeaseNotFoundException.class);
exception.expectMessage("Resource: " + resourceOne);
manager.get(resourceOne);
Assert.assertEquals("lease released", leaseStatus.get(resourceOne));
}
@Test
public void testLeaseCallbackWithMultipleLeases()
throws LeaseException, InterruptedException {
Map<DummyResource, String> leaseStatus = new HashMap<>();
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
DummyResource resourceTwo = new DummyResource("two");
DummyResource resourceThree = new DummyResource("three");
DummyResource resourceFour = new DummyResource("four");
DummyResource resourceFive = new DummyResource("five");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
Lease<DummyResource> leaseFour = manager.acquire(resourceFour);
Lease<DummyResource> leaseFive = manager.acquire(resourceFive);
leaseStatus.put(resourceOne, "lease in use");
leaseStatus.put(resourceTwo, "lease in use");
leaseStatus.put(resourceThree, "lease in use");
leaseStatus.put(resourceFour, "lease in use");
leaseStatus.put(resourceFive, "lease in use");
leaseOne.registerCallBack(() -> {
leaseStatus.put(resourceOne, "lease expired");
return null;
});
leaseTwo.registerCallBack(() -> {
leaseStatus.put(resourceTwo, "lease expired");
return null;
});
leaseThree.registerCallBack(() -> {
leaseStatus.put(resourceThree, "lease expired");
return null;
});
leaseFour.registerCallBack(() -> {
leaseStatus.put(resourceFour, "lease expired");
return null;
});
leaseFive.registerCallBack(() -> {
leaseStatus.put(resourceFive, "lease expired");
return null;
});
// release lease one, two and three
leaseStatus.put(resourceOne, "lease released");
manager.release(resourceOne);
leaseStatus.put(resourceTwo, "lease released");
manager.release(resourceTwo);
leaseStatus.put(resourceThree, "lease released");
manager.release(resourceThree);
// wait for other leases to expire
long sleepTime = leaseFive.getRemainingTime() + 10;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
//even in case of interrupt we have to wait till lease times out.
Thread.sleep(sleepTime);
}
Assert.assertTrue(leaseOne.hasExpired());
Assert.assertTrue(leaseTwo.hasExpired());
Assert.assertTrue(leaseThree.hasExpired());
Assert.assertTrue(leaseFour.hasExpired());
Assert.assertTrue(leaseFive.hasExpired());
Assert.assertEquals("lease released", leaseStatus.get(resourceOne));
Assert.assertEquals("lease released", leaseStatus.get(resourceTwo));
Assert.assertEquals("lease released", leaseStatus.get(resourceThree));
Assert.assertEquals("lease expired", leaseStatus.get(resourceFour));
Assert.assertEquals("lease expired", leaseStatus.get(resourceFive));
manager.shutdown();
}
@Test
public void testReuseReleasedLease() throws LeaseException {
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Assert.assertEquals(leaseOne, manager.get(resourceOne));
Assert.assertFalse(leaseOne.hasExpired());
manager.release(resourceOne);
Assert.assertTrue(leaseOne.hasExpired());
Lease<DummyResource> sameResourceLease = manager.acquire(resourceOne);
Assert.assertEquals(sameResourceLease, manager.get(resourceOne));
Assert.assertFalse(sameResourceLease.hasExpired());
manager.release(resourceOne);
Assert.assertTrue(sameResourceLease.hasExpired());
manager.shutdown();
}
@Test
public void testReuseTimedOutLease()
throws LeaseException, InterruptedException {
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Assert.assertEquals(leaseOne, manager.get(resourceOne));
Assert.assertFalse(leaseOne.hasExpired());
// wait for lease to expire
long sleepTime = leaseOne.getRemainingTime() + 5;
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ex) {
//even in case of interrupt we have to wait till lease times out.
Thread.sleep(sleepTime);
}
Assert.assertTrue(leaseOne.hasExpired());
Lease<DummyResource> sameResourceLease = manager.acquire(resourceOne);
Assert.assertEquals(sameResourceLease, manager.get(resourceOne));
Assert.assertFalse(sameResourceLease.hasExpired());
manager.release(resourceOne);
Assert.assertTrue(sameResourceLease.hasExpired());
manager.shutdown();
}
@Test
public void testRenewLease() throws LeaseException, InterruptedException {
LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
manager.start();
DummyResource resourceOne = new DummyResource("one");
Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
Assert.assertEquals(leaseOne, manager.get(resourceOne));
Assert.assertFalse(leaseOne.hasExpired());
// add 5 more seconds to the lease
leaseOne.renew(5000);
Thread.sleep(5000);
// lease should still be active
Assert.assertEquals(leaseOne, manager.get(resourceOne));
Assert.assertFalse(leaseOne.hasExpired());
manager.release(resourceOne);
manager.shutdown();
}
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.lease;
/*
This package contains lease management unit test classes.
*/