Post

Context

Reactor Context에 대해서

Context

Context의 사전적 의미는 문맥 또는 맥락입니다.

Reactor API 문서에서는 Context를 다음과 같이 정의합니다.

A key/value store that is propagated between components such as operators via the context protocol.

Reactor의 Context는 Operator 같은 Reactor 구성요소 간에 전파되는 key / value 형태의 저장소라고 정의할 수 있습니다.

여기서 전파란 downstream에서 upstream으로 context가 전파되어 operator가 해당 context의 정보를 동일하게 이용할 수 있음을 의미합니다.

Reactor의 Context는 ThreadLocal과 다소 유사한 면이 있지만, 실행 스레드와 매핑되는 ThreadLocal과 달리 Context는 Subscriber와 매핑됩니다.

Context는 다음과 같이 사용할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package org.example;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Slf4j
public class Example11_1 {
    public static void main(String[] args) throws InterruptedException{
        Mono.deferContextual(ctx -> 
            Mono.just("hello "+ctx.get("firstName")).doOnNext(data -> log.info("just doOnNext : {}", data))
        )
        .subscribeOn(Schedulers.boundedElastic())
        .publishOn(Schedulers.parallel())
        .transformDeferredContextual(
            (mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
        )
        .contextWrite(context -> context.put("lastName", "jobs"))
        .contextWrite(context -> context.put("firstName", "steve"))
        .subscribe(data -> log.info("subscribe : {}", data));

        Thread.sleep(2000L);

    }

}

실행 결과

1
2
14:44:31.467 [main] INFO org.example.Example11_1 -- just doOnNext : hello steve
14:44:31.477 [main] INFO org.example.Example11_1 -- subscribe : hello steve jobs

Context의 특징

  • Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Slf4j
public class Example11_2 {
    public static void main(String[] args) throws InterruptedException {
        final String key1 = "company";

        Mono<String> mono = Mono.deferContextual(
            ctx -> Mono.just("Company : " + ctx.get(key1))
        ).publishOn(Schedulers.parallel());

        mono.contextWrite(ctx -> ctx.put(key1, "SK Telecom"))
            .subscribe(data -> log.info("subscribe : {}", data));

        mono.contextWrite(ctx -> ctx.put(key1, "KT"))
            .subscribe(data -> log.info("subscribe : {}", data));

        Thread.sleep(2000L);
    }
}
1
2
15:44:51.690 [parallel-1] INFO org.example.Example11_2 -- subscribe : Company : SK Telecom
15:44:51.690 [parallel-2] INFO org.example.Example11_2 -- subscribe : Company : KT

구독이 발생할 때마다 해당하는 하나의 Context가 하나의 구독에 연결되기에 다른 회사명이 출력되는 것을 확인할 수 있습니다.

  • Context는 Opeartor 체인의 아래에서 위로 전파된다.
  • 동일한 키에 대한 값을 중복해서 저장하면 Operator 체인에서 가장 위쪽에 위치한 contextWrite()이 저장한 값으로 덮어쓴다.
    • 따라서 일반적으로 모든 Operator에서 Context에 저장된 데이터를 읽을 수 있도록 contextWrite()을 operator 체인의 맨 마지막에 둡니다.
  • InnerSequence 내부에서는 외부 컨텍스트에 저장된 데이터를 읽을 수 있다.
  • InnerSequence 외부에서는 내부 Context 데이터를 읽을 수 없다.
    • 인증 정보 같은 독립성 정보를 전송하는데 적합하다
This post is licensed under CC BY 4.0 by the author.