go channel 을 이용한 스트리밍 데이터 파이프라인(Streaming Data Pipeline)

데이터 파이프라인(Data Pipelining)

Go 를 이용하면 스트리밍 데이터 파이프라인을 구축하기가 비교적 쉬워요.
I/O 작업이나 멀티코어 CPU 자원을 효율적으로 사용하면서 말이죠.

Go 에서 말하는 파이프라인이라는게 대체 뭘까요?

Go 에서는 공식적으로 파이프라인 자체를 정의하는 뭔가가 있는 것은 아니예요.
그냥 뭐랄까...
go channel 로 연결된 순차적인 작업 Stage 라고 할까요?

건물 짓고,
건물 사이에 파이프 연결하고,
파이프가 시작되는 곳에서 구멍에 물 흘려보내면...

그게 물을 전달하는 파이프라인인거죠...;;

(건물 == Stage), (파이프 == 채널), (물 == 데이터)
건물로 들어오는 파이프는 Inbound 채널, 건물에서 나가는 파이프는 Outbound 채널.

  • 각 Stage 는 이전 Stage 로부터 Inbound 채널을 통해 데이터를 받습니다.
  • 그 데이터에 대해 어떤 작업(함수)을 수행하고 그 결과로 새로운 결과 데이터를 생산합니다.
  • 새롭게 생산된 결과 데이터를 Outbound 채널을 통해 다음 Stage 로 넘깁니다.

결국은 각 Stage 사이에 수도 파이프를 연결하듯이 Channel 을 연결한다는 것이고, 그를 통해 데이터를 물흐르듯이 전달한다는 말이네요.
당연히 첫번 째 Stage 는 Outbound Channel 만 필요할 것이고, 마지막 Stage 는 Inbound Channel 만 필요하겠네요.
중간 단계의 Stage 는 Inbound 와 Outbound Channel 이 모두 필요할 것이구요.

이를 좀 추상화해서 생각해보면, 중간 단계의 Stage 들은 묶어서 하나의 Message Queue 처럼 생각할 수도 있겠네요.
아래 그림 처럼요.

그러면, 첫번째 Stage 는 Producer 라고 부를 수 있겠고, 마지막 Stage 는 Consumer 라고 부를 수 있겠죠.
또는 데이터 Source(생성자)와 데이터 Sink(소비자) 라고 부르기도 합니다.

다음과 같은 Stage 들로 구성된 데이터 파이프라인이 있다고 해봅시다.
각 Stage 를 함수라고 생각하셔도 좋습니다.

첫번째 Stage 인 main 함수에서 초기 데이터를 generate 로 넘겨주면 generate 에서는 자신의 일을 수행하고 그 결과물을 square stage 로 (channel 을 통해) 넘겨줍니다.
square stage 에서도 마찬가지로 받은 데이터를 가공한 후 최종 stage 인 main 으로 다시 돌려줍니다.
채널을 이용한 아주 간단한 데이터 스트리밍의 예인 거예요.

func main() {
    outChan1 := generate(2, 3)
    outChan2 := square(outChan1)

    fmt.Println(<- outChan2) // 4
    fmt.Println(<- outChan2) // 9
}

위 코드를 보면 맨처음 main 에서 generate 로 데이터를 넘겨줄 때 channel 을 이용하지 않고 함수 인자로 넘겨주고 있네요.
이 부분은 아무래도 좋습니다.
channel 을 이용해서 넘겨주도록 구현할 수도 있어요.

중요한 것은 중간 Stage 인 generate 함수와 square 함수가 outChan1 을 통해 연결되고 있다는 점입니다.
즉, 무언가 generate Stage 에서 작업을 수행한 결과물이 square 함수쪽으로 outChan1 채널을 통해 흘러들어간다(Streaming)는 점이 중요하다는거죠.

generate 함수에서는 뭘 하는지 안을 좀 들여다 봅시다.

func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

