이 글에서는 Peer-Assisted Delivery Network(PDN) 시스템의 리팩토링 과정을 공유하고, 프로젝트의 주요 도전 과제와 이를 해결하기 위해 도입한 접근 방식을 단계적으로 설명하고자 합니다.
프로젝트 소개
저희 프로젝트는 WebRTC 기반의 1:N 스트리밍 서버로, 실시간 비디오 스트리밍을 지원합니다. 이 서버는 Peer-Assisted CDN 아키텍처를 활용하여 네트워크 효율성을 최적화하고 서버 부하를 줄이는 것을 목표로 하고 있습니다.
Peer-Assisted CDN은 클라이언트가 단순히 데이터를 소비하는 역할을 넘어 다른 클라이언트에게 데이터를 전달하는 역할도 수행하는 방식입니다. 이를 통해 데이터를 모든 클라이언트에게 서버가 직접 전송하는 기존 방식과 달리, 클라이언트 간 P2P(Peer-to-Peer) 연결을 활용해 서버의 네트워크 부하를 줄이고 전송 속도를 개선할 수 있습니다.
예를 들어, 특정 클라이언트가 서버로부터 스트리밍 데이터를 받은 후, 이 데이터를 동일한 스트림을 요청한 다른 클라이언트에게 전달하는 형태로 작동합니다.
저희 프로젝트는 고질적인 문제가 있었습니다.
바로 다른 클라이언트에게 데이터를 전달하는 역할을 맡는 Peer, 즉 forwarder를 어떤 기준으로 선택할 것인지, 그리고 그 과정을 얼마나 효율적으로 구현할 것인지였습니다.
Forwarder 선정 조건
저희 시스템에서 forwarder로 동작하기 위해서는 몇 가지 중요한 조건을 충족해야 합니다.
- WebRTC 연결 성공
Forwarder가 되기 위해서는 반드시 WebRTC를 통해 다른 클라이언트와의 연결이 성공적으로 이루어져야 합니다. 연결 실패 시 해당 클라이언트는 forwarder로 동작할 수 없습니다.
- 서버에서 미디어 데이터를 수신 및 전달 가능
Forwarder는 서버로부터 스트리밍 데이터를 수신하고, 이를 요청한 다른 클라이언트에게 문제없이 전달할 수 있어야 합니다. 이를 통해 클라이언트 간 데이터 전송이 원활히 이루어집니다.
또한, 하나의 forwarder가 여러 클라이언트와 P2P 연결을 맺을 경우, 일정 연결 수 이상이 되면 부하가 크게 증가하는 문제가 발생할 수 있습니다. 예를 들어, P2P 연결이 3개 이상일 경우 클라이언트는 네트워크 대역폭 소모와 시스템 리소스 사용량이 급격히 증가할 수 있습니다. 이를 방지하기 위해, 시스템은 기존 forwarder 대신 새로운 forwarder를 동적으로 선택하여아 합니다.
그래서 저희는 forwarder로 지정, 선택하는 방법을 다음과 같이 고안하였습니다.
Forwarder 지정 및 선택 방법
Forwarder 지정
저희는 forwarder를 지정하기 위해 Classifier라는 컴포넌트를 도입했습니다. Classifier는 다음과 같은 방식으로 forwarder를 판별합니다:
Classifier는 WebRTC 연결 실패한 Peer와 후보군 Peer 간의 ICE(Interactive Connectivity Establishment) 프로토콜을 활용하여 연결 테스트를 수행합니다.
성공 시, Forwarder로 지정되며, 실패 시 후보군에서 제외됩니다.
- WebRTC 연결 성공:
연결이 성공하면 해당 peer는 forwarder가 될 수 있는 자격을 갖춥니다.
- WebRTC 연결 실패:
연결이 실패한 peer는 forwarder로 지정되지 않습니다. 대신 이후, Forwarder 후보군 중에서도 잠재적으로 Forwarder가 될 가능성이 있는 peer를 판별하는데 이용됩니다.
Forwarder 선택
forwarder를 선택할 때는 스트림을 내려주고 있는 수(connectionCount)와 생성 시간(createdTime)을 고려하여 점수를 계산합니다.
스트림을 내려주는 수(connectionCount):
스트림을 내려주는 연결 수가 적을수록 더 높은 점수를 부여합니다. 이는 적은 부하를 가진 peer가 더 안정적으로 작동할 가능성이 높기 때문입니다.
생성 시간(createdTime):
오래된 peer일수록 연결이 유지될 가능성이 적다고 가정하여, 더 높은 점수를 부여합니다.
이 두 기준을 기반으로 가중치를 계산하여 forwarder를 선택하는 알고리즘을 구현하였습니다.
관련 Pull Request는 아래에서 확인할 수 있습니다:
https://github.com/Gopring/PDN-WebRTC/pull/65
Implement Classifier and Base Forwarder Selection Algorithm to Resolve N+1 Query Problem by jhmin99 · Pull Request #65 · Gopri
Description This pull request introduces the Classifier and lays the foundation for the Forwarder selection algorithm, addressing the N+1 query problem and enhancing the efficiency and scalability...
github.com
문제 정의
저희 시스템은 시간이 지날수록 구조가 복잡해지면서 유지보수가 점점 어려워지고 있었습니다. 특히, 실시간성이 중요한 스트리밍 서비스에서 여러 구성 요소 간의 동기화를 수시로 처리해야 하는 로직들이 많아지고, 이러한 복잡도가 서비스 안정성과 확장성에 악영향을 미칠 가능성이 커졌습니다. 이러한 이유로, 시스템의 간소화와 효율화를 목표로 리팩토링이 필요함을 절실히 느꼈습니다.
저희는 초기 시스템에서 아래와 같은 구조를 사용하고 있었습니다:
- ChannelInfo: 채널 정보를 관리하는 데이터 구조
- ClientInfo: 클라이언트 정보를 관리하는 데이터 구조
- ConnectionInfo: WebRTC 연결 정보를 관리하는 데이터 구조
// ChannelInfo is a struct for channel information.
type ChannelInfo struct {
ID string
Key string
CreatedAt time.Time
}
// ClientInfo is a struct for client information.
type ClientInfo struct {
ID string
ChannelID string
Class int
ConnectionCount int
FetchFrom string
CreatedAt time.Time
}
// ConnectionInfo is a struct for WebRTC connection information.
type ConnectionInfo struct {
ID string
ChannelID string
To string
From string
Type int
Status int
CreatedAt time.Time
ConnectedAt time.Time
}
저희 시스템은 WebRTC 연결과 관련된 데이터를 효율적으로 관리하면서, 성공적인 P2P 연결을 위한 forwarder를 적절히 선택하는 데 초점을 맞췄습니다. 그러나 WebRTC 연결 데이터를 관리하는 과정에서, ClientInfo와 ConnectionInfo 간에 역할이 명확하지 않고 중복된 데이터가 존재하여 관리 부담이 증가했습니다. 이로 인해 코드의 복잡성이 높아지고 유지보수 비용이 증가하는 문제가 발생했습니다.
저희의 최종 목표는 다음과 같았습니다:
- 성공적인 P2P 연결을 위한 forwarder를 잘 선택
- ClientInfo를 단순화하여 관리 부담을 줄이는 것
리팩토링 과정에서 기존 기능을 유지하며 새로운 구조를 설계하기 위해 여러 방안을 고민해보았습니다.
1. Classify를 위한 작업 큐
Classify 작업이 동시에 진행되지 않도록 작업 큐를 만들어, 동일한 peer에 대한 동시 작업을 방지하려 했습니다.
그러나 작업 큐에 들어간 peer가 중간에 연결이 끊기는 경우 작업이 중단되며, 이를 관리하기 위해 복잡한 예외 처리 로직이 추가로 필요했습니다. 또한, 작업 큐와 데이터베이스 업데이트를 동시에 관리해야 하므로 시스템이 더 복잡해질 것으로 보였습니다.
2. Class에 따른 Pool 관리
Forwarder, Fetcher, Candidate라는 세 가지 역할을 기반으로 peer를 동적으로 관리하는 pool을 설계했습니다.
Fetcher Pool: P2P 연결에 실패한 peer를 관리.
Candidate Pool: Forwarder 후보로 고려될 수 있는 peer를 관리.
Forwarder Pool: Forwarder로 선정된 peer를 관리.
Class에 따라 pool에서 peer를 꺼내 작업하므로 역할 분리가 명확하고, classify와 forwarding 작업을 효과적으로 나눌 수 있는 장점이 있었습니다. 그러나 Pool 상태를 실시간으로 동기화해야 하고, Forwarder를 선택하는 추가 알고리즘 구현으로 인해 기존 구조와 복잡성 차이가 크지 않았습니다.
이러한 고민 끝에 저희는 “Class라는 개념이 존재하는 한 단순한 설계가 어렵다”는 결론에 도달했습니다. 시스템의 복잡성을 줄이고 효율적인 forwarder 관리를 위해 Sorted Set을 활용한 새로운 리팩토링 방안을 도입하게 되었습니다.
Sorted Set 도입의 배경
저희는 Forwarder 관리를 위한 기존 구조의 복잡성을 해소하기 위해 Redis의 Sorted Set에서 영감을 받아, 새로운 데이터 관리 방식을 도입했습니다. 이 방식은 Forwarder 점수 계산, 동적 업데이트, 효율적인 탐색을 가능하게 하여 기존의 문제를 해결했습니다.
Sorted Set이란?
Sorted Set은 고유한 키(key)와 점수(score)로 구성된 데이터 구조로, 데이터가 점수에 따라 정렬되어 저장됩니다.
이 데이터 구조는 다음과 같은 장점이 있어, forwarder 관리에 적합하다고 판단했습니다:
- 효율적인 정렬 및 검색:
점수 기반 정렬로 forwarder를 선택하는 데 필요한 데이터를 O(log N) 시간 복잡도로 추가하거나 업데이트할 수 있습니다.
또한 가장 높은 점수를 가진 Peer(Forwarder)를 추출하는 작업은 O(1)로 수행 가능하여, forwarder 선택에 드는 시간을 크게 줄일 수 있습니다.
- 동적 업데이트:
Forwarder의 connectionCount나 createdAt에 따라 점수를 실시간으로 업데이트할 수 있습니다.
추후 CPU 사용률, 패킷 손실률과 같은 추가 메트릭을 Forwarder 선정 알고리즘에 반영하려는 경우에도, 간단히 점수를 수정하여 손쉽게 확장할 수 있습니다.
저희 시스템에서는 Forwarder 후보군을 Sorted Set에 저장하고, 점수를 기반으로 Forwarder를 효율적으로 선택합니다. 아래 코드는 Sorted Set을 초기화하고, Peer A와 Peer B를 Forwarder 후보로 추가한 뒤, 가장 높은 점수를 가진 Forwarder를 선택하는 예제입니다.
set := sortedset.New()
set.AddOrUpdate("PeerA", 101, nil)
set.AddOrUpdate("PeerB", 102, nil)
topPeer := set.PeekMax()
fmt.Println("Top Forwarder:", topPeer)
위 코드의 실행 결과는 Top Forwarder로 가장 높은 점수를 가진 Peer B를 반환합니다.
저희는 프로젝트에서 Sorted Set 기능을 효율적으로 구현하기 위해 github.com/wangjia184/sortedset 패키지를 활용하였습니다.
이 패키지는 Sorted Set의 기본적인 동작(추가, 삭제, 업데이트, 검색)을 지원하며, 효율적인 데이터 관리와 Forwarder 선택 알고리즘을 구현하는 데 적합합니다. 아래는 해당 패키지의 공식 문서를 참고할 수 있는 링크입니다:
https://pkg.go.dev/github.com/wangjia184/sortedset#section-readme
sortedset package - github.com/wangjia184/sortedset - Go Packages
Discover Packages github.com/wangjia184/sortedset Version: v0.0.0-...-af6d6d2 Opens a new window with list of versions in this module. Published: Feb 9, 2022 License: BSD-2-Clause Opens a new window with license information. Imports: 1 Opens a new window w
pkg.go.dev
Sorted Set 도입 후 설계 방향
Forwarder 점수 계산
점수는 64비트 정수로 표현되며, 다음과 같은 규칙에 따라 비트를 배정합니다:
상위 3비트(Connection Count):
현재 스트림을 내려주는 연결 수가 적을수록 점수가 높아집니다.
forwarder의 부하를 균등하게 분산하기 위해 가장 중요한 지표로 설정되었습니다.
Connection Count = 0: 111
Connection Count = 1: 110
Connection Count = 2: 101
...
중간 32비트(CreatedAt):
오래된 Peer일수록 더 높은 점수가 부여됩니다.
안정성이 높은 Peer를 우선적으로 forwarder로 지정하여 P2P 연결의 신뢰성을 확보할 수 있습니다.
Created Time은 시스템에서 Peer가 얼마나 오랫동안 활동했는지를 나타내는 중요한 척도로 활용됩니다.
하위 29비트 (Addition Metrics):
현재 시스템에서는 Connection Count와 CreatedAt을 기반으로 Forwarder 점수를 계산하고 있습니다.
하지만 시스템 확장성과 다양한 상황에 대응하기 위해 CPU 사용률, 네트워크 패킷 손실률 등의 추가 지표를 점수 계산에 반영할 계획입니다. 이러한 지표들은 하위 29비트에 할당되어, 각 지표에 가중치를 부여하는 방식으로 Forwarder 점수 계산에 활용될 예정입니다.
시나리오를 통한 동작 방식
새로운 Peer A가 들어온 경우
새로운 Peer가 시스템에 들어오면 서버와의 초기 연결부터 P2P 연결 시도까지의 과정을 거칩니다.
1. Peer A는 서버로 부터 스트림을 받습니다:
새로운 Peer는 기본적으로 서버와의 연결을 통해 미디어 스트림을 수신합니다.
2. Pool에서 Forwarder 역할을 수행할 수 있는 Peer B를 찾습니다:
Forwarder 후보로 Pool(Sorted Set)에서 가장 점수가 높은 Peer를 뽑습니다.
2.1. B가 존재한다면:
B와 A를 P2P 연결을 시도합니다.
2.1.1. P2P 연결이 성공하면:
A는 서버와의 스트림 연결을 종료하고 B로부터 스트림을 수신합니다.
2.1.2. P2P 연결이 실패하면:
B를 Pool에서 제거하고, Forwarder 후보로 다시 고려되지 않습니다.
A는 Pool에서 다른 Forwarder를 찾아 연결을 시도합니다.
2.2. B가 존재하지 않는다면:
A를 Pool에 추가하여 Forwarder 후보로 등록합니다.
위 과정을 시각적으로 정리한 다이어그램은 다음과 같습니다:
중간에 Peer A가 나가는 경우
이번에는 시스템 동작 중 특정 Peer가 나가거나 상태가 변경될 때 발생할 수 있는 시나리오를 살펴보겠습니다.
1. Peer A가 시스템에서 나갑니다:
A는 P2P 연결에서 나가거나, 시스템에서 완전히 제거됩니다.
이로 인해 B와 C는 더 이상 A로부터 미디어 스트림을 받을 수 없게 됩니다.
B와 C는 서버로부터 미디어 스트림을 수신하도록 전환됩니다.
2. Pool에서 Forwarder 역할을 수행할 수 있는 Peer를 찾습니다:
B와 C는 각각 Pool(Sorted Set)에서 Forwarder 후보를 검색합니다.
Forwarder 후보는 Pool에서 가장 점수가 높은 Peer로 선택됩니다.
2.1. Peer가 존재한다면
B와 C는 각각 선택한 Forwarder 후보와 독립적으로 P2P 연결을 시도합니다.
2.1.1. P2P 연결이 성공하면:
해당 Peer는 Forwarder로부터 스트림을 수신하며, 서버 스트림 연결을 종료합니다.
2.1.2. P2P 연결이 실패하면:
실패한 Forwarder 후보는 Pool에서 제거되며, 다시 Forwarder로 고려되지 않습니다.
Pool에서 다음 Forwarder 후보를 찾아 연결을 다시 시도합니다.
2.2. Peer가 존재하지 않는다면
Pool에서 Forwarder 후보를 찾지 못한 B 혹은 C는 Pool에 추가되어 새로운 Forwarder 후보로 등록됩니다.
위 과정을 시각적으로 정리한 다이어그램은 다음과 같습니다:
다만, P2P 연결 과정에서 Pool의 상태를 동기화하지 않으면, 동시성 문제로 인해 "서로를 Forwarder로 선택"하는 상황이 발생할 수 있습니다.
이와 같은 상황에서 B와 C는 서로에게 Stream Forwarding을 요청하고, 각각 P2P 연결이 성공한 상태로 서버와의 스트림 연결을 종료하게 됩니다. 그러나, B와 C 모두 스트림을 수신하지 않으면서 스트림을 내려주고 있는 상태에 빠지게 됩니다.
이 문제를 해결하기 위해 Lock을 활용하여 동시성을 제어하는 방식을 적용했습니다. 이를 통해 Pool 상태를 안전하게 관리하고, Forwarder 선정과 P2P 연결 과정에서 발생할 수 있는 충돌을 방지할 수 있었습니다.
전체 코드는 다음과 같습니다.
// Package pool manages the sorted set of forwarder candidates and their scores.
package pool
import (
"github.com/wangjia184/sortedset"
"pdn/database"
"sync"
"time"
)
// Constants defining the bit allocation for score calculation.
const (
ConnectionCountBits = 61
CreatedAtBits = 29
)
// channelSet manages a single channel's sorted set and its lock
type channelSet struct {
mutex sync.RWMutex
set *sortedset.SortedSet
}
// Pool manages the sorted sets of forwarder candidates for each channel
type Pool struct {
globalMutex sync.RWMutex
sets map[string]*channelSet
database database.Database
}
// New initializes a new Pool with a database reference
func New(db database.Database) *Pool {
return &Pool{
sets: make(map[string]*channelSet),
database: db,
}
}
// getOrCreateSet ensures a channelSet exists for the given channelID
func (p *Pool) getOrCreateSet(channelID string) *channelSet {
p.globalMutex.RLock()
if cs, exists := p.sets[channelID]; exists {
p.globalMutex.RUnlock()
return cs
}
p.globalMutex.RUnlock()
p.globalMutex.Lock()
defer p.globalMutex.Unlock()
cs, exists := p.sets[channelID]
if !exists {
cs = &channelSet{
set: sortedset.New(),
}
p.sets[channelID] = cs
return cs
}
return cs
}
// calculateScore calculates the score based on connection count and created time
func calculateScore(connectionCount int64, createdAt time.Time) int64 {
elapsedSeconds := int64(time.Since(createdAt).Seconds())
return (connectionCount << ConnectionCountBits) | (elapsedSeconds << CreatedAtBits)
}
// getConnectionCount retrieves the connection count for a client ID from the database
func (p *Pool) getConnectionCount(clientID, channelID string) (int64, error) {
connections, err := p.database.FindAllPeerConnectionInfoByFrom(channelID, clientID)
if err != nil {
return 0, err
}
return int64(len(connections)), nil
}
// AddClient adds a new ClientInfo to the pool for a specific channel
func (p *Pool) AddClient(client database.ClientInfo) error {
cs := p.getOrCreateSet(client.ChannelID)
cs.mutex.Lock()
defer cs.mutex.Unlock()
connectionCount, err := p.getConnectionCount(client.ID, client.ChannelID)
if err != nil {
return err
}
score := calculateScore(connectionCount, client.CreatedAt)
cs.set.AddOrUpdate(client.ID, sortedset.SCORE(score), client)
return nil
}
// UpdateClientScore recalculates the score for a specific client in a channel
func (p *Pool) UpdateClientScore(clientID, channelID string, maxForwardingNum int) error {
cs := p.getOrCreateSet(channelID)
cs.mutex.Lock()
defer cs.mutex.Unlock()
node := cs.set.GetByKey(clientID)
if node == nil {
clientInfo, err := p.database.FindClientInfoByID(channelID, clientID)
if err != nil {
return err
}
if err := p.AddClient(*clientInfo); err != nil {
return err
}
return nil
}
client := node.Value.(database.ClientInfo)
connectionCount, err := p.getConnectionCount(clientID, channelID)
if err != nil {
return err
}
if connectionCount >= int64(maxForwardingNum) {
cs.set.Remove(client.ID)
return nil
}
newScore := calculateScore(connectionCount, client.CreatedAt)
cs.set.AddOrUpdate(client.ID, sortedset.SCORE(newScore), client)
return nil
}
// GetTopForwarder retrieves the highest scored forwarder for a specific channel
func (p *Pool) GetTopForwarder(channelID string) *database.ClientInfo {
cs := p.getOrCreateSet(channelID)
cs.mutex.RLock()
defer cs.mutex.RUnlock()
topNode := cs.set.PeekMax()
if topNode == nil {
return nil
}
client := topNode.Value.(database.ClientInfo)
return &client
}
// RemoveClient removes a client from the pool for a specific channel
func (p *Pool) RemoveClient(clientID, channelID string) {
cs := p.getOrCreateSet(channelID)
cs.mutex.Lock()
defer cs.mutex.Unlock()
cs.set.Remove(clientID)
}
저희 프로젝트 레포는 아래에서 확인해보실 수 있습니다.
https://github.com/Gopring/PDN-WebRTC
GitHub - Gopring/PDN-WebRTC: A WebRTC-based 1:N streaming server enabling real-time video streaming and chat with a peer-assiste
A WebRTC-based 1:N streaming server enabling real-time video streaming and chat with a peer-assisted CDN architecture for efficient networking and reduced server load. - Gopring/PDN-WebRTC
github.com