Reactive Programing(1) – 리액티브 프로그래밍 개념잡기
Reactive Programing(2) – Reactive Operator
Reactive Programing(3) – Reactive Operator
Reactive Programing(4) – Scheduler
Reactive Programing(5) – 안드로이드에서의 RxJava 활용
Reactive Operators(리액티브 연산자)
| 스케쥴러 | 설명 | 
|---|---|
| newThread() | 새로운 스레드 생성 | 
| single() | 단일 스레드 생성 후 사용 | 
| computation() | 계산용 스레드 | 
| io() | 네트워크, 파일 입출력 스레드 | 
| trampoline() | 현제 스레드에 대기행렬 생성 | 
지난 포스팅에서 생성연산자와, 변환연산자에 대해서 알아보았습니다.
이번 포스팅에서는 결합연산자, 조건연산자에 대해서 알아보겠습니다.
 
결합연산자
1. zip() 함수

2개 이상의 Observable을 결합하여 데이터를 발행합니다.
val shapes = listOf("Circle", "Pentagon", "Star")
val colors = listOf("yellow", "Pink", "Cyan")
Observable.zip(
      shapes.toObservable(),
      colors.toObservable(),
      BiFunction<String, String, String> { t1, t2 ->
         "$t1 $t2"
      }
).subscribe {
   println(it)
}
결과:
Circle yellow
Pentagon Pink
Star Cyan
도형 데이터와 색깔 데이터 두 셋트를 준비합니다.
zip()으로 Observable을 하나 생성하고 인자로 도형, 색깔 데이터를 Observable형태로 넣습니다.
두 Observable에서 각각 데이터가 발행될때 처리할 수 있는 BiFunction 콜백 인터페이스를 구현합니다.
다음과 같이 코드를 조금 변형해보았습니다.
val shapes = listOf("Circle", "Pentagon", "Star", "Hello")
val colors = listOf("yellow", "Pink", "Cyan")
Observable.zip(
      shapes.toObservable(),
      Observable.interval(0, 500L, TimeUnit.MILLISECONDS).take(colors.size.toLong()).map {
         colors[it.toInt()]
      },
      BiFunction<String, String, String> { t1, t2 ->
         "$t1 $t2"
      }
).subscribe {
   println(it)
}
Thread.sleep(2000)
도형 리스트에 ‘Hello’라는 데이터가 추가되었습니다. 아까는 도형 3개 색깔 3개로 비율이 1:1로 맞았으나 어느 한 Observable의 데이터가 더 많은 경우는 어떻게 될까요?
그리고 한가지 더,
데이터가 발행되는 시기를 다르게 하기 위해 interval로 색깔을 500ms 시간차를 두고 발행했습니다.
 
결과:
Circle yellow
Pentagon Pink
Star Cyan
 
실행해본다면 100ms 마다 이벤트를 처리하는것을 확인하실 수 있습니다. 그리고 어느 한 Observable이 데이터를 발행하지 않는경우 이벤트가 처리되지 않는것도 확인할 수 있습니다.
*zipWith()함수

zip함수와 동일 하지만 생성 연산자가 아니라 Observable객체가 가지고 있는 연산자 입니다. Observable을 다양한 함수와 조합하면서 중간중간 호출할 수 있는 장점이 있습니다
Observable.zip(
      Observable.just(100, 200, 300),
      Observable.just(10, 20, 30),
      BiFunction<Int, Int, Int> { t1, t2 ->
         t1 + t2
      }
).zipWith(Observable.just(1, 2, 3), BiFunction { t1: Int, t2: Int -> t1 + t2 })
      .subscribe {
         println(it)
      }
결과:
111
222
333
2. combineLatest() 함수
zip과 비슷하지만 다른점은 두개이상의 Observable객체를 합성할 때 서로다른 Observable끼리 데이터가 발행하는것을 기다렸다가 합성하는게 아니라 가장 최근에 발행된 데이터끼리 합성합니다.
즉, Observable은 아래의 다이어그램의 타임라인처럼 독립적으로 데이터를 발행하고, 발행했을때의 다른 Observable의 가장 최근값을 가져와 합성합니다.

val colors = listOf("Pink", "Orange", "Cyan", "Yellow")
val shapes = listOf("Diamond", "Star", "Pentagon")
val source = Observable.combineLatest(
      colors.toObservable().zipWith(Observable.interval(0L, 100L, TimeUnit.MILLISECONDS), BiFunction { color: String, _: Long -> color }),
      shapes.toObservable().zipWith(Observable.interval(50L,200L, TimeUnit.MILLISECONDS),BiFunction { shape: String, _: Long -> shape }),
      BiFunction { color: String, shape: String -> "$color $shape" }
)
source.subscribe {
   println(it)
}
Thread.sleep(1000)
결과:
Pink Diamond
Orange Diamond
Cyan Diamond
Cyan Star
Yellow Star
Yellow Pentagon
 