받은 함수의 인자들을 그대로 차례차례 Outbound 채널(out)로 보내고 있네요.
특별하게 뭔가 데이터를 가공하지는 않고, 인자로 받은 여러 개의 수를 채널 속으로 하나하나 흘려주고 있습니다.
그런데, 이 작업은 고루틴을 생성해서 하고 있어요.
이 고루틴은 생성이 되면 곧 바로 out 채널로 숫자들을 흘려보내기 시작할 겁니다.
다 보냈다 싶으면 채널을 닫아주고 있구요.
(채널은 항상 보내는 쪽에서 닫아주는게 좋죠.)
그리고, 원래의 고루틴은 생성된 고루틴과는 별개로 out 채널을 즉시 리턴해주고 있습니다.

generate 가 리턴해주는 out 채널을 이용하면, 누구든 generate 에서 생성한 고루틴이 차례차례 흘려보내는 숫자들을 받아볼 수 있겠네요.
위의 main 함수를 보면 이 out 채널을 square 함수로 바로 넘겨주게 되어 있어요.
그 말은 square 함수가 이 out 채널에서 숫자들을 받아서 뭔가 한다는 말이겠네요.

즉, outChan1 이라는 파이프를 generate 와 square 사이에 연결하는 공사를 해준겁니다.

square 가 그 숫자들을 받아서 무슨 짓을 하고 있는지 봅시다.

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

square 에서도 out 채널을 하나 생성하네요.
이게 왜 필요하죠?

square 도 데이터 파이프라인에서 중간 단계의 Stage 이니까요.
out 채널을 하나 만들어서 자신이 수행한 결과 데이터를 흘려보내줘야만, 다음 Stage 에서 그 채널을 통해 데이터를 받을 수 있죠.
square Stage 와 다음 Stage 간에 파이프를 하나 꽂아주는거란 말이죠.

square 에서도 generate 와 마찬가지로 실제 작업은 고루틴을 생성해서 비동기적으로 수행하고 있네요.
in 채널에서 받은 데이터를 가공하여 그 결과(제곱 수)를 out 채널로 흘려주고 있습니다.

자, 이제 square 에서 리턴하는 out 채널을 이용해서 누군가가 데이터만 빨아들이면 되겠네요.
그게 바로 main 함수의 아래 코드가 아니겠어요?
square 함수가 리턴한 outChan2 채널로부터 데이터를 2 개 빨아들이고 있잖아요.
위쪽에서 말한 최종 Stage 의 Consumer 역할이 되겠네요.

func main() {
...
    fmt.Println(<-outChan2) // 4
    fmt.Println(<-outChan2) // 9
}

요런 파이프 연결 개념을 이용해보면 아래처럼 파이프를 중간에 쉽게 추가로 꽂을 수도 있어요.
square 로 나온 결과를 main Stage 로 넘기지 말고, square Stage 를 한번 더 타게 하는 것도 가능하겠죠?

