Hi folks,
In this post, I will talk about three important synchronization classes, CountDownLatch, CyclicBarrier and Exchanger, with examples of each and notes on when they are used.
-> CountDownLatch [CDL]
A CountDownLatch lets one or more threads wait until a set of operations being performed in other threads completes, e.g. when a thread has to wait for other threads it depends on to finish.
A CDL is initialized with a given count. Each thread whose completion is being waited on calls countDown() when it is done, which decrements the count. Once all such threads have called countDown(), the count reaches 0 and the waiting threads can proceed.
The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset.
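The one-shot behaviour can be checked with a small sketch. The class and method names here are made up for illustration; only CountDownLatch and its await, countDown and getCount methods come from the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchOneShotDemo {

    // Returns true if the latch behaved as a one-shot gate.
    static boolean demonstratesOneShot() {
        CountDownLatch latch = new CountDownLatch(1);
        try {
            // The count is still 1, so the timed await gives up and returns false.
            boolean openedEarly = latch.await(100, TimeUnit.MILLISECONDS);

            latch.countDown(); // count goes 1 -> 0, the latch is now open for good
            latch.countDown(); // extra calls are harmless, the count stays at 0

            // The count is zero, so this await returns true without blocking.
            boolean openedNow = latch.await(100, TimeUnit.MILLISECONDS);
            return !openedEarly && openedNow && latch.getCount() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("one-shot behaviour observed: " + demonstratesOneShot());
    }
}
```

The timed variant await(timeout, unit) is used only so that the first call cannot block forever; it returns false on timeout and true once the count is zero.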
A CDL initialized with a count of one serves as a simple on/off latch or gate: all threads invoking await wait at the gate until it is opened by a thread invoking countDown().
A CDL initialized to N can be used to make one thread wait until N threads have completed some action, or some action has been completed N times.
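Both uses can be combined in one small sketch: a latch with a count of one acts as the start gate, and a latch with a count of N tells the caller when N workers are done. StartGateDemo and runWorkers are hypothetical names chosen for this illustration, and the "work" is just a counter increment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateDemo {

    // Starts `workers` threads, opens the gate once, and returns how many did their work.
    static int runWorkers(int workers) {
        CountDownLatch startGate = new CountDownLatch(1);  // on/off gate
        CountDownLatch done = new CountDownLatch(workers); // counts N completions
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    startGate.await();          // every worker waits at the gate
                    finished.incrementAndGet(); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();           // report completion even on failure
                }
            }, "worker-" + i).start();
        }

        startGate.countDown();                  // open the gate: all workers proceed
        try {
            done.await();                       // wait until all N workers have counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("workers finished: " + runWorkers(5));
    }
}
```

Calling countDown() in a finally block is a deliberate choice in this sketch: a worker that fails still releases the thread waiting on the second latch.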
Exercise:
The exercise below shows five threads running: the main thread, Worker 1, Worker 2, Waiter 1 and Waiter 2. It creates a CountDownLatch with a count of 2 and passes it to all the other threads. The main, Waiter 1 and Waiter 2 threads wait on the latch until Worker 1 and Worker 2 finish; once both workers have counted down, the waiter threads and the main thread resume execution.
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.CountDownLatch;

public class Controller {
    public static void main(String[] args) {
        // Two workers must call countDown() before any waiter is released.
        final CountDownLatch latch = new CountDownLatch(2);
        new Thread(new Worker1(latch), "Worker 1").start();
        new Thread(new Worker2(latch), "Worker 2").start();
        new Thread(new Waiter1(latch), "Waiter 1").start();
        new Thread(new Waiter2(latch), "Waiter 2").start();
        try {
            System.out.println(Thread.currentThread().getName() + " is waiting");
            latch.await(); // blocks until the count reaches zero
            System.out.println(Thread.currentThread().getName() + " is back to execution");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.CountDownLatch;

public class Worker1 implements Runnable {
    private final CountDownLatch latch;

    public Worker1(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(3000); // simulate three seconds of work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Worker 1 finished");
        latch.countDown(); // signal completion to the waiting threads
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.CountDownLatch;

public class Worker2 implements Runnable {
    private final CountDownLatch latch;

    public Worker2(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(3000); // simulate three seconds of work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Worker 2 finished");
        latch.countDown(); // signal completion to the waiting threads
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.CountDownLatch;

public class Waiter1 implements Runnable {
    private final CountDownLatch latch;

    public Waiter1(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " is waiting");
            latch.await(); // blocks until both workers have counted down
            System.out.println(Thread.currentThread().getName() + " is back to execution");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Waiter 1 finished");
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.CountDownLatch;

public class Waiter2 implements Runnable {
    private final CountDownLatch latch;

    public Waiter2(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " is waiting");
            latch.await(); // blocks until both workers have counted down
            System.out.println(Thread.currentThread().getName() + " is back to execution");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Waiter 2 finished");
    }
}
-------------------------------------------------------------------------
Console Output :
Waiter 1 is waiting
main is waiting
Waiter 2 is waiting
3 seconds Gap
Worker 1 finished
Worker 2 finished
Waiter 1 is back to execution
Waiter 1 finished
main is back to execution
Waiter 2 is back to execution
Waiter 2 finished
Please run this program and see for yourself how CountDownLatch works. The exact order of the lines can differ from run to run, since it depends on thread scheduling.
-> CyclicBarrier [CB]
CountDownLatch is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, use a CyclicBarrier.
A CB allows a set of threads to all wait for each other to reach a common barrier point. CBs are useful in programs involving a fixed-size party of threads that must occasionally wait for each other: every thread that reaches the barrier point waits there until the remaining threads reach it too. As soon as the last thread arrives, all of them are released to continue.
Unlike a CDL, a CB can be reused: once the waiting threads have been released, the same barrier is ready for the next round without any extra call. The reset() method additionally forces the barrier back to its initial state; any threads waiting at the barrier at that moment leave with a BrokenBarrierException.
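As a rough sketch of reuse, the same barrier object can be awaited round after round by the same party of threads. BarrierReuseDemo and countTrips are hypothetical names for this illustration; the two-argument constructor with a barrier action is part of the JDK class, and the action here simply counts how often the barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {

    // Runs `parties` threads through `rounds` barrier points and returns how often the barrier tripped.
    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, before the waiting threads are released.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // same barrier object, reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // A broken or interrupted barrier ends this thread's rounds early.
                    e.printStackTrace();
                }
            }, "party-" + i);
            threads[i].start();
        }

        for (Thread t : threads) {
            try {
                t.join(); // wait for every party to finish all rounds
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("barrier tripped " + countTrips(3, 3) + " times");
    }
}
```

With 3 parties and 3 rounds the barrier trips three times, and nothing in the loop has to re-arm it between rounds.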
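A CB fits cases where a group of threads must all reach a point before any of them resumes, such as a multiplayer gaming system where the game should not start until all the players have joined. One-time events such as application / module startup, or waiting for individual threads to finish and then resuming, can be handled with a CB as well, although a CDL is often enough for those.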
Exercise:
The exercise below shows a three-player multiplayer game started from a gaming console. The gaming console is the main thread and the three players are three independent threads, so the barrier is created with four parties. Each player takes its own time to join the game; once a player has joined, it waits at the barrier for the other players, and so does the gaming console. Once all the players have joined, all the threads waiting on the barrier are released and the game starts.
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class GamingConsole {
    public static void main(String[] args) {
        // Four parties: the console (main thread) plus three players.
        final CyclicBarrier barrier = new CyclicBarrier(4);
        new Thread(new Player1(barrier), "Player 1").start();
        new Thread(new Player2(barrier), "Player 2").start();
        new Thread(new Player3(barrier), "Player 3").start();
        System.out.println(Thread.currentThread().getName()
                + " Gaming Console is waiting for Players to join");
        try {
            barrier.await(); // blocks until all three players have arrived
            System.out.println(Thread.currentThread().getName()
                    + " All players have joined : Let's start");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Player1 implements Runnable {
    private CyclicBarrier barrier;

    public Player1(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println("Player 1 taking 5 seconds to join the game");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Player 1 Joined the game : Waiting for others");
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Player 1 plays");
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Player2 implements Runnable {
    private CyclicBarrier barrier;

    public Player2(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println("Player 2 8 seconds to join the game");
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Player 2 Joined the game : Waiting for others");
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Player 2 plays");
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Player3 implements Runnable {
    private CyclicBarrier barrier;

    public Player3(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            System.out.println("Player 3 taking 2 seconds to join the game");
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Player 3 Joined the game : Waiting for others");
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Player 3 plays");
    }
}
-------------------------------------------------------------------------
Console Output :
Player 1 taking 5 seconds to join the game
Player 2 8 seconds to join the game
Player 3 taking 2 seconds to join the game
main Gaming Console is waiting for Players to join
Player 3 Joined the game : Waiting for others
Player 1 Joined the game : Waiting for others
Player 2 Joined the game : Waiting for others
Player 2 plays
main All players have joined : Let's start
Player 3 plays
Player 1 plays
The CB uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).
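The breakage model can be observed with a small sketch in which one party never shows up. BrokenBarrierDemo and demo are hypothetical names, and the 200 ms timeout is an arbitrary value chosen for the illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

class BrokenBarrierDemo {

    // Returns a one-line summary of how each thread left the barrier.
    static String demo() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        AtomicReference<String> waiterOutcome = new AtomicReference<>("released normally");

        Thread waiter = new Thread(() -> {
            try {
                barrier.await(); // untimed wait
            } catch (BrokenBarrierException e) {
                waiterOutcome.set("BrokenBarrierException");
            } catch (InterruptedException e) {
                waiterOutcome.set("InterruptedException");
            }
        }, "waiter");
        waiter.start();

        String mainOutcome = "released normally";
        try {
            barrier.await(200, TimeUnit.MILLISECONDS); // times out and breaks the barrier
        } catch (TimeoutException e) {
            mainOutcome = "TimeoutException";
        } catch (InterruptedException | BrokenBarrierException e) {
            mainOutcome = e.getClass().getSimpleName();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return mainOutcome + " / " + waiterOutcome.get() + " / broken=" + barrier.isBroken();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The main thread leaves with a TimeoutException, and the untimed waiter leaves with a BrokenBarrierException even though nothing happened to it directly. The same holds if the waiter reaches the barrier only after it has already been broken.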
-> Exchanger
An Exchanger is a synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return.
Whenever a thread arrives at the exchange point, it must wait for the other thread to arrive. When the other pairing thread arrives the two threads proceed to exchange their objects.
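A minimal sketch of the pairing, using plain strings as the exchanged objects. ExchangerSwapDemo and swap are hypothetical names for this illustration; Exchanger.exchange is the JDK method described above.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.atomic.AtomicReference;

class ExchangerSwapDemo {

    // Pairs the calling thread with a partner thread and reports what each one received.
    static String swap() {
        Exchanger<String> exchanger = new Exchanger<>();
        AtomicReference<String> partnerGot = new AtomicReference<>();

        Thread partner = new Thread(() -> {
            try {
                // Blocks until the other thread also calls exchange().
                partnerGot.set(exchanger.exchange("from partner"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "partner");
        partner.start();

        String mainGot = null;
        try {
            mainGot = exchanger.exchange("from main"); // hand over ours, receive theirs
            partner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "main got '" + mainGot + "', partner got '" + partnerGot.get() + "'";
    }

    public static void main(String[] args) {
        System.out.println(swap());
    }
}
```

Whichever thread reaches exchange() first simply waits; each side ends up holding the object the other side passed in.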
Exercise:
The following example shows a money exchange system with bills and coins. Two candidates want to exchange their money and use this system to do so. Candidate 1 arrives earlier than Candidate 2, but has to wait for Candidate 2 to arrive before the exchange can happen.
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.Exchanger;

public class ExchangerImpl {
    public static void main(String[] args) {
        final Exchanger<Money> exchanger = new Exchanger<>();
        Money candidate1 = new Money(100, 30);
        Money candidate2 = new Money(75, 25);
        new Thread(new Candidate1(candidate1, exchanger), "Candidate 1").start();
        new Thread(new Candidate2(candidate2, exchanger), "Candidate 2").start();
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.Exchanger;

public class Candidate1 implements Runnable {
    private Exchanger<Money> exchanger;
    private Money money;

    public Candidate1(Money money, Exchanger<Money> exchanger) {
        this.money = money;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " : Before :"
                + money.getBill() + "." + money.getCoin());
        try {
            System.out.println("Candidate 1 is ready to exchange money : waiting");
            // Blocks until Candidate 2 also calls exchange(), then returns its Money.
            money = exchanger.exchange(money);
            System.out.println("Candidate 1 has exchanged money");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " : After :"
                + money.getBill() + "." + money.getCoin());
    }
}
-------------------------------------------------------------------------
package concurrent;

import java.util.concurrent.Exchanger;

public class Candidate2 implements Runnable {
    private Exchanger<Money> exchanger;
    private Money money;

    public Candidate2(Money money, Exchanger<Money> exchanger) {
        this.money = money;
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " : Before :"
                + money.getBill() + "." + money.getCoin());
        try {
            System.out.println("Candidate 2 needs 5 seconds to arrive");
            Thread.sleep(5000); // Candidate 1 is already waiting at the exchange point
            money = exchanger.exchange(money);
            System.out.println("Candidate 2 has exchanged money");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " : After :"
                + money.getBill() + "." + money.getCoin());
    }
}
-------------------------------------------------------------------------
Console output :
Candidate 1 : Before :100.30
Candidate 1 is ready to exchange money : waiting
Candidate 2 : Before :75.25
Candidate 2 needs 5 seconds to arrive
-------------------5 seconds-------------
Candidate 2 has exchanged money
Candidate 2 : After :100.30
Candidate 1 has exchanged money
Candidate 1 : After :75.25
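The listings above use a Money class that is not shown in this post. The sketch below is an assumption about what it could look like, inferred only from the calls new Money(100, 30), getBill() and getCoin() and from the printed values such as 100.30; the int field types and the immutability are guesses. To compile it alongside the listings, it would need the same package declaration (package concurrent;) at the top.

```java
// Hypothetical value class holding a bill amount and a coin amount.
class Money {
    private final int bill; // assumed to be a whole number, e.g. 100
    private final int coin; // assumed to be a whole number, e.g. 30

    Money(int bill, int coin) {
        this.bill = bill;
        this.coin = coin;
    }

    int getBill() {
        return bill;
    }

    int getCoin() {
        return coin;
    }
}
```

Keeping the fields final means the object handed over through the Exchanger cannot be changed by the thread that gave it away, which avoids sharing mutable state between the two candidates.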