반응형

Backpressure Starategy (배압 기능)


 

RxJava의 기본이 되는 Flowable, Observable 등을 다루기에 앞서 Backpressure Strategy를 이해해야 한다.

Backpressure Strategy는 배압 기능이라고 번역되며, 이는 통지하는 데이터의 개수를 제어하는 기능이라고 생각하면 된다.

아래와 같은 상황을 보자.

 

 

위의 상황과 같이 만약 소비자의 입장에서 한 번에 처리할 수 있는 데이터가 5개로 제한된다고 한다면, 아무리 생산자가 많은 데이터를 통지할 수 있더라도 소비자에게 최대 5개의 데이터만 전달해야 할 것이다. 이럴 때 배압 기능을 이용해 데이터의 최대 전달량을 5로 설정할 수 있는 것이다.

 

 

 

Flowable vs Observable


 

RxJava에서의 대표적인 생산자로 Flowable과 Observable이 있으며, 이 두 생산자의 가장 큰 차이점이 바로 배압 기능의 유무이다. 또한, Flowable은 Reactive Streams를 구현하여 해당 인터페이스를 지원하지만, Observable은 Reactive Streams를 지원하지 않는다는 차이점이 있다. 생산자의 기능을 살펴보기에 앞서 이 두 가지 생산자의 차이점을 정리해보자면 다음과 같다.

 

Flowable RxJava 1.x Subscriber O O 대량 데이터의 처리, 네트워크/DB 등의 I/O 처리
Observable RxJava 2.x Observer X X 소량 데이터의 처리, GUI 이벤트, 자바 표준 Stream 대신 사용할 때

 

대표적으로 위와 같은 차이점이 있으며, 기본적으로 Observable이 Flowable에 비해서 오버헤드가 적다고 알려져 있다.

상세한 차이점은 RxJava의 Document를 참고하여 시기 적절하게 생산자 및 소비자를 사용해야 한다. 캘린더에서는 Observer를 사용하여 개발하고 있다.

 

 

 

Flowable / Subscriber


 

앞서 언급한 바와 같이, Flowable / Subscriber는 Reactive Streams를 구현하여 해당 인터페이스를 지원하는 생산자 / 소비자이다. RxJava에서는 이 생산자 / 소비자에 대해서 다양한 메소드를 지원하여 구현하도록 하고 있는데, 대표적으로 다음과 같이 구현되고 있다. 아래의 코드에서 Flowable의 기본적인 구현 방법을 확인하고, 특히 request 메소드를 통해서 데이터의 개수를 명시적으로 요청하고 있음을 주목하자.

 

public static void main(String[] args) throws Exception {
	// Flowable.create 메소드를 통해 Flowable을 만들 수 있다.
	Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {			
		@Override
		// FlowableEmitter는 onNext, onComplete 등의 프로토콜을 호출할 수 있는 인터페이스이다.   
		public void subscribe(FlowableEmitter<String> emitter) throws Exception {			
			String[] datas = { "Hello", "Byungwook!" };
			for (String data : datas) {
				// FlowableEmitter를 통해 현재 구독이 취소되었는지 여부를 확인할 수 있다.
			    if (emitter.isCancelled()) {												
					return;
				}
				emitter.onNext(data);
			}
			emitter.onComplete();
		}
	}, BackpressureStrategy.BUFFER);

	flowable
		// Schedulers.computation() 메소드를 통해서 개별 스레드를 통해 Subscriber를 처리함을 명시할 수 있다.
		.observeOn(Schedulers.computation())
		.subscribe(new Subscriber<String>() {
			private Subscription subscription;
			
			// onSubscribe 메소드를 통해 구독이 일어날 때의 동작을 명시한다.
			@Override
			public void onSubscribe(Subscription subscription) {
				this.subscription = subscription;
				// request 메소드를 통해서 1개의 데이터를 명시적으로 요청하고 있다.
				this.subscription.request(1L);
			}

			// onNext 메소드를 통해 데이터를 전달받았을 때의 동작을 명시한다.
			@Override
			public void onNext(String data) {
				System.out.println("Data : " + data);
			 	// request 메소드를 통해서 1개의 데이터를 명시적으로 요청하고 있다.
				this.subscription.request(1L);
			}

			// onComplete 메소드를 통해 데이터 흐름의 처리가 완료되었을 때 동작을 명시한다.
			@Override
			public void onComplete() {
				System.out.println("All process completed.");
			}
	
			// onError 메소드를 통해 데이터 흐름 도중 에러가 발생했을 때의 동작을 명시한다.
			@Override
			public void onError(Throwable error) {
				error.printStackTrace();
			}
		});

	Thread.sleep(500L);
}

 

 

 