func main() {
    for n := range square(square(generate(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

Fan-out, fan-in

위에 살펴본 내용을 바탕으로 좀 더 나가봅시다.
데이터 스트리밍에서 fan-out, fan-in 이 뭔지 좀 알아야겠어요.

여러 개의 함수에서 하나의 채널로부터 데이터를 읽어야하는 경우도 있지 않을까요?
이런 경우가 fan-out 입니다.
뭔가 부채펼쳐지듯 펼쳐지는 모양이잖아요.
func1, func2, func3, ... 함수들은 channel 이 닫히기 전까지 모두 같은 채널에서 읽어들이기 경쟁을 펼칠거예요.
func1, func2, func3 와 같이 다른 함수들일 수도 있지만, func1 함수 여러개가 동시에 수행되는 경우도 마찬가지입니다.

모양만으로 짐작할 수 있듯이 fan-out 은 뭔가 작업을 다수의 worker 들에게 분산해주고 싶을 때 사용하겠죠.
이렇게 하면 CPU 를 병렬로 사용할 수 있어서 동시에 처리할 수 있는 작업이 많아질테니까요.

직관적으로도 이해할 수 있는데, fan-in 은 fan-out 과는 거꾸로인거죠.
즉, 다수의 Inbound 채널로부터 input 데이터를 받고, 이를 가공하여 하나의 Outbound 채널로 내보내는 것.
물론 이 때도 함수는 Inbound 채널들이 모두 닫힐 때까지 읽어들이게 될거예요.

이전과 살짝 달라진 main 함수 코드를 한번 보시죠.

func main() {
    in := generate(2, 3)

    c1 := square(in)
    c2 := square(in)

    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

머리속에 그림이 좀 그려지시나요?
generate 에서 리턴한 채널(in) 하나에게 2 개의 square 함수가 빨대를 꽂고 있네요.
이게 바로 fan-out 인거죠.
그리고는, 각자 채널을 하나씩 리턴합니다.

위에 갑자기 등장한 merge 함수는 어떻게 생겨먹었는지 좀 봅시다.
square 함수들에서 리턴한 채널 2 개를 받아서 뭘 하고 있는지...

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(cs))
    for _, c := range cs {
        go func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

뭔가 좀 복잡해 보이는데 사실 눈을 크게 뜨고 구조를 파악해보면 아주 간단한 코드입니다.
merge 는 여러 개의 Inbound 채널에 대해 전담 고루틴을 생성하여 데이터를 읽고, 이를 하나의 out 채널로 전송해주는 일 밖에 하는게 없네요.
바로 위에서 살펴본 fan-in 의 역할입니다.

다만, 이런 방식으로 채널을 사용할 때 아주 주의해야할 점이 있는데요.

  1. 다수의 고루틴이 동일한 Outbound 채널에 경쟁적으로 데이터를 전송하고 있는 상황에서, 누가 언제 그 Outbound 채널을 책임지고 close 해줘야 하나?
  2. 다수의 고루틴이 끝나는걸 어떤 방식으로 기다려줘야 하나?

첫번째 주의점은 어떻게 해결해야 하는지 살펴보죠.

close 된 채널에 데이터를 전송하면 panic 이 발생하는건 이미 알고 계시죠.
그래서, 아무도 데이터를 전송하지 않는다는 확신을 가질 수 있을 때, sender 쪽에서 채널을 close 해주어야 합니다.

위의 merge 함수 코드에서 아래 부분 처럼요.
고루틴이 모두 끝나기를 기다렸다가 out 채널을 close 하고 있잖아요.

    go func() {
        wg.Wait()
        close(out)
    }()

그런데, 왜 기다리는 일 또한 별도 고루틴을 통해서 수행할까요?
30 초만 생각해보세요...
아래의 wg.Wait() 처럼 별도 고루틴 생성없이 기다려주면 되는거 아닌가요?

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(cs))
    for _, c := range cs {
        go func(c <-chan int) {
            for n := range c {
                out <- n     <-- 이 데이터는 누가 받아주냐...
            }
            wg.Done()
        }(c)
    }

    wg.Wait()  <-- 이렇게 기다리면?
    close(out)

    return out  <-- 여기는 언제 도착하냐...
}

만약 별도 고루틴을 통해 기다리지 않고 main 고루틴에서 wg.Wait() 를 수행한다면, main 고루틴은 여기에서 다른 고루틴들이 모두 끝날 때까지 기다리게 될거예요.
그런데, 여기서 마냥 기다리고 있으면 고루틴들이 out 채널로 보내는 데이터는 누가 받아주나요?
받아주는 놈은 눈을 씻고 찾아봐도 없는데...
채널은 받아주는 놈이 없으면 보내는 놈도 넋놓고 기다리게 됩니다.
결국 main 고루틴도... 작업을 수행하는 별도의 고루틴들도... 모두 일손을 놓아버려 deadlock 상태에 빠지고 맙니다.

merge 함수에서 빨리 out 채널을 리턴해줘야 다른 Stage 에서 이 채널을 이용해서 데이터를 빨아들여주죠.

Buffered 채널

자~ 위의 코드는 이제 이해가 됩니다.
웬지 문제없이 잘 돌아갈 것 같아요.

특정 상황에서는...

특정 상황에서는 ???
뭔가 문제가 있다는 말이네요.

무슨 문제인지 봅시다.

func main() {
...
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
}

main 함수가 조금 달라졌어요.
위의 코드는 파이프라인에서 마지막 Stage 에 해당하는 부분인데요.

이전 코드에서는 merge 함수가 리턴하는 out 채널로부터 인입되는 모든 데이터를, 이곳 마지막 Stage 에서 모두 빨아들였는데요.
지금은 딸랑 하나만 읽고 바로 리턴을 해버리네요...;;

그럼, main 함수의 최초 Stage 에서 2 개의 숫자를 Produce 했는데, 파이프라인의 최종 Consumer 가 한개만 빼가고 하나는 모른 척 나가버렸다는 말이잖아요.

이렇게 되면 중간 Stage 인 merge 함수안에서 생성했던 고루틴들이 out 채널로 아직 못보낸 데이터가 남아 있는 상태일텐데... 얘네들은 손가락만 빨고 기다리고 있었을텐데...;;

방법 없어요.

이런 경우는 가차없이 리소스의 누수(leak)가 생기는겁니다.
여러분이 잘못한거예요.
고루틴도 메모리를 사용하고 runtime 에서 사용하는 다양한 리소스와 향후 garbage 수집이 될 것들을 많이 가지고 있을텐데, 이렇게 누군가 신경써주지 않으면 그냥 "버리고 떠난 엄마를 마냥 기다리는 아이"처럼 고아가 되어버리는거죠. ㅠㅜ

그래서, 이런 경우에는 buffered 채널을 사용하기도 해요.
받아주는 놈이 아직 없어도 보내는 놈은 그냥 Buffer 에 던져넣어주고 "난 몰라. 받아갈려면 받아가고 말려면 말고..." 하는거죠.
넋놓고 기다리는 일은 하지 않겠다는거예요.

generate 함수를 예로 들면 다음과 같이 하면 되는겁니다.

func generate(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

채널을 만들 때 아예 nums 인자의 갯수만큼 받아줄 수 있는 Buffered 채널을 만들었잖아요.
설사 out 채널에서 읽어들여주는 놈이 없어도 Buffer 에 자기가 전달하고 싶은 데이터들 확 집어넣어버리고 채널도 자기는 이제 안쓸거니깐 닫아버립니다.

이 때는 별도로 고루틴을 생성해서 out 채널에 쓸 필요도 없네요.
받아주는 놈 없으면 보내는 쪽도 기다려야하는데, 중간에 채널 Buffer 를 끼워넣었으니 비동기적으로 처리할 필요가 없는거죠.
이 예제에서는 어차피 채널로 보낼 데이터 갯수를 이미 알고 있으니까, 그 갯수를 받아줄 Buffer 크기만 충분하면 이렇게 처리해도 문제가 없는거예요.

위쪽에서 마지막 Consumer 가 데이터 하나를 안빼가서 기다리고 있던 merge 내의 고루틴도 아래처럼 Buffer 하나를 두면 기다리지 않게 될겁니다.

좋네요~ 문제는 해결된 것 같아요.

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1)
...

이제 모두 깔끔히 해결한건가??
그런데, 뭐지?
이 몰려드는 찜찜함은...;;

Cancellation

위와 같은 방식으로는 근본적으로 문제를 해결할 수가 없어요.
Buffer 크기를 얼마로 정해야 Sender 의 hang 을 모든 경우에 막을 수가 있나요?
보내는 쪽에서 얼마나 많은 데이터를 보낼지 전혀 모르는 상황에서 말이죠.

Buffer 크기는 1 로 만들어놨는데 Sender 가 보낼 데이터가 아직 많이 남아있다면 넋놓고 기다리는 상황은 마찬가지 아닌가요?
Buffer 가 다 차면 어쩔건데요?
우리가 보고 있는 코드에서는 비록 문제없을지 모르지만, 이런 식의 접근은 위험해 보입니다.

결국 데이터를 Receiver 쪽에서 모조리 받아주거나...
아니면 모조리 Buffer 에 집어넣을 수 있거나...
이래야 하는데... ㅠㅜ

이것보다 나은 해결책이... 당연히 있겠죠?

근본적인 해결책은 바로 "Upstream Stage 쪽에서 데이터를 보내려고 대기하는 고루틴들에게 취소(cancel) 시그널을 보낸다."는 겁니다.
cancel 시그널을 보내기 위한 채널을 하나 별도로 만들어서, 이를 Sender 와 Receiver 사이에 연결해주는거죠.
데이터 스트림과는 반대 방향의 시그널이 되겠네요.

이런 방법을 사용하면 Receiver 가 더이상 데이터를 받을 수 없는 상황일 때, 거꾸로 Sender 에게 "이제 데이터 그만 보내고 끝내라."라고 알려줄 수 있는 것이죠.

위쪽 main 함수를 살짝 바꾸어본 아래 코드를 보세요.

func main() {
    in := generate(2, 3)

    c1 := square(in)
    c2 := square(in)

    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    done <- struct{}{}
    done <- struct{}{}
}

파이프라인의 마지막 Stage 에서 out 채널을 통해 데이터를 하나만 받은 후, done 채널을 이용해서 빈 깡통 데이터(struct{}{})를 보내고 있습니다.
이게 바로 시그널을 보내는 작업이예요.
bool 이나 int 같은 데이터를 보내도 상관없습니다만, 괜히 불필요하게 리소스를 사용할 필요는 없잖아요?

바로 이전 Stage 인 merge 함수에도 done 채널을 인자로 전달했으니, 그 안의 고루틴들은 done 채널을 모니터링하고 있어야겠죠.
(뒤에서 코드로 살펴볼거예요)

근데, 왜 done 채널로 시그널을 두 번씩이나 보낼까요?

그것은 이전 Stage 에서 최대 2 개의 고루틴이 대기할 수 있다는 사실을 이미 알고 코드를 작성한 것이라서 그래요.
별로 좋지 않은 코드죠.
이전 Stage 에서 얼마나 많은 고루틴들이 대기 상태가 될지 어떻게 알겠어요?
이 예제에서는 어쨌건 2 개의 데이터를 2 개의 고루틴이 처리한다는 사실을 알고 있기 때문에 이렇게 작성할 수 있는거죠.

어쨌건 그러면 merge 함수안에서 done 채널로 시그널을 받는 놈들은 어떻게 처리하는지 봅시다.

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(cs))
    for _, c := range cs {
        go func(c <-chan int) {
            for n := range c {
                select {
                case out <- n:
                case <- done:
                }
            }
            wg.Done()
        }(c)
    }
    ...
}

이전 코드와 달라진 부분이라면 done 채널에서의 시그널도 받을 수 있도록, select 구문으로 변경했다는거예요.
위의 코드는 done 채널에서 시그널을 받더라도 range 구문을 통해 Inbound 채널(c)로부터는 데이터를 계속 읽어들이고 있어요.
구현 나름이긴 한데 굳이 이렇게 cancel 시그널을 받은 상황에서 input 데이터를 계속 받고 있는건 좀 아닌 것 같네요.
이 부분도 뒤에서 코드를 고쳐볼께요.

위에서 살펴본 main 함수에서 시그널을 두번 보내는 방식에 좀 문제가 있다고 말씀드렸죠?
앞쪽 Stage 에서 얼마나 많은 Sender 들이 대기하고 있을지 모르기 때문에, cancel 시그널을 몇 번 보내야할지를 사실 알기가 힘든 경우가 있잖아요.

    done <- struct{}{}
    done <- struct{}{}

그러면 이럴 때는 어떻게 해야할까...

Network 에서 동일망에 있는 target 들에게 broadcast 메시지를 전송하는 방식이 있죠.
여기에서도 이러한 방식으로 접근해볼 수 있습니다.

"기다리는 모든 놈들에게 시그널을 다 뿌린다..."

그냥 done 채널을 보고 있는 놈들에게는 시그널을 broadcast 해버리는거죠.
cancel 시그널을 받아서 멈추고 싶은 놈들은 모두 done 채널을 챙겨 보고 있으면 되는 것 아닌가요?

채널에서 시그널을 broadcast 한다는건 채널을 close 한다는 의미입니다.
done 채널을 close 해버리면 해당 채널에서 뭔가 기다리는 놈들은 모두가 시그널을 받게 됩니다.

아래 코드를 보세요.
done 채널을 만들자마자 defer 구문을 통해 done 채널을 close 해버리죠.
이는 main 함수가 return 할 때 close 를 호출한다는거잖아요?
즉, main 함수가 끝날 때 done 채널을 보고 있는 놈들에게 broadcast 를 보내버리는 것이죠.

"야!! 너희들 모두 하던 일 멈추고 끝내 !"

아래 코드를 보면 generate, square, merge 함수 모두에 done 채널을 전달해줬어요.
그리고는, 데이터를 딸랑 하나만 읽어들이고는 done 채널을 닫아버리죠. (main 끝날 때)

func main() {
    done := make(chan struct{})
    defer close(done)

    in := generate(done, 2, 3)

    c1 := square(done, in)
    c2 := square(done, in)

    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // defer 때문에 done 채널은 자동으로 close 됨
}

그러면, merge 와 square 에서 생성된 고루틴들은 아래와 같이 done 채널에서 시그널을 받아서 return 해버리면 그만입니다.
다만 merge 함수에서는 왜 defer wg.Done() 호출이 필요한지는 아시겠죠?
고루틴들이 모두 종료되는 것을 wg.Wait() 를 통해 기다렸다가 out 채널을 닫아야 하니까요.
위쪽에서 fan-in 을 설명할 때 말씀드린 것처럼요.

이렇게 처리하면 cancel 시그널을 받았을 때 고루틴이 즉각 리턴하게 되니까, Inbound 채널로부터 계속 데이터를 받아들이는 이상한 상황도 없어지겠네요.
위쪽에서 말씀드렸었죠?

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    wg.Add(len(cs))
    for _, c := range cs {
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                select {
                case out <- n:
                case <- done:
                    return
                }
            }
        }(c)
    }
    ...
}

