CS
  • CS-Study
  • database
    • B-Tree와 B+Tree
    • DB JOIN
    • DB Lock
    • DB 트래픽
    • DBCP (DB Connection Pool)
    • Flyway
    • Message Broker
    • MySQL InnoDB 스토리지 엔진
    • MySQL 엔진 아키텍처
    • RDB와 NoSQL
    • Redis
    • SQL Injection
    • 스키마 (Schema)
    • Table Scan과 Index Scan
    • Apache Kafka
    • Key
    • 뷰 (View)
    • 인덱스
    • 정규화
    • RDBMS, NoSQL의 클러스터링/리플리케이션 방식
    • 트랜잭션(Transaction)
    • 트랜잭션의 격리성(Transaction Isolation)
    • 프로시저와 트리거
    • DB 정규화 (Normalization)
  • etc
    • MSA
    • REST, REST API, RESTful
    • SOLID 원칙
    • TDD (Test-Driven Development)
    • 서버리스
    • 컨테이너와 도커
  • java
    • Collections
    • Garbage Collection
    • Generic
    • JDBC
    • Java Virtual Machine(JVM)
    • Java Thread
    • Java8 vs Java11 vs Java17
    • 객체지향 프로그래밍 OOP (Object Oriented Programing)
    • Optional
    • RxJava(Reactive Programming)
    • 문자열(String & StringBuffer & StringBuilder)
    • Synchronized
    • Virtual Thread
    • Wrapper Class
    • Equals()와 Hashcode()
    • final
    • Jackson 라이브러리
    • 리플렉션(Reflection)
    • static class와 static method
    • 스트림(Stream)과 람다(Lambda)
    • 스프링 프레임워크에서 사용되는 디자인 패턴
    • 예외처리(Exception)
    • Java Annotation
    • 추상클래스와 인터페이스
  • network
    • 3-way handshake
    • 4-way Handshake
    • DHCP(Dynamic Host Configuration Protocol)
    • DMZ(DeMilitarized Zone)
    • DNS(Domain Name System)
    • HTTP Method
    • HTTP 버전 비교
    • HTTP status code
    • HTTP
    • IP Address
    • Mutiplexing & Demultiplexing
    • OSI 7계층
    • SOP, CORS
    • TCP와 UDP
    • XSS와 CSRF
    • gRPC
    • Stateless와 Connectionless
    • 라우터 Router
    • 로드밸런서(Load Balancer)
    • 브라우저에 URL입력시 네트워크 상 일어나는 일
    • 서브넷 마스크, 게이트웨이
    • 웹 소켓과 소켓 통신
    • 쿠키(Cookie)와 세션(Session)
  • operating-system
    • IPC (Inter Process Communication)
    • 인터럽트
    • TLB
    • 스레싱 Thrashing
    • Thread Pool, Fork-Join
    • Thread Safe
    • 프로세스
    • 가상 메모리
    • 데드락 (DeadLock, 교착 상태)
    • 동기/비동기 & 블로킹/논블록킹
    • 동기화(Synchronization)
    • 메모리 할당과 단편화
    • 뮤텍스와 세마포어, 모니터
    • 세그먼테이션과 페이징
    • 운영체제
    • 캐시 메모리
    • Context switching(문맥 교환)
    • 컴파일
    • 파일 시스템
    • 페이지 교체 알고리즘(Page Replacement Algorithm)
    • 프로세서 스케줄링 알고리즘
    • 프로세스 주소 공간
  • spring
    • @Transactional
    • AOP(Aspect-Oriented Programming)
    • DTO, DAO, VO, Entity
    • DispatcherServlet
    • Hibernate, JPA, Spring Data JPA
    • Ioc와 DI
    • JPA 연관관계 맵핑
    • N+1 Problem
    • ORM
    • Persistence Context
    • SQL Mapper vs ORM vs QueryBuilder
    • Servlet Filter와 Spring Interceptor
    • Servlet
    • Spring MVC와 Spring Boot
    • Tomcat
    • WebFlux
Powered by GitBook
On this page
  • RXJava란
  • Reactive Programming 장점
  • Reactive Programming에서의 Data Stream
  • Observables and Observers
  • Operators
  • 사용예시
  • Subscription and Unsubscription
  1. java

RxJava(Reactive Programming)

RXJava란

절차대로 실행되는 프로그램을 명령형(Imperative) 프로그래밍이라 한다.

RX는 Reactive의 줄임말로 데이터의 흐름을 정의하고 데이터가 변경되었을때 연관된 작업이 실행되는 반응형 프로그래밍을 도와주는 라이브러리이다.

RXJava의 기본원리

