자바의 CyclicBarrier

1. 소개

CyclicBarriersjava.util.concurrent 패키지 의 일부로 Java 5에 도입 된 동기화 구조입니다 .

이 기사에서는 동시성 시나리오에서이 구현을 살펴볼 것입니다.

2. 자바 동시성 – 동기화 기

java.util.concurrent에서의 패키지는 도움이 서로 협력 스레드의 집합을 관리하는 여러 클래스가 포함되어 있습니다. 이들 중 일부는 다음과 같습니다.

  • CyclicBarrier
  • 페이저
  • CountDownLatch
  • 교환기
  • 신호기
  • 동기 대기열

이러한 클래스는 스레드 간의 일반적인 상호 작용 패턴을위한 기본 기능을 제공합니다.

서로 통신하고 공통 패턴 중 하나와 유사한 스레드 집합이있는 경우 잠금 및 조건 집합을 사용하여 사용자 지정 체계를 만드는 대신 적절한 라이브러리 클래스 ( 동기화 자라고 도 함 )를 재사용 할 수 있습니다. 개체동기화 된 키워드.

앞으로 의 CyclicBarrier 에 초점을 맞 춥니 다.

3. CyclicBarrier

으로 CyclicBarrier는 공통의 실행 지점에 도달하기 위해 서로를 위해 대기하는 스레드의 집합을 할 수있는 동기이며, 또한라는 장벽 .

CyclicBarriers 는 실행을 계속하기 전에 서로가 공통 지점에 도달 할 때까지 기다려야하는 고정 된 수의 스레드가있는 프로그램에서 사용됩니다.

장벽은 대기중인 스레드가 해제 된 후 재사용 될 수 있기 때문에 순환 이라고합니다.

4. 사용법

CyclicBarrier 의 생성자 는 간단합니다. 공통 실행 지점에 도달 함을 나타 내기 위해 장벽 인스턴스 에서 await () 메서드 를 호출해야하는 스레드 수를 나타내는 단일 정수를 사용 합니다.

public CyclicBarrier(int parties)

실행을 동기화해야하는 스레드를 당사자 라고도하며 await () 메서드를 호출하면 특정 스레드가 장벽 지점에 도달했음을 등록 할 수 있습니다.

이 호출은 동기식이며이 메서드를 호출하는 스레드는 지정된 수의 스레드가 장벽에서 동일한 메서드를 호출 할 때까지 실행을 일시 중단합니다. 필요한 수의 스레드가 await ()를 호출 한 이러한 상황을 장벽 트립 이라고 합니다 .

선택적으로 Runnable 인스턴스 인 생성자에 두 번째 인수를 전달할 수 있습니다 . 이것은 장벽을 넘어가는 마지막 스레드에 의해 실행되는 로직을 가지고 있습니다.

public CyclicBarrier(int parties, Runnable barrierAction)

5. 구현

작동중인 CyclicBarrier 를보기 위해 다음 시나리오를 고려해 보겠습니다.

고정 된 수의 스레드가 수행하고 해당 결과를 목록에 저장하는 작업이 있습니다. 모든 스레드가 작업 수행을 마치면 그중 하나 (일반적으로 장벽을 넘어가는 마지막 스레드)가 각각에 의해 가져온 데이터 처리를 시작합니다.

모든 액션이 일어나는 메인 클래스를 구현해 봅시다 :

public class CyclicBarrierDemo { private CyclicBarrier cyclicBarrier; private List
    