square 함수에서는 done 채널로부터 시그널을 받으면 out 채널을 고루틴이 직접 close 해버리네요.
square 함수는 fan-in 이나 fan-out 구조가 아니고 그냥 Inbound 채널로부터 input 을 받아서 작업 결과를 Outbound 채널로 넘겨주는 놈이라서 그래요.
input : output 이 1 : 1 이라서 자신이 직접 out 채널은 닫아버리면 되는겁니다.

func square(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

지금까지 Go 를 이용한 데이터 파이프라이닝에 대해 설명을 길게 드렸습니다.
하지만, 곳곳에 중요한 내용들이 숨어 있어요.
일정한 데이터 처리 흐름을 유지하되 중간 Stage 에서 고루틴을 이용한 병렬 처리를 곁들이는 방식으로 유용하게 사용할 수도 있겠죠?
그래서, 데이터 파이프라인과 Concurrency 는 서로 함께 할 때 강력합니다.
설계할 때 요놈들을 잘 이용하면 코드도 상당히 깔끔하게 작성할 수 있습니다.

사실 인터페이스를 사용하면 파이프라이닝 코드를 위보다 더 가독성있게 작성할 수 있습니다.
특정 인터페이스만 구현하면 파이프라인에 내 맘대로 원하는 Stage 를 끼워넣기가 훨씬 쉬워요.
이에 대한 글은 곧 준비해보려고 합니다.
main 의 모양이 아래와 같으면 더 보기 좋고 기능 확장하기도 좋지 않을까요?
사실 채널은 겉으로는 보이지도 않게 되죠.

func main() {
    pipeline := BuildPipeline(FirstStage{}, SecondStage{}, ThirdStage{})
    go func() {
        for job := range GetJobs() {
            pipiline.Input(*job)
        }
        pipeline.Close()
    }()

    for result := range pipeline.Output() {
        log.Println(result)
    }
}

끝으로 Go Concurrency Pattern 관련 유용한 영상 2 개 투척합니다.


이 글에서 사용한 source code 는 https://blog.golang.org/pipelines 에 있는 example 을 살짝살짝 변형하여 사용했습니다.
설명은 제 방식대로 했구요.

You may also like...

5 2 votes
Article Rating
Subscribe
Notify of
guest
1 Comment
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
In

잘 봤습니다~ ^^

1
0
Would love your thoughts, please comment.x
()
x