semaphores (#2198)

* minor logging fix

* spring security sso

* use basic auth

* use form login

* cleanup

* cleanup

* final cleanup

* second client app for sso

* spring boot bootstrap

* add logic

* cleanup

* add simple controller

* add thymeleaf and security

* minor fix

* minor fix

* add more boot properties

* fix live test

* fix live test

* minor fix

* semaphores
This commit is contained in:
Doha2012 2017-07-03 17:34:41 +02:00 committed by Grzegorz Piwowarek
parent 7c6a08746e
commit 20669dc6a1
4 changed files with 219 additions and 0 deletions

View File

@ -0,0 +1,31 @@
package com.baeldung.concurrent.semaphores;
import java.util.concurrent.Semaphore;
public class CounterUsingMutex {
private final Semaphore mutex;
private int count;
public CounterUsingMutex() {
mutex = new Semaphore(1);
count = 0;
}
public void increase() throws InterruptedException {
mutex.acquire();
this.count = this.count + 1;
Thread.sleep(1000);
mutex.release();
}
public int getCount() {
return this.count;
}
public boolean hasQueuedThreads() {
return mutex.hasQueuedThreads();
}
}

View File

@ -0,0 +1,23 @@
package com.baeldung.concurrent.semaphores;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.TimedSemaphore;
public class DelayQueueUsingTimedSemaphore {
private final TimedSemaphore semaphore;
public DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
}
public boolean tryAdd() {
return semaphore.tryAcquire();
}
public int availableSlots() {
return semaphore.getAvailablePermits();
}
}

View File

@ -0,0 +1,25 @@
package com.baeldung.concurrent.semaphores;
import java.util.concurrent.Semaphore;
public class LoginQueueUsingSemaphore {
private final Semaphore semaphore;
public LoginQueueUsingSemaphore(int slotLimit) {
semaphore = new Semaphore(slotLimit);
}
public boolean tryLogin() {
return semaphore.tryAcquire();
}
public void logout() {
semaphore.release();
}
public int availableSlots() {
return semaphore.availablePermits();
}
}

View File

@ -0,0 +1,140 @@
package com.baeldung.concurrent.semaphores;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.junit.Test;
public class SemaphoresManualTest {
// ========= login queue ======
@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
final int slots = 10;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(new Runnable() {
@Override
public void run() {
loginQueue.tryLogin();
}
}));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
assertFalse(loginQueue.tryLogin());
}
@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
final int slots = 10;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(new Runnable() {
@Override
public void run() {
loginQueue.tryLogin();
}
}));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
loginQueue.logout();
assertTrue(loginQueue.availableSlots() > 0);
assertTrue(loginQueue.tryLogin());
}
// ========= delay queue =======
@Test
public void givenDelayQueue_whenReachLimit_thenBlocked() {
final int slots = 50;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(new Runnable() {
@Override
public void run() {
delayQueue.tryAdd();
}
}));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
assertFalse(delayQueue.tryAdd());
}
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
final int slots = 50;
final ExecutorService executorService = Executors.newFixedThreadPool(slots);
final DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(new Runnable() {
@Override
public void run() {
delayQueue.tryAdd();
}
}));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
Thread.sleep(1000);
assertTrue(delayQueue.availableSlots() > 0);
assertTrue(delayQueue.tryAdd());
}
// ========== mutex ========
@Test
public void whenMutexAndMultipleThreads_thenBlocked() throws InterruptedException {
final int count = 5;
final ExecutorService executorService = Executors.newFixedThreadPool(count);
final CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(new Runnable() {
@Override
public void run() {
try {
counter.increase();
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
}
@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount() throws InterruptedException {
final int count = 5;
final ExecutorService executorService = Executors.newFixedThreadPool(count);
final CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(new Runnable() {
@Override
public void run() {
try {
counter.increase();
} catch (final InterruptedException e) {
e.printStackTrace();
}
}
}));
executorService.shutdown();
assertTrue(counter.hasQueuedThreads());
Thread.sleep(5000);
assertFalse(counter.hasQueuedThreads());
assertEquals(count, counter.getCount());
}
}