기존에는 필요한 데이터를 사용자가 가져와(pull)사용했다면 Reactive방식의 경우 데이터의 변화가 발생한 곳에서 데이터를 전달(push)해준다.

예시

명령형 프로그래밍에서 a=b+c 라는 코드가 있을때 a에 b+c가 할당된후 b와c가 변경되어도 a값은 변하지 않음 하지만 Reactive Programming에서는 프로그램이 b와 c가 변경될때마다 a의 값이 자동으로 업데이트됌

Reactive Programming 장점

비동기(Asynchronous)

  • I/O 작업이나 외부 서비스 호출이 완료되기를 기다리는 동안 다른 작업을 처리할 수 있는 스레드를 확보

non-blocking

  • 반응형 시스템에서 컴포넌트는 non-blocking으로 설계되어 리소스를 차단하거나 리소스를 사용할 수 있게 될 때까지 기다리지 않습니다. 대신 콜백을 등록하거나 이벤트를 구독하여 리소스를 사용할 준비가 되면 알림을 받습니다.

Scalable

  • non-blocking 및 이벤트 중심 특성 덕분에 리소스를 효율적으로 관리할 수 있어 애플리케이션이 최소한의 리소스로 많은 수의 동시 연결 또는 요청을 처리할 수 있습니다.

Reactive Programming에서의 Data Stream

Observables and Observers

  1. Observable이 데이터 스트림을 처리하고, 완료되면 데이터를 발행

  2. 데이터를 발행할 때마다 구독하고 있는 모든 Observer가 알림을 받음

  3. Observer는 수신한 데이터를 가지고 작업처리

Stream API 와의 차이점

Sync blocking이며 Stream API는 pull model을 사용합니다. 다음 요소를 pull 해와서 Exception, 결과 반환 하는 Return 등의 데이터 처리를 반복합니다.

Reactive Programming은 push 방식을 사용합니다

Asynch non blocking이며 observable은 별도의 스레드에서 실행된 다음 onNext, onError 및 onComplete와 같은 이벤트를 사용하여 결과를 메인 스레드(클라이언트)로 push 합니다.

Operators

Operators는 스트림의 데이터를 변환하거나 필터링하는 함수입니다.

개발자는 연산자를 사용하여 다양한 방식으로 observables을 조작하고 결합할 수 있습니다.

RxJava Operators 분류

카테고리
연산자
설명

생성

just()

고정된 개수의 항목을 발행

fromArray()

배열의 항목을 발행

fromIterable()

Iterable의 항목을 발행

create()

ObservableOnSubscribe를 사용하여 Observable을 생성

interval()

일정 시간 간격으로 항목을 발행

변환

map()

각 항목을 다른 항목으로 변환

flatMap()

각 항목을 Observable로 변환하고 병합

concatMap()

각 항목을 Observable로 변환하고 순서대로 병합

switchMap()

가장 최근의 Observable로 전환

reduce()

각 항목을 결합하여 단일 결과 생성

scan()

각 항목을 결합하여 중간 결과를 생성

제어

filter()

조건을 만족하는 항목만 통과

take()

처음 N개의 항목을 발행

skip()

처음 N개의 항목을 건너뜀

결합

zip()

여러 Observable의 항목을 결합하여 단일 Observable 생성

combineLatest()

가장 최근의 항목을 결합하여 발행

사용예시

Observable.create()

  1. Observable.create()를 사용하여 직접 Observable을 생성

  2. ObservableOnSubscribe를 통해 항목을 발행합니다.

import io.reactivex.rxjava3.core.Observable;

public class CreateObservableExample {
    public static void main(String[] args) {
        Observable<Integer> observable = Observable.create(emitter -> {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete(); // 완료를 알림
        });

        observable.subscribe(
                item -> System.out.println("Received: " + item),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed")
        );
    }
}

Observable.interval()

  • Observable.interval()을 사용하여 주기적으로 항목을 발행하는 Observable을 생성합니다. 이 예제는 일정 시간 간격으로 숫자를 발행합니다.

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;

public class IntervalObservableExample {
    public static void main(String[] args) {
        Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);

        observable.subscribe(
                item -> System.out.println("Received: " + item),
                error -> System.err.println("Error: " + error),
                () -> System.out.println("Completed")
        );

        // 잠시 대기 (예제를 위해)
        try {
            Thread.sleep(5000); // 5초 대기
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Subscription and Unsubscription

Subscriptions은 observers가 observables이 내보내는 이벤트를 수신할 수 있는 메커니즘입니다.

observers가 더 이상 이벤트를 수신할 필요가 없는 경우, observable의 구독을 취소하여 리소스를 효과적으로 정리할 수 있습니다.

PreviousOptionalNext문자열(String & StringBuffer & StringBuilder)

Last updated 9 months ago