1. reactive streams 란?
reactive streams 는 non-bloacking, backPressure 를 사용하여 비동기식 스트림 처리에 대한 표준을 제공하기 위한
라이브러리 입니다.
참조
https://www.reactive-streams.org/
2. reactive streams 특징
- 비동기적인 데이터 스트림 처리:
- Reactive Streams는 비동기적인 데이터 스트림을 처리하는 데에 중점을 둡니다.
- 데이터는 비동기적으로 발생하며, 소비자는 이러한 데이터를 처리하는 동안 블로킹되지 않습니다.
- Backpressure(백프레셔):
- Reactive Streams에서는 백프레셔 메커니즘이 사용됩니다. 이는 소비자가 데이터를 처리하는 속도보다 생산자가 데이터를 생성하는 속도가 빠를 때 생기는 문제를 해결하기 위한 메커니즘입니다.
- 백프레셔는 생산자가 데이터를 소비자에게 전송하기 전에 소비자가 처리할 수 있는 양을 조절합니다.
- Publisher, Subscriber, Subscription, Processor 인터페이스:
- Reactive Streams는 네 가지 핵심 인터페이스를 정의합니다.
- Publisher: 데이터를 생성하고 발행하는 역할을 합니다.
- Subscriber: 데이터를 구독하고 처리하는 역할을 합니다.
- Subscription: 데이터 흐름을 제어하는 역할을 합니다.
- Processor: Publisher와 Subscriber를 결합하는 역할을 합니다.
- Reactive Streams는 네 가지 핵심 인터페이스를 정의합니다.
- 표준 인터페이스 구현:
- Java 9부터는 Reactive Streams 스펙을 구현하는 java.util.concurrent.Flow 패키지가 제공됩니다. 이 패키지에는 위에서 언급한 네 가지 인터페이스와 관련된 클래스 및 유틸리티가 포함되어 있습니다.
- 다양한 라이브러리 및 프레임워크의 지원:
- Reactive Streams는 Java 뿐만 아니라 다양한 프로그래밍 언어와 라이브러리에서도 지원되고 있습니다.
- 예를 들어, Project Reactor, Akka Streams, RxJava 등이 Reactive Streams 스펙을 기반으로 구현되어 있습니다.
- 4가지 인터페이스 설명
1. Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Publisher(발행자) 의 subscribe 를 통해 Subscriber(구독자)는 Publisher 의 .출판물을 구독합니다.
2. Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscriber(구독자) 의 onSubscribe 를 통해 Publisher(발행자) 의 출판물을 Subscriber 에게 전달합니다.
이후 결과에 따라 onNext, onError, onComplete 를 사용하여 상황을 전달합니다.
3. Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription(출판물) 의 request 를 통해 내용물을 요청합니다.
cancel 을 통해 구독을 취소 할 수 있습니다.
4. Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
자세한 내용 아래 링크 참조
https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file
3. 프로세스 흐름
4. reatcive streams 간단하게 사용해보기
MyPub Class
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import java.util.Arrays;
public class MyPub implements Publisher<Integer> {
Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
@Override
public void subscribe(Subscriber<? super Integer> s) {
// 해당 메서드를 통해 구독자는 구독 요청을 하고
// 발행자는 구독정보를 만들어 구독자에게 전달합니다.
System.out.println("구독자: 구독요청!");
System.out.println("발행자: 구독정보 생성");
MySubscription subscription = new MySubscription(s, its);
System.out.println("발행자: 구독정보 생성완료");
s.onSubscribe(subscription);
System.out.println("발행자: 구독정보 전달완료");
}
}
MySub Class
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class MySub implements Subscriber<Integer> {
private Subscription s; // 전달받은 구독 정보
private int bufferSize = 3; // 한번에 전달할 데이터 양
@Override
public void onSubscribe(Subscription s) {
// 해당 메서드를 통해 구독정보를 전달받는다.
System.out.println("구독자: 구독정보 받음");
this.s = s;
System.out.println("구독자: 데이터요청!");
s.request(bufferSize); // request 를 통해 구독정보를 읽음
}
@Override
public void onNext(Integer t) {
// 구독정보가 있으면 Subscription 이 onNext() 를 호출할것임.
System.out.println("onNext(): " + t);
bufferSize--;
if( bufferSize == 0 ) {
bufferSize = 3;
System.out.println("하루지남");
s.request(bufferSize);
}
}
@Override
public void onError(Throwable t) {
System.out.println("구독중 에러");
}
@Override
public void onComplete() {
System.out.println("구독 완료");
}
}
MySubscription Class
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.Iterator;
public class MySubscription implements Subscription {
private Subscriber s;
private Iterator<Integer> it;
public MySubscription(Subscriber s, Iterable<Integer> its) {
this.s = s;
this.it = its.iterator();
}
@Override
public void request(long n) {
// 구독정보가 있을 경우 구독자에게 onNext() 함수를 통해 데이터를 전달합니다.
// 전달이 완료되거나 에러가 발생하였을때, onComplete() 또는 onError() 를 호출합니다.
while(n > 0) {
if(it.hasNext()) {
s.onNext(it.next());
} else {
s.onComplete();
break;
}
n--;
}
}
@Override
public void cancel() {
// 구독정보를 취소할때 사용합니다.
}
}
Main
public class App {
public static void main(String[] args) {
MyPub pub = new MyPub();
MySub sub = new MySub();
pub.subscribe(sub);
}
}
결과
구독자: 구독요청!
발행자: 구독정보 생성
발행자: 구독정보 생성완료
구독자: 구독정보 받음
구독자: 데이터요청!
onNext(): 1
onNext(): 2
onNext(): 3
하루지남
onNext(): 4
onNext(): 5
onNext(): 6
하루지남
onNext(): 7
onNext(): 8
onNext(): 9
하루지남
onNext(): 10
구독 완료
발행자: 구독정보 전달완료
여기서 데이터를 3개씩 받은 이유는 Subsciber 에서 buffer 를 3으로 설정하였기 때문입니다.
참조
https://www.youtube.com/watch?v=6TiUCm3K_IE&list=PL93mKxaRDidFH5gRwkDX5pQxtp0iv3guf&index=5
'Java' 카테고리의 다른 글
JPA 란? (1) | 2024.01.21 |
---|---|
[Spring] variable not initialized in the default constructor 에러 (0) | 2024.01.11 |
Spring Webflux 간단하게 알아보기 (0) | 2023.12.25 |
[Java] SHA-256 암호화 & Salt ( 적용예시 ) (1) | 2023.05.17 |
[Java] ArrayList 정렬 / Collections, List, 사용자정의 (0) | 2023.05.07 |