Reactive Programing(1) – 리액티브 프로그래밍 개념잡기
Reactive Programing(2) – Reactive Operator
Reactive Programing(3) – Reactive Operator
Reactive Programing(4) – Scheduler
Reactive Programing(5) – 안드로이드에서의 RxJava 활용


Reactive Operators(리액티브 연산자)

스케쥴러 설명
newThread() 새로운 스레드 생성
single() 단일 스레드 생성 후 사용
computation() 계산용 스레드
io() 네트워크, 파일 입출력 스레드
trampoline() 현제 스레드에 대기행렬 생성


지난 포스팅에서 생성연산자와, 변환연산자에 대해서 알아보았습니다.
이번 포스팅에서는 결합연산자, 조건연산자에 대해서 알아보겠습니다.
 

결합연산자

1. zip() 함수


2개 이상의 Observable을 결합하여 데이터를 발행합니다.

val shapes = listOf("Circle", "Pentagon", "Star")
val colors = listOf("yellow", "Pink", "Cyan")
Observable.zip(
      shapes.toObservable(),
      colors.toObservable(),
      BiFunction<String, String, String> { t1, t2 ->
         "$t1 $t2"
      }
).subscribe {
   println(it)
}

결과:
Circle yellow
Pentagon Pink
Star Cyan
도형 데이터와 색깔 데이터 두 셋트를 준비합니다.
zip()으로 Observable을 하나 생성하고 인자로 도형, 색깔 데이터를 Observable형태로 넣습니다.
두 Observable에서 각각 데이터가 발행될때 처리할 수 있는 BiFunction 콜백 인터페이스를 구현합니다.
다음과 같이 코드를 조금 변형해보았습니다.

val shapes = listOf("Circle", "Pentagon", "Star", "Hello")
val colors = listOf("yellow", "Pink", "Cyan")
Observable.zip(
      shapes.toObservable(),
      Observable.interval(0, 500L, TimeUnit.MILLISECONDS).take(colors.size.toLong()).map {
         colors[it.toInt()]
      },
      BiFunction<String, String, String> { t1, t2 ->
         "$t1 $t2"
      }
).subscribe {
   println(it)
}
Thread.sleep(2000)

도형 리스트에 ‘Hello’라는 데이터가 추가되었습니다. 아까는 도형 3개 색깔 3개로 비율이 1:1로 맞았으나 어느 한 Observable의 데이터가 더 많은 경우는 어떻게 될까요?
그리고 한가지 더,
데이터가 발행되는 시기를 다르게 하기 위해 interval로 색깔을 500ms 시간차를 두고 발행했습니다.
 
결과:
Circle yellow
Pentagon Pink
Star Cyan
 
실행해본다면 100ms 마다 이벤트를 처리하는것을 확인하실 수 있습니다. 그리고 어느 한 Observable이 데이터를 발행하지 않는경우 이벤트가 처리되지 않는것도 확인할 수 있습니다.

*zipWith()함수


zip함수와 동일 하지만 생성 연산자가 아니라 Observable객체가 가지고 있는 연산자 입니다. Observable을 다양한 함수와 조합하면서 중간중간 호출할 수 있는 장점이 있습니다

Observable.zip(
      Observable.just(100, 200, 300),
      Observable.just(10, 20, 30),
      BiFunction<Int, Int, Int> { t1, t2 ->
         t1 + t2
      }
).zipWith(Observable.just(1, 2, 3), BiFunction { t1: Int, t2: Int -> t1 + t2 })
      .subscribe {
         println(it)
      }

결과:
111
222
333

2. combineLatest() 함수

zip과 비슷하지만 다른점은 두개이상의 Observable객체를 합성할 때 서로다른 Observable끼리 데이터가 발행하는것을 기다렸다가 합성하는게 아니라 가장 최근에 발행된 데이터끼리 합성합니다.
즉, Observable은 아래의 다이어그램의 타임라인처럼 독립적으로 데이터를 발행하고, 발행했을때의 다른 Observable의 가장 최근값을 가져와 합성합니다.

val colors = listOf("Pink", "Orange", "Cyan", "Yellow")
val shapes = listOf("Diamond", "Star", "Pentagon")
val source = Observable.combineLatest(
      colors.toObservable().zipWith(Observable.interval(0L, 100L, TimeUnit.MILLISECONDS), BiFunction { color: String, _: Long -> color }),
      shapes.toObservable().zipWith(Observable.interval(50L,200L, TimeUnit.MILLISECONDS),BiFunction { shape: String, _: Long -> shape }),
      BiFunction { color: String, shape: String -> "$color $shape" }
)
source.subscribe {
   println(it)
}
Thread.sleep(1000)

결과:
Pink Diamond
Orange Diamond
Cyan Diamond
Cyan Star
Yellow Star
Yellow Pentagon
 

3. merge() 함수