      partialResults = Collections.synchronizedList(new ArrayList()); private Random random = new Random(); private int NUM_PARTIAL_RESULTS; private int NUM_WORKERS; // ... }
    

이 클래스는 매우 간단합니다. NUM_WORKERS 는 실행할 스레드 수이고 NUM_PARTIAL_RESULTS 는 각 작업자 스레드가 생성 할 결과 수입니다.

마지막으로, 이러한 작업자 스레드 각각의 결과를 저장할 목록 인 partialResults 가 있습니다. 이 목록은 SynchronizedList입니다. 왜냐하면 여러 스레드가 동시에 쓰게되고 add () 메서드는 일반 ArrayList 에서 스레드로부터 안전하지 않기 때문 입니다.

이제 각 작업자 스레드의 논리를 구현해 보겠습니다.

public class CyclicBarrierDemo { // ... class NumberCruncherThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); List partialResult = new ArrayList(); // Crunch some numbers and store the partial result for (int i = 0; i < NUM_PARTIAL_RESULTS; i++) { Integer num = random.nextInt(10); System.out.println(thisThreadName + ": Crunching some numbers! Final result - " + num); partialResult.add(num); } partialResults.add(partialResult); try { System.out.println(thisThreadName + " waiting for others to reach barrier."); cyclicBarrier.await(); } catch (InterruptedException e) { // ... } catch (BrokenBarrierException e) { // ... } } } }

이제 장벽이 넘어 졌을 때 실행되는 로직을 구현할 것입니다.

간단하게하기 위해 부분 결과 목록에 모든 숫자를 추가해 보겠습니다.

public class CyclicBarrierDemo { // ... class AggregatorThread implements Runnable { @Override public void run() { String thisThreadName = Thread.currentThread().getName(); System.out.println( thisThreadName + ": Computing sum of " + NUM_WORKERS + " workers, having " + NUM_PARTIAL_RESULTS + " results each."); int sum = 0; for (List threadResult : partialResults) { System.out.print("Adding "); for (Integer partialResult : threadResult) { System.out.print(partialResult+" "); sum += partialResult; } System.out.println(); } System.out.println(thisThreadName + ": Final result = " + sum); } } }

마지막 단계는 CyclicBarrier 를 구성하고 main () 메서드로 작업을 시작하는 것입니다 .

public class CyclicBarrierDemo { // Previous code public void runSimulation(int numWorkers, int numberOfPartialResults) { NUM_PARTIAL_RESULTS = numberOfPartialResults; NUM_WORKERS = numWorkers; cyclicBarrier = new CyclicBarrier(NUM_WORKERS, new AggregatorThread()); System.out.println("Spawning " + NUM_WORKERS + " worker threads to compute " + NUM_PARTIAL_RESULTS + " partial results each"); for (int i = 0; i < NUM_WORKERS; i++) { Thread worker = new Thread(new NumberCruncherThread()); worker.setName("Thread " + i); worker.start(); } } public static void main(String[] args) { CyclicBarrierDemo demo = new CyclicBarrierDemo(); demo.runSimulation(5, 3); } } 

위의 코드에서 우리는 계산의 일부로 각각 3 개의 정수를 생성하고 결과 목록에 저장하는 5 개의 스레드로 순환 장벽을 초기화했습니다.

장벽이 트립되면 장벽을 트립 한 마지막 스레드가 AggregatorThread에 지정된 논리를 실행합니다. 즉, 스레드가 생성 한 모든 숫자를 추가합니다.

6. 결과

다음은 위의 프로그램을 한 번 실행 한 결과입니다. 스레드가 다른 순서로 생성 될 수 있으므로 각 실행은 다른 결과를 생성 할 수 있습니다.

Spawning 5 worker threads to compute 3 partial results each Thread 0: Crunching some numbers! Final result - 6 Thread 0: Crunching some numbers! Final result - 2 Thread 0: Crunching some numbers! Final result - 2 Thread 0 waiting for others to reach barrier. Thread 1: Crunching some numbers! Final result - 2 Thread 1: Crunching some numbers! Final result - 0 Thread 1: Crunching some numbers! Final result - 5 Thread 1 waiting for others to reach barrier. Thread 3: Crunching some numbers! Final result - 6 Thread 3: Crunching some numbers! Final result - 4 Thread 3: Crunching some numbers! Final result - 0 Thread 3 waiting for others to reach barrier. Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 1 Thread 2: Crunching some numbers! Final result - 0 Thread 2 waiting for others to reach barrier. Thread 4: Crunching some numbers! Final result - 9 Thread 4: Crunching some numbers! Final result - 3 Thread 4: Crunching some numbers! Final result - 5 Thread 4 waiting for others to reach barrier. Thread 4: Computing final sum of 5 workers, having 3 results each. Adding 6 2 2 Adding 2 0 5 Adding 6 4 0 Adding 1 1 0 Adding 9 3 5 Thread 4: Final result = 46 

위의 출력에서 ​​알 수 있듯이 Thread 4 는 장벽을 넘어 가고 최종 집계 논리도 실행하는 스레드 입니다. 또한 위의 예에 표시된 것처럼 스레드가 시작된 순서대로 실제로 실행될 필요는 없습니다.

7. 결론

이 기사에서는 CyclicBarrier 가 무엇인지, 어떤 상황에서 도움이 되는지 살펴 보았습니다 .

또한 다른 프로그램 논리를 계속하기 전에 고정 된 실행 지점에 도달하기 위해 고정 된 수의 스레드가 필요한 시나리오를 구현했습니다.

항상 그렇듯이 튜토리얼 코드는 GitHub에서 찾을 수 있습니다.