#java, #thread, #thread-pool

🧢 Java μ—μ„œ μŠ€λ ˆλ“œ ν’€(Thread Pool) 을 μ‚¬μš©ν•΄ 보자

(자료 좜처 : https://www.baeldung.com/thread-pool-java-and-guava)

ν”„λ‘œμ„ΈμŠ€ λ‚΄μ—μ„œ μŠ€λ ˆλ“œμ˜ 생성 및 μˆ˜κ±°κ°€ λΉˆλ²ˆν•˜κ²Œ λ°œμƒν•œλ‹€λ©΄ λ©”λͺ¨λ¦¬ 할당에 μ†Œλͺ¨λ˜λŠ” λΉ„μš©μ΄ 많이 듀지 μ•Šμ„κΉŒ? 이에 λŒ€ν•œ ν•΄λ‹΅μœΌλ‘œ μŠ€λ ˆλ“œ 풀에 λŒ€ν•΄ μ•Œμ•„λ³΄κ³  Java μ—μ„œ μ–΄λ–»κ²Œ μ‚¬μš©ν•˜λ©΄ λ˜λŠ”μ§€ μ•Œμ•„λ³΄μž.

μŠ€λ ˆλ“œ ν’€(Thread Pool)

μŠ€λ ˆλ“œ μ œμ–΄ 문제λ₯Ό ν•΄κ²°ν•  λ°©λ²•μœΌλ‘œ μŠ€λ ˆλ“œ 풀을 μ‚¬μš©ν•œλ‹€. μŠ€λ ˆλ“œ 풀은 맀번 생성 및 수거 μš”μ²­μ΄ 올 λ•Œ μŠ€λ ˆλ“œλ₯Ό μƒμ„±ν•˜κ³  μˆ˜κ±°ν•˜λŠ” 것이 μ•„λ‹Œ, μŠ€λ ˆλ“œ μ‚¬μš©μžκ°€ μ„€μ •ν•΄λ‘” 개수만큼 미리 μƒμ„±ν•΄λ‘λŠ” 것이닀.

μŠ€λ ˆλ“œ(Thread)

  • μ–΄λ–€ ν”„λ‘œκ·Έλž¨ λ‚΄μ—μ„œ μ‹€ν–‰λ˜λŠ” νλ¦„μ˜ λ‹¨μœ„
  • 특히 ν”„λ‘œμ„ΈμŠ€ λ‚΄μ—μ„œ μ‹€ν–‰λ˜λŠ” νλ¦„μ˜ λ‹¨μœ„

ν’€(Pool)

  • ν•„μš”ν•  λ•Œλ§ˆλ‹€ 개체λ₯Ό ν• λ‹Ήν•˜κ³  νŒŒκ΄΄ν•˜λŠ” λŒ€μ‹ , μ‚¬μš© μ€€λΉ„λœ μƒνƒœλ‘œ μ΄ˆκΈ°ν™”λœ 개체 집합

μŠ€λ ˆλ“œλŠ” λ™μΌν•œ λ©”λͺ¨λ¦¬ μ˜μ—­μ—μ„œ 생성 및 관리가 μ΄λ£¨μ–΄μ§€μ§€λ§Œ, μƒμ„±ν•˜κ±°λ‚˜ μˆ˜κ±°ν•  λ•Œ 컀널 였브젝트λ₯Ό λ™λ°˜ν•˜λŠ” λ¦¬μ†ŒμŠ€μ΄λ―€λ‘œ 생성 λΉ„μš©μ΄ 크게 λ°œμƒν•œλ‹€. μŠ€λ ˆλ“œλ₯Ό μ œμ–΄ν•  수 μ—†λŠ” μƒνƒœμ—μ„œ μŠ€λ ˆλ“œλ₯Ό λ¬΄μ°¨λ³„μ μœΌλ‘œ μƒμ„±ν•˜λ©΄ λ¦¬μ†ŒμŠ€κ°€ λΉ λ₯΄κ²Œ μ†Œμ§„λ˜λŠ” 상황이 λ°œμƒν•  수 μžˆλ‹€. 그러면 μ–΄λ–»κ²Œ ν•˜λ©΄ 효율적으둜 μŠ€λ ˆλ“œλ₯Ό μ œμ–΄ν•  수 μžˆμ„κΉŒ? μŠ€λ ˆλ“œ ν’€μ˜ λ™μž‘ 방식은 κ°„λ‹¨ν•˜κ²Œ 보면 λ‹€μŒκ³Ό κ°™λ‹€.

  1. 병렬 μž‘μ—…μ˜ ν˜•νƒœλ‘œ λ™μ‹œ μ½”λ“œλ₯Ό μž‘μ„±ν•œλ‹€.
  2. 싀행을 μœ„ν•΄ μŠ€λ ˆλ“œ ν’€μ˜ μΈμŠ€ν„΄μŠ€μ— μ œμΆœν•œλ‹€.
  3. μ œμΆœν•œ μΈμŠ€ν„΄μŠ€μ—μ„œ μ‹€ν–‰ν•˜κΈ° μœ„ν•΄ μž¬μ‚¬μš©λ˜λŠ” μ—¬λŸ¬ μŠ€λ ˆλ“œλ₯Ό μ œμ–΄ν•œλ‹€.

μŠ€λ ˆλ“œ 풀을 μ‚¬μš©ν•˜λ©΄ λΉ„μš©μ μΈ μΈ‘λ©΄μ΄λ‚˜ μ»¨ν…μŠ€νŠΈ μŠ€μœ„μΉ˜κ°€ λ°œμƒν•˜λŠ” μƒν™©μ—μ„œ λ”œλ ˆμ΄λ₯Ό 쀄일 수 μžˆλ‹€λŠ” μž₯점이 μžˆλ‹€. κ·Έλ ‡λ‹€λ©΄ 단점은 μ–΄λ–€ 것이 μžˆμ„κΉŒ?

단점은 μŠ€λ ˆλ“œ 풀에 λ„ˆλ¬΄ λ§Žμ€ μ–‘μ˜ μŠ€λ ˆλ“œλ₯Ό λ§Œλ“€μ–΄λ‘”λ‹€λ©΄ λ©”λͺ¨λ¦¬ λ‚­λΉ„κ°€ μ‹¬ν•΄μ§ˆ 수 μžˆλ‹€λŠ” 점이닀. κ·Έ λ•Œλ¬Έμ— μ–Όλ§ŒνΌμ˜ μŠ€λ ˆλ“œκ°€ ν•„μš”ν• μ§€ μ˜ˆμΈ‘ν•˜κ³  ν• λ‹Ήν•΄μ„œ μ‚¬μš©ν•˜λŠ” 것이 μŠ€λ ˆλ“œ 풀을 ν˜„λͺ…ν•˜κ²Œ μ‚¬μš©ν•˜λŠ” 것이라고 ν•  수 μžˆλ‹€.

Java μ—μ„œ μŠ€λ ˆλ“œ ν’€ μ‚¬μš©ν•˜κΈ°

ExecutorService μΈν„°νŽ˜μ΄μŠ€μ˜ κ΅¬ν˜„ 객체λ₯Ό 정적 νŒ©ν† λ¦¬ λ©”μ„œλ“œλ‘œ μ œκ³΅ν•˜λŠ” Executors 클래슀의 μ„Έ 가지 λ©”μ†Œλ“œ 쀑 ν•˜λ‚˜λ₯Ό μ΄μš©ν•˜μ—¬ μŠ€λ ˆλ“œ 풀을 μ‰½κ²Œ 생성할 수 μžˆλ‹€. μžμ„Έν•œ λ©”μ„œλ“œ μ‚¬μš©λ²•μ€ μ΄κ³³μ—μ„œ μ•Œ 수 μžˆλ‹€.

ThreadPoolExecutor μ—μ„œ μ‚¬μš©λ˜λŠ” νŒŒλΌλ―Έν„°λ‘œλŠ” corePoolSize(생성할 개수), maximumPoolSize(생성할 μ΅œλŒ€ 개수), keepAliveTime(μœ μ§€ μ‹œκ°„) 이 μžˆλ‹€. μŠ€λ ˆλ“œ ν’€μ—μ„œ μŠ€λ ˆλ“œλ₯Ό 생성할 λ•Œ corePoolSize 의 νŒŒλΌλ―Έν„°λ§ŒνΌ μ½”μ–΄ μŠ€λ ˆλ“œλ₯Ό μƒμ„±ν•œλ‹€. 그리고 μƒˆλ‘œμš΄ μž‘μ—…μ΄ λ“€μ–΄μ˜¬ λ•Œ λͺ¨λ“  μ½”μ–΄ μŠ€λ ˆλ“œκ°€ μ‚¬μš© 쀑이고 λ‚΄λΆ€ 큐가 가득 μ°¨λ©΄ μŠ€λ ˆλ“œ ν’€μ˜ μ΅œλŒ€ 크기가 maximumPoolSize 만큼 컀질 수 μžˆλ‹€. λ§Œμ•½ ν˜„μž¬ μŠ€λ ˆλ“œ 풀이 corePoolSize 보닀 λ§Žμ€ μŠ€λ ˆλ“œλ₯Ό 가지고 μžˆλ‹€λ©΄, μ΄ˆκ³Όν•œ μŠ€λ ˆλ“œμ— λŒ€ν•΄μ„œ keepAliveTime νŒŒλΌλ―Έν„°κ°’λ³΄λ‹€ μ˜€λž«λ™μ•ˆ ν•  일이 μ—†μœΌλ©΄ μ œκ±°λœλ‹€. 이것은 μžμ›μ˜ λ‚­λΉ„λ₯Ό κ°μ†Œ μ‹œμΌœ 효율적으둜 μŠ€λ ˆλ“œ 풀을 관리할 수 있게 λœλ‹€.

μŠ€λ ˆλ“œ 풀에 μž‘μ—…μš”μ²­μ„ ν•˜λŠ” 방식은 execute( ), submit( ) 방식이 μžˆλ‹€. execute 방식은 μž‘μ—… 처리 쀑에 μ˜ˆμ™Έκ°€ λ°œμƒν•˜λ©΄ ν•΄λ‹Ή μŠ€λ ˆλ“œκ°€ μ’…λ£Œλ˜κ³  μŠ€λ ˆλ“œ ν’€μ—μ„œ μ œκ±°ν•œ λ’€, μƒˆλ‘œμš΄ μŠ€λ ˆλ“œλ₯Ό μƒμ„±ν•˜μ—¬ λ‹€λ₯Έ μž‘μ—…μ„ μ²˜λ¦¬ν•œλ‹€. λ˜ν•œ 처리결과λ₯Ό λ°˜ν™˜ν•˜μ§€ μ•ŠλŠ”λ‹€. λ°˜λŒ€λ‘œ submit 은 μž‘μ—… 처리 쀑에 μ˜ˆμ™Έκ°€ λ°œμƒν•˜λ”λΌλ„ μŠ€λ ˆλ“œκ°€ μ’…λ£Œλ˜μ§€ μ•Šκ³  λ‹€μŒ μž‘μ—…μ— μ‚¬μš©λœλ‹€. λ˜ν•œ 처리 κ²°κ³Όλ₯Ό Future<?> 둜 λ°˜ν™˜ν•œλ‹€. λ”°λΌμ„œ μŠ€λ ˆλ“œ 풀을 μ‚¬μš©ν•  λ•Œ submit 을 μ‚¬μš©ν•˜λŠ” 점이 더 λ°”λžŒμ§ν•˜λ‹€.

ν…ŒμŠ€νŠΈ 클래슀λ₯Ό ν•˜λ‚˜ μƒμ„±ν•˜μ—¬ ν…ŒμŠ€νŠΈλ₯Ό μ§„ν–‰ν•œλ‹€.

μš°μ„  MyCounter 클래슀λ₯Ό ν•˜λ‚˜ λ§Œλ“ λ‹€. ν•΄λ‹Ή ν΄λž˜μŠ€λŠ” increment λ©”μ„œλ“œλ₯Ό 톡해 count λ₯Ό μ¦κ°€μ‹œν‚¨ ν›„ 1초의 λŒ€κΈ°μ‹œκ°„μ„ κ°–λŠ”λ‹€.

public class MyCounter {
    private int count;

    public void increment() {
        try {
            int temp = count;
            count = temp + 1;
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getCount() {
        return count;
    }
}

첫 번째 방법은 Executors.newFixedThreadPool(int nThreads) 에 λŒ€ν•΄ μ•Œμ•„λ³Έλ‹€.

νŒŒλΌλ―Έν„°λ‘œ μ œκ³΅λ˜λŠ” n 개 만큼 μŠ€λ ˆλ“œ 풀을 μƒμ„±ν•œλ‹€. 보톡 μΌμ •λŸ‰μ˜ 업무가 λ°œμƒν•  λ•Œ μ‚¬μš©ν•œλ‹€. λ‹€μŒ μ˜ˆμ‹œλŠ” 18개의 μŠ€λ ˆλ“œκ°€ ν•„μš”ν•œ Task λ₯Ό μ œκ³΅ν•˜κ³  5개의 μŠ€λ ˆλ“œ ν’€λ‘œ μ²˜λ¦¬ν•˜λŠ” 과정을 확인해 λ³΄λŠ” ν…ŒμŠ€νŠΈλ‹€.

    @DisplayName("FixedThreadPool 을 μƒμ„±ν•œλ‹€.")
    @Test
    void testCounterWithConcurrencyFixed() throws InterruptedException {
        int numberOfThreads = 18;
        ExecutorService service = Executors.newFixedThreadPool(5);
        CountDownLatch latch = new CountDownLatch(numberOfThreads);
        MyCounter counter = new MyCounter();
        iterateThread(numberOfThreads, service, latch, counter);

        assertThat(((ThreadPoolExecutor) service).getPoolSize()).isEqualTo(5);
    }

    private void iterateThread(int numberOfThreads, ExecutorService service, CountDownLatch latch, MyCounter counter) throws InterruptedException {
        for (int i = 0; i < numberOfThreads; i++) {
            service.submit(() -> {
                counter.increment();
                latch.countDown();
                throw new IllegalArgumentException();
            });
        }
        latch.await();
    }

image

μ‹€ν–‰ κ²°κ³Όλ₯Ό μ‚΄νŽ΄λ³΄λ©΄ ν•œ μŠ€λ ˆλ“œμ—μ„œ μž‘μ—…μ„ μ²˜λ¦¬ν•˜λŠ” 데 1μ΄ˆκ°€ 걸리도둝 섀정이 λ˜μ–΄μžˆλ‹€. κ·Έ λ•Œλ¬Έμ— 5개의 μž‘μ—…μ„ 3번, 3개의 μž‘μ—…μ„ 1번 ν•˜κ²Œ λ˜λ©΄μ„œ 총 μž‘μ—… μ‹œκ°„μ€ (3+1)초 + 둜직 μ‹€ν–‰ μ‹œκ°„(300ms)이 λœλ‹€. λ”°λΌμ„œ μŠ€λ ˆλ“œ 풀에 μƒμ„±λœ μŠ€λ ˆλ“œ 개수만큼 μž‘μ—…μ„ μ²˜λ¦¬ν•˜λŠ” λͺ¨μŠ΅μ„ 확인할 수 μžˆλ‹€. λ˜ν•œ, μŠ€λ ˆλ“œ ν’€μ˜ 크기가 5개둜 μœ μ§€λ˜λŠ” λͺ¨μŠ΅μ„ 확인할 수 μžˆλ‹€. ν•΄λ‹Ή μž‘μ—…μ—μ„œ Count κ°€ μ—¬λŸ¬ μŠ€λ ˆλ“œμ—μ„œ λ™μ‹œμ— ν˜ΈμΆœλ˜λŠ”λ° 이 경우 λΉ„λ™κΈ°λ‘œ 처리되기 λ•Œλ¬Έμ— μ‹€μ œ Count 의 κ²°κ³Ό(counter.getCount())λŠ” 100보닀 μž‘μ€ 값이 λ°˜ν™˜λ˜λ©° μ‹€ν–‰ν•  λ•Œλ§ˆλ‹€ λžœλ€ν•œ κ²°κ³Όλ₯Ό λ°˜ν™˜ν•˜λŠ” λͺ¨μŠ΅μ„ 확인할 수 μžˆλ‹€.

두 번째 방법은 Executors.newCachedThreadPool() 에 λŒ€ν•΄ μ•Œμ•„λ³Έλ‹€.

초기 μŠ€λ ˆλ“œ κ°œμˆ˜κ°€ 0개둜 μ„€μ •λ˜λ©° μŠ€λ ˆλ“œ κ°œμˆ˜λ³΄λ‹€ λ§Žμ€ μ–‘μ˜ μž‘μ—…μ˜ μš”μ²­λ˜λ©΄ μƒˆλ‘œμš΄ μŠ€λ ˆλ“œλ₯Ό μƒμ„±ν•˜μ—¬ μž‘μ—…μ„ μ²˜λ¦¬ν•œλ‹€. μž‘μ—…μ΄ λλ‚œ μŠ€λ ˆλ“œκ°€ 60초 이상 μƒˆλ‘œμš΄ μž‘μ—…μš”μ²­μ΄ μ—†μœΌλ©΄ μŠ€λ ˆλ“œλ₯Ό μ’…λ£Œν•˜κ³  μŠ€λ ˆλ“œ ν’€μ—μ„œ μ œκ±°λœλ‹€. λ‹€μŒ μ˜ˆμ‹œλŠ” 18개의 μŠ€λ ˆλ“œκ°€ ν•„μš”ν•œ Task λ₯Ό μ œκ³΅ν•˜κ³  5개의 μŠ€λ ˆλ“œ ν’€λ‘œ μ²˜λ¦¬ν•˜λŠ” 과정을 확인해 λ³΄λŠ” ν…ŒμŠ€νŠΈλ‹€. μœ„μ— 생성해둔 μ½”λ“œμ— μ΄μ–΄μ„œ μž‘μ„±ν•΄μ„œ ν…ŒμŠ€νŠΈλ₯Ό μ§„ν–‰ν•˜λ©΄ λœλ‹€.

    @DisplayName("CachedThreadPool 을 μƒμ„±ν•œλ‹€.")
    @Test
    void testCounterWithConcurrencyCached() throws InterruptedException {
        int numberOfThreads = 18;
        ExecutorService service = Executors.newCachedThreadPool();
        CountDownLatch latch = new CountDownLatch(numberOfThreads);
        MyCounter counter = new MyCounter();
        iterateThread(numberOfThreads, service, latch, counter);

        assertThat(counter.getCount()).isEqualTo(numberOfThreads);
        assertThat(((ThreadPoolExecutor) service).getPoolSize()).isEqualTo(18);
        Thread.sleep(60000); // 60초 ν›„ μƒμ„±λœ μŠ€λ ˆλ“œκ°€ μ œκ±°λ˜λŠ”μ§€ ν™•μΈν•œλ‹€.
        assertThat(((ThreadPoolExecutor) service).getPoolSize()).isEqualTo(0);
    }

image

μ„Έ 번째 방법은 Executors.newScheduledThreadPool(int corePoolSize) 에 λŒ€ν•΄ μ•Œμ•„λ³Έλ‹€.

μŠ€λ ˆλ“œλ₯Ό μΌμ •μ‹œκ°„μ΄ 흐λ₯΄κ³  λ‚œ λ’€ μ‹€ν–‰μ‹œν‚€λ„λ‘ ν•˜λŠ” μŠ€μΌ€μ€„λ§ μŠ€λ ˆλ“œ κΈ°λŠ₯이닀. ν•΄λ‹Ή κΈ°λŠ₯을 ν…ŒμŠ€νŠΈν•΄ 보기 μœ„ν•΄μ„œλŠ” ν…ŒμŠ€νŠΈ μ½”λ“œκ°€ μ•„λ‹Œ λ©”μΈμ—μ„œ 싀행해봐야 ν•œλ‹€. λ”°λΌμ„œ Sample 클래슀λ₯Ό λ§Œλ“€κ³  μ‹€μŠ΅μ„ ν•΄λ³Έλ‹€. corePoolSize λŠ” 생성할 corePool 의 크기λ₯Ό μ§€μ •ν•΄μ£ΌλŠ” 뢀뢄인데 Executors.newScheduledThreadPool(0) 을 ν•˜λ”λΌλ„ μ‹€ν–‰μ—λŠ” λ¬Έμ œκ°€ μ—†μ–΄ 보인닀. λ‹€λ§Œ JDK 8 버전 μ΄ν•˜μ—μ„œ 발견된 λ²„κ·Έλ‘œ 단일 μ½”μ–΄ 가상 λ¨Έμ‹ μ—μ„œ CPU λ₯Ό 100% μ‚¬μš©ν•˜λŠ” 버그가 있기 λ•Œλ¬Έμ— νŒŒλΌλ―Έν„°λ‘œ 1 μ΄μƒμœΌλ‘œ μ„€μ •ν•œλ‹€.

public class Sample {
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);

        System.out.println("첫 번째 μž‘μ—…μ΄ μ‹€ν–‰λ©λ‹ˆλ‹€.");
        service.schedule(() -> System.out.println("두 번째 μž‘μ—…μ΄ μ‹€ν–‰λ©λ‹ˆλ‹€."), 5, TimeUnit.SECONDS);
        System.out.println("μ„Έ 번째 μž‘μ—…μ΄ μ‹€ν–‰λ©λ‹ˆλ‹€.");

        service.shutdown();
    }
}

image μŠ€μΌ€μ€„μ—μ„œ μ„€μ •ν•œ 5μ΄ˆκ°€ 흐λ₯Έ ν›„ 두 번째 μž‘μ—…μ΄ μ‹€ν–‰λ©λ‹ˆλ‹€. κ°€ μ‹€ν–‰λ˜λŠ” λͺ¨μŠ΅μ„ 확인할 수 μžˆλ‹€.

κ²°λ‘ 

μžλ°”μ—μ„œ μ‚¬μš©ν•˜κ²Œ 될 μŠ€λ ˆλ“œ 풀에 λŒ€ν•΄ κ°„λ‹¨ν•˜κ²Œ μ•Œμ•„λ΄€λ‹€. μ–Όλ§ˆλ§ŒνΌμ˜ μŠ€λ ˆλ“œκ°€ 주기적으둜 μ†Œλͺ¨λ μ§€, μŠ€λ ˆλ“œλ₯Ό μΆ”κ°€λ‘œ 생성할지, μž‘μ—… ν›„ λ°˜ν™˜ 값을 받을지에 λŒ€ν•œ 뢄석을 ν† λŒ€λ‘œ 상황에 μ ν•©ν•œ Thread 및 ThreadPool 을 μ μš©ν•œλ‹€λ©΄ λ©€ν‹° μ½”μ–΄ ν”„λ‘œκ·Έλž˜λ°μ— 큰 도움이 λœλ‹€κ³  μƒκ°ν•œλ‹€. λ‹€λ§Œ, μ μ ˆν•˜μ§€ μ•Šκ²Œ μ‚¬μš©ν•  경우 였히렀 μ‚¬μš©ν•˜μ§€ μ•Šμ€ 것보닀 λͺ»ν•  수 있기 λ•Œλ¬Έμ— μ£Όμ˜ν•΄μ•Ό ν•œλ‹€.

μ°Έκ³