두개이상의 Observable에서 데이터 발행 순서여부에 상관없이 업스트림에서 먼저 입력되는 데이터를 그대로 받아 발행합니다.

 

val colorSet1 = listOf("Red", "Green")
val colorSet2 = listOf("Yellow", "Cyan", "Pink")
val colorSrc1 = Observable.zip(
      colorSet1.toObservable(),
      Observable.interval(0L, 200L, TimeUnit.MILLISECONDS).take(colorSet1.size.toLong()),
      BiFunction { color: String, _: Long ->
         color
      }
)
val colorSrc2 = Observable.zip(
      colorSet2.toObservable(),
      Observable.interval(100L, 200L, TimeUnit.MILLISECONDS).take(colorSet2.size.toLong()),
      BiFunction { color: String, _: Long ->
         color
      }
)
Observable.merge(
      colorSrc1,
      colorSrc2
).subscribe {
   println(it)
}
Thread.sleep(2000)

결과:
Red
Yellow
Green
Cyan
Pink
 

4. concat() 함수

2개 이상의 Observable을 이어 붙이는 함수.

Note:첫번째 Observable의 onComplete 이벤트가 발생하지 않으면 메모리 누수가 발생합니다.
 

val colors1 = listOf("Red", "Green", "Blue")
val colors2 = listOf("Yellow", "Blue", "Pink")
Observable.concat(colors1.toObservable(), colors2.toObservable())
      .subscribe {
         println(it)
      }

 

조건 연산자

조건 연산자는 Observable의 흐름을 제어합니다.

1. amb()  함수

가장 먼저 데이터를 발행하는 Observable을 택합니다. 나머지는 무시됩니다.

val colors1 = arrayOf("Red", "Green", "Blue")
val colors2 = arrayOf("Yellow", "Cyan")
Observable.amb(
      listOf(colors2.toObservable().delay(100L, TimeUnit.MILLISECONDS),
            colors1.toObservable())
).subscribe {
   println(it)
}

결과:
Red
Green
Blue
colors2 배열만 100ms 지연시켜서 발행한 결과,
colors1 이 채택되어 colors2는 무시되는것을 확인할 수 있습니다.
 

2. takeUntil() 함수

take()함수에 조건을 설정할 수 있습니다.

val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink")
colors.toObservable()
      .zipWith(
            Observable.interval(100L, TimeUnit.MILLISECONDS),
            BiFunction { t1: String, t2: Long ->
               t1
            }
      )
      .takeUntil(Observable.timer(550L, TimeUnit.MILLISECONDS))
      .subscribe {
         println(it)
      }
Thread.sleep(1000)

결과:
Red
Yellow
Green
Cyan
Blue

3. skipUntil() 함수

takeUntil과 반대로 Observable에서 데이터를 받을때까지 값을 건너 뜁니다.

val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink")
colors.toObservable()
      .zipWith(
            Observable.interval(100L, TimeUnit.MILLISECONDS),
            BiFunction { t1: String, t2: Long ->
               t1
            }
      )
      .skipUntil(Observable.timer(550L, TimeUnit.MILLISECONDS))
      .subscribe {
         println(it)
      }
Thread.sleep(1000)

결과:
Pink
4. all() 함수
모든 조건이 true일때만 데이터를 true를 발행합니다.

val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink")
colors.toObservable().all { t: String ->
   t == "Red"
}.subscribe { t: Boolean? ->
   println(t)
}

결과:
false
 
 

기타 연산자

1. delay() 함수

Observable의 데이터 발행을 지연시켜주는 역할을 합니다.

 

val startTime = System.currentTimeMillis()
val colors = arrayOf("Red", "Orange", "Yellow")
colors.toObservable()
      .delay(100L, TimeUnit.MILLISECONDS)//이 라인을 주석 후 실행하면 대략 100ms 만큼 빨라짐
      .subscribe {
         println("$it - ${System.currentTimeMillis() - startTime}")
      }
Thread.sleep(1000)

결과:
Red – 168
Orange – 168
Yellow – 168
 

2. timeInterval() 함수

어떤 값을 발행했을 때 이전 값을 발행한 후 얼마나 시간이 흘렀는지 알 수 있습니다.

val colors = arrayOf("Red", "Green", "Orange")
colors.toObservable()
      .delay {
         Thread.sleep(Random().nextInt(1000).toLong())
         Observable.just(it)
      }
      .timeInterval()
      .subscribe {
         println("$it")
      }

결과:
Timed[time=671, unit=MILLISECONDS, value=Red]
Timed[time=275, unit=MILLISECONDS, value=Green]
Timed[time=238, unit=MILLISECONDS, value=Orange]
 
1초내로Random하게 스레드를 sleep()하므로 실행할 때 마다 1초미만으로 time 값이 다르게 나오는 것을 확인 할 수 있습니다.

카테고리: RxJava

0개의 댓글

답글 남기기

Avatar placeholder

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다.