Observable / Observer


 

Observer를 사용하는 경우에는 Flowable의 구현과 비슷하지만, 배압 기능을 지원하지 않기 때문에 데이터를 명시적으로 요청(request)하지 않는다. 즉, 구독(subscribe)이 시작되는 순간 데이터가 생산된다. 아래의 코드에서의 예시를 보자.

 

public static void main(String[] args) throws Exception {
	// Observable.create 메소드를 통해 새로운 Observable을 생성할 수 있다.
	Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
		@Override
		// onNext, onComplete 등의 작업을 위한 ObservableEmitter 인터페이스를 받는다.
		public void subscribe(ObservableEmitter<String> emitter) throws Exception {
			String[] datas = { "Hello,", "Byungwook!" };
			for (String data : datas) {
				// emitter.isDisposed() 메소드를 통해 구독의 취소 여부를 확인할 수 있다.
				if (emitter.isDisposed()) {
					return;
				}
				// onNext 메소드를 통해 데이터를 전달한다.
				emitter.onNext(data);
			}
			// onComplete 메소드를 통해 구독의 완료로 인한 정상 종료를 알린다.
			emitter.onComplete();
		}
	});

	observable
		.observeOn(Schedulers.computation())
		.subscribe(new Observer<String>() {
			@Override
			public void onSubscribe(Disposable disposable) {
				// Do nothing
			}
			
 		 	// onNext 메소드를 통해 데이터를 전달받았을 때의 동작을 명시한다. 
			@Override
			public void onNext(String item) {
				System.out.println("Data : " + data);
			}

 		 	// onComplete 메소드를 통해 데이터 흐름의 처리가 완료되었을 때의 동작을 명시한다. 
			@Override
			public void onComplete() {
				System.out.println("All Process Completed.")
			}

 		 	// onComplete 메소드를 통해 오류가 발생했을 때의 동작을 명시한다.
			@Override
			public void onError(Throwable error) {
				error.printStackTrace();
			}
		});
	
	Thread.sleep(500L);
}

 

 

 

Operator (연산자)


 

RxJava에서는 생산자가 통지한 데이터가 소비자에게 도착하기 전에 불필요한 데이터를 삭제하거나, 소비자가 사용하기 쉽게 데이터를 변환하는 등 데이터를 변환하는 경우가 많다. 예를 들어 생산자가 소비한 데이터를 2배한 값이 필요하거나, 홀수를 가진 데이터는 삭제하고 싶은 경우가 있을 수 있다. 이럴 때 생산자가 통지하는 데이터를 생성하거나 필터링, 또는 변환하는 메소드를 사용할 수 있는데 이를 연산자(operator)라고 한다. 다음의 코드를 참고하면 연산자의 역할이 무엇인지 명확하게 이해할 수 있을 것이다.

 

public static void main(String[] args) {
	// Flowable.just는 인자로 설정해둔 값 만을 생산하는 단순 생산자를 만드는 메소드이다.
	Flowable<Integer> flowable = Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
		// filter 연산자를 사용해서 짝수의 값 만을 취함을 명시하고 있다.
		.filter(data -> data % 2 == 0)
		// map 연산자를 사용해서 데이터의 크기를 100배 크게 변환하고 있다.
		.map(data -> data * 100);

	// 구독해서 받은 데이터를 출력한다.
	flowable.subscribe(data -> System.out.println("Data = " + data));
}

 

연산자는 데이터를 메소드 체인(method chain)에 통과하게 해서 생산자가 만드는 원본 데이터를 다양하게 변화시키고 있다. 이는 디자인 패턴 중의 하나인 빌더 패턴(builder pattern)과 유사하다고 할 수 있으며, 연산자에서 처리되는 동작은 생산자의 통지 처리가 시작되고 소비자가 통지를 받는 시점에서 실행된다.

 

 

 

참고 (Reference)


 

RxJava 리액티브 프로그래밍, 스다 토모유키 지음, 이승룡 옮김

 

반응형

'Programming > Java' 카테고리의 다른 글

[RxJava] Reactive Streams, RxJava란 무엇인가?  (0) 2022.11.15
복사했습니다!