본문 바로가기

Java

[Java] Reactive Streams 란?

1.  reactive streams 란?

 

reactive streams 는 non-bloacking, backPressure 를 사용하여 비동기식 스트림 처리에 대한 표준을 제공하기 위한

라이브러리 입니다.

 

참조

https://www.reactive-streams.org/

 

https://www.reactive-streams.org/

Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols. JDK9 java

www.reactive-streams.org

 


2.  reactive streams 특징

 

  1. 비동기적인 데이터 스트림 처리:
    • Reactive Streams는 비동기적인 데이터 스트림을 처리하는 데에 중점을 둡니다.
    • 데이터는 비동기적으로 발생하며, 소비자는 이러한 데이터를 처리하는 동안 블로킹되지 않습니다.
  2. Backpressure(백프레셔):
    • Reactive Streams에서는 백프레셔 메커니즘이 사용됩니다. 이는 소비자가 데이터를 처리하는 속도보다 생산자가 데이터를 생성하는 속도가 빠를 때 생기는 문제를 해결하기 위한 메커니즘입니다.
    • 백프레셔는 생산자가 데이터를 소비자에게 전송하기 전에 소비자가 처리할 수 있는 양을 조절합니다.
  3. Publisher, Subscriber, Subscription, Processor 인터페이스:
    • Reactive Streams는 네 가지 핵심 인터페이스를 정의합니다.
      • Publisher: 데이터를 생성하고 발행하는 역할을 합니다.
      • Subscriber: 데이터를 구독하고 처리하는 역할을 합니다.
      • Subscription: 데이터 흐름을 제어하는 역할을 합니다.
      • Processor: Publisher와 Subscriber를 결합하는 역할을 합니다.
  4. 표준 인터페이스 구현:
    • Java 9부터는 Reactive Streams 스펙을 구현하는 java.util.concurrent.Flow 패키지가 제공됩니다. 이 패키지에는 위에서 언급한 네 가지 인터페이스와 관련된 클래스 및 유틸리티가 포함되어 있습니다.
  5. 다양한 라이브러리 및 프레임워크의 지원:
    • Reactive Streams는 Java 뿐만 아니라 다양한 프로그래밍 언어와 라이브러리에서도 지원되고 있습니다.
    • 예를 들어, Project Reactor, Akka Streams, RxJava 등이 Reactive Streams 스펙을 기반으로 구현되어 있습니다.

 

 - 4가지 인터페이스 설명

 

1. Publisher

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

 

Publisher(발행자) 의 subscribe 를 통해 Subscriber(구독자)는 Publisher 의 .출판물을 구독합니다.

 

2. Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

 

Subscriber(구독자) 의 onSubscribe 를 통해 Publisher(발행자) 의 출판물을 Subscriber 에게 전달합니다.

이후 결과에 따라 onNext, onError, onComplete 를 사용하여 상황을 전달합니다.

 

3. Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

 

Subscription(출판물) 의 request 를 통해 내용물을 요청합니다.

cancel 을 통해 구독을 취소 할 수 있습니다.

 

 

4. Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

 

 

 

자세한 내용 아래 링크 참조

https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file


3.  프로세스 흐름

 

출처:&nbsp; &nbsp;인프런 -&nbsp; Kevin의 알기 쉬운 RxJava 1부

 

 


4. reatcive streams 간단하게 사용해보기

 

MyPub Class

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import java.util.Arrays;

public class MyPub implements Publisher<Integer> {

    Iterable<Integer> its = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

    @Override
    public void subscribe(Subscriber<? super Integer> s) {
    	// 해당 메서드를 통해 구독자는 구독 요청을 하고
        // 발행자는 구독정보를 만들어 구독자에게 전달합니다.
    
        System.out.println("구독자: 구독요청!");
        
        System.out.println("발행자: 구독정보 생성");
        MySubscription subscription = new MySubscription(s, its);
        System.out.println("발행자: 구독정보 생성완료");

        s.onSubscribe(subscription);
        System.out.println("발행자: 구독정보 전달완료");

    }
}

 

MySub Class

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MySub implements Subscriber<Integer> {

    private Subscription s;     // 전달받은 구독 정보
    private int bufferSize = 3; // 한번에 전달할 데이터 양

    @Override
    public void onSubscribe(Subscription s) {
        // 해당 메서드를 통해 구독정보를 전달받는다.
        System.out.println("구독자: 구독정보 받음");
        this.s = s;

        System.out.println("구독자: 데이터요청!");
        s.request(bufferSize); // request 를 통해 구독정보를 읽음
    }

    @Override
    public void onNext(Integer t) {
    	// 구독정보가 있으면 Subscription 이 onNext() 를 호출할것임.
        System.out.println("onNext(): " + t);
        bufferSize--;
        if( bufferSize == 0 ) {
            bufferSize = 3;
            System.out.println("하루지남");
            s.request(bufferSize);
        }
    }

    @Override
    public void onError(Throwable t) {
        System.out.println("구독중 에러");
    }

    @Override
    public void onComplete() {
        System.out.println("구독 완료");
    }
}

 

 

MySubscription Class

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.Iterator;

public class MySubscription implements Subscription {

    private Subscriber s;
    private Iterator<Integer> it;

    public MySubscription(Subscriber s, Iterable<Integer> its) {
        this.s = s;
        this.it = its.iterator();
    }

    @Override
    public void request(long n) {
		// 구독정보가 있을 경우 구독자에게 onNext() 함수를 통해 데이터를 전달합니다.
        // 전달이 완료되거나 에러가 발생하였을때, onComplete() 또는 onError() 를 호출합니다.
		
        while(n > 0) {
            if(it.hasNext()) {
                s.onNext(it.next());
            } else {
                s.onComplete();
                break;
            }

            n--;
        }

    }

    @Override
    public void cancel() {
		// 구독정보를 취소할때 사용합니다.

    }

}

 

 

 

Main 

public class App {

    public static void main(String[] args) {

        MyPub pub = new MyPub();
        MySub sub = new MySub();

        pub.subscribe(sub);

    }
}

결과

구독자: 구독요청!
발행자: 구독정보 생성
발행자: 구독정보 생성완료
구독자: 구독정보 받음
구독자: 데이터요청!
onNext(): 1
onNext(): 2
onNext(): 3
하루지남
onNext(): 4
onNext(): 5
onNext(): 6
하루지남
onNext(): 7
onNext(): 8
onNext(): 9
하루지남
onNext(): 10
구독 완료
발행자: 구독정보 전달완료

 

여기서 데이터를 3개씩 받은 이유는 Subsciber 에서 buffer 를 3으로 설정하였기 때문입니다.

 


 

참조

https://www.youtube.com/watch?v=6TiUCm3K_IE&list=PL93mKxaRDidFH5gRwkDX5pQxtp0iv3guf&index=5