3. merge() 함수
두개이상의 Observable에서 데이터 발행 순서여부에 상관없이 업스트림에서 먼저 입력되는 데이터를 그대로 받아 발행합니다.

 
val colorSet1 = listOf("Red", "Green")
val colorSet2 = listOf("Yellow", "Cyan", "Pink")
val colorSrc1 = Observable.zip(
      colorSet1.toObservable(),
      Observable.interval(0L, 200L, TimeUnit.MILLISECONDS).take(colorSet1.size.toLong()),
      BiFunction { color: String, _: Long ->
         color
      }
)
val colorSrc2 = Observable.zip(
      colorSet2.toObservable(),
      Observable.interval(100L, 200L, TimeUnit.MILLISECONDS).take(colorSet2.size.toLong()),
      BiFunction { color: String, _: Long ->
         color
      }
)
Observable.merge(
      colorSrc1,
      colorSrc2
).subscribe {
   println(it)
}
Thread.sleep(2000)
결과:
Red
Yellow
Green
Cyan
Pink
 
4. concat() 함수
2개 이상의 Observable을 이어 붙이는 함수.

Note:첫번째 Observable의 onComplete 이벤트가 발생하지 않으면 메모리 누수가 발생합니다.
 
val colors1 = listOf("Red", "Green", "Blue")
val colors2 = listOf("Yellow", "Blue", "Pink")
Observable.concat(colors1.toObservable(), colors2.toObservable())
      .subscribe {
         println(it)
      }
조건 연산자
조건 연산자는 Observable의 흐름을 제어합니다.
1. amb() 함수
가장 먼저 데이터를 발행하는 Observable을 택합니다. 나머지는 무시됩니다.

val colors1 = arrayOf("Red", "Green", "Blue")
val colors2 = arrayOf("Yellow", "Cyan")
Observable.amb(
      listOf(colors2.toObservable().delay(100L, TimeUnit.MILLISECONDS),
            colors1.toObservable())
).subscribe {
   println(it)
}
결과:
Red
Green
Blue
colors2 배열만 100ms 지연시켜서 발행한 결과,
colors1 이 채택되어 colors2는 무시되는것을 확인할 수 있습니다.
 
2. takeUntil() 함수
val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink")
colors.toObservable()
      .zipWith(
            Observable.interval(100L, TimeUnit.MILLISECONDS),
            BiFunction { t1: String, t2: Long ->
               t1
            }
      )
      .takeUntil(Observable.timer(550L, TimeUnit.MILLISECONDS))
      .subscribe {
         println(it)
      }
Thread.sleep(1000)
결과:
Red
Yellow
Green
Cyan
Blue
3. skipUntil() 함수
takeUntil과 반대로 Observable에서 데이터를 받을때까지 값을 건너 뜁니다.

val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink")
colors.toObservable()
      .zipWith(
            Observable.interval(100L, TimeUnit.MILLISECONDS),
            BiFunction { t1: String, t2: Long ->
               t1
            }
      )
      .skipUntil(Observable.timer(550L, TimeUnit.MILLISECONDS))
      .subscribe {
         println(it)
      }
Thread.sleep(1000)
결과:
Pink
4. all() 함수
모든 조건이 true일때만 데이터를 true를 발행합니다.

val colors = arrayOf("Red", "Yellow", "Green", "Cyan", "Blue", "Pink")
colors.toObservable().all { t: String ->
   t == "Red"
}.subscribe { t: Boolean? ->
   println(t)
}
결과:
false
 
 
기타 연산자
1. delay() 함수
Observable의 데이터 발행을 지연시켜주는 역할을 합니다.

 
val startTime = System.currentTimeMillis()
val colors = arrayOf("Red", "Orange", "Yellow")
colors.toObservable()
      .delay(100L, TimeUnit.MILLISECONDS)//이 라인을 주석 후 실행하면 대략 100ms 만큼 빨라짐
      .subscribe {
         println("$it - ${System.currentTimeMillis() - startTime}")
      }
Thread.sleep(1000)
결과:
Red – 168
Orange – 168
Yellow – 168
 
2. timeInterval() 함수
어떤 값을 발행했을 때 이전 값을 발행한 후 얼마나 시간이 흘렀는지 알 수 있습니다.

val colors = arrayOf("Red", "Green", "Orange")
colors.toObservable()
      .delay {
         Thread.sleep(Random().nextInt(1000).toLong())
         Observable.just(it)
      }
      .timeInterval()
      .subscribe {
         println("$it")
      }
결과:
Timed[time=671, unit=MILLISECONDS, value=Red]
Timed[time=275, unit=MILLISECONDS, value=Green]
Timed[time=238, unit=MILLISECONDS, value=Orange]
 
1초내로Random하게 스레드를 sleep()하므로 실행할 때 마다 1초미만으로 time 값이 다르게 나오는 것을 확인 할 수 있습니다.


0개의 댓글