블루투스, 카메라, 마이크 등 단일 하드웨어에 의존적인 API사용에 대해서는 일반적으로 한번에 한가지 작업 밖에 할 수 없다. 예를 들어 동영상을 인코딩 하는 인코더 모듈이 존재하고, 이 인코더는 한번에 하나의 파일만 인코딩 할 수 있는 상황이라면 여러 동영상 인코딩 작업 목록을 순차적으로 하나씩 처리해야 할 것이다.

[그림] 여러 요청들을 한번에 하나씩 처리한 뒤 결과 데이터를 발행하는 인코더

앞의 그림을 코드로 간단히 표현하자면 다음과 같다.

public class Encoder {
    private static final Encoder ourInstance = new Encoder();
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    public static Encoder getInstance() {
        return ourInstance;
    }

    private Encoder() {
    }

    public Single<String> addVideo(String sourcePath) {
        return Single.just(sourcePath)
                .subscribeOn(Schedulers.from(executorService))
                .map(this::encode);
    }

    private String encode(String path) throws InterruptedException {
        //인코딩 작업 시작
        Thread.sleep(1000);
        //인코딩 작업 끝
        String outputPath = path + " done";
        return outputPath;
    }

}

Encoder는 여러 클라이언트로부터 접근 가능한 공용자원이기 때문에 싱글톤으로 표현하고, 순차적으로 인코딩을 처리하기 위해 단일 쓰레드를 사용한다. 쓰레드의 생성은 Executors.newSingleThreadExecutor() 메서드를 호출하여 ExecutorService 인스턴스를 얻고, 이를 RxJava의 Scheduler로 지정하여 작업들을 스케쥴링 하는 것이 이번 아티클의 핵심내용이다. 

순차적으로 작업을 잘 진행하는지 확인하기 위해서 간단한 유닛테스트를 작성할 수 있다.

    @Test
    public void 순차작업_테스트() throws InterruptedException {
        int count = 5;
        CountDownLatch countDownLatch = new CountDownLatch(count);

        Encoder transcoderAdapter = Encoder.getInstance();
        for (int i = 0; i < count; i++) {
            transcoderAdapter.addVideo("Video " + i).subscribe(result -> {
                System.out.println(result);
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
    }
결과:
Video 0 done
Video 1 done
Video 2 done
Video 3 done
Video 4 done

유닛테스트의 실행결과를 확인하면 시스템 메시지가 동시다발적으로 출력되는 것이 아니라,  1초에 한번씩 출력된다. 즉, 작업들이 병행처리 되지 않고 순차적으로 처리되는 것을 보장한다는 것을 확인할 수 있다.

카테고리: RxJava

0개의 댓글

답글 남기기

Avatar placeholder

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다.