본문 바로가기

자바스크립트

React) mediasoup을 이용한 SFU

작년 WebRTCPeerConnection을 이용한 P2P 토폴로지 방식의 화상채팅을 구현했었다.

동작방식과 각각의 파라미터를 이해하는데 별 어려움이 없었고 올해, 내가 재직했었던 회사의 정부과제 프로젝트에서도 P2P방식의 화상채팅 기능을 사용했었다.

그러나 P2P방식의 문제점은 사용자 수가 늘어갈 수록 한 사람당 감당해야 할 peerConnection의 수가 기하급수적으로 증가한다는 것이었고,시험인증에서 응답속도 부문은 탈락하게 되었다.

 

때문에 개인 프로젝트로 P2P방식이 아닌 SFU방식으로 webRTC를 구현해보고자 한다.

 

Libraries

SFU를 구현하게 도와줄 라이브러리는 많다. MedoozeKurento 등이 있지만 mediasoup을 선택한 이유는 간단하다.

mediasoup의 공식문서가 가장 깔끔하게 정리되어있었고, 가장 최근까지 꾸준한 업데이트를 진행했었다. 또한 mediasoup의 디자인을 사진으로 한눈에 알아볼 수 있었고 mediasoup을 이용한 여러 예제들도 있었다

 

mediasoup Architecture

 

So, How do i use?

아무래도 "How do i use"정도로 간단하게 설명할 수 없는 라이브러리이다.

 

P2P토폴로지와 비교하면 상당히 복잡한 방식이다.

우선 동작 방식에 대해 이야기 하기 전, 중요한 용어를 설명하겠다.

 

  • Device
    • Device는 미디어를 보내거나 받기 위해 mediasoup 라우터에 연결하는 엔드포인트이다. 즉, 클라이언트가 서버의 라우터와의 연결점이다.
    • Devie를 생성한 후 .load() 메서드로 디바이스를 load해야하는데, 이때 필요한 파라미터로는 rtcCapabilites가 있다.
    • routerRtpCapabilites는 라우터에서 활성화된 오디오/비디오 코덱을 정의한다.
     

  • Transport
    • Transport는 두 장치간의 연결을 위해 생성되는 일종의 "통로"이다.
    • mediasoup의 Transport는 SendTransport(보내기)와 RecvTransport(받기), 두 가지가 있다.
     

 

  • Router
    • Transport 인스턴스를 통해 스트림을 삽입/전달하는 인스턴스이다.
  • Worker
    • 단일 CPU 코어에서 실행되고 Router 인스턴스를 처리하는 스레드이다. 즉, 말 그대로 작업자이다.
     

 

 

 

각각의 클라이언트가 서버를 통해 미디어를 공유하기 위해서는 mediasoup의 Device라는 객체가 필요하다.

Device객체는 new Device() 메서드로 생성한 후 device.load()로 로드를 해야하는데, 이때 필요한 옵션으로는 라우터의 rtpCapabilites가 있다.

 

 

때문에 클라이언트는 라우터의 rtpCapabilites 값을 받아오기 위해 서버와 통신을 진행한다.

 

//client

socket.current.emit(
      "joinRoom",
      { roomName },
        (data: { rtpCapabilities: rtpCapabilitesType }) => {
          //rtpCapabilites = router.rtp...
        rtpCapabilities.current = data.rtpCapabilities;
        createDevice();
      });
//server

socket.on("joinRoom", async ({ roomName }, callback) => {
    // create Router
    const router1 = await createRoom(roomName, socket.id);
    peers[socket.id] = {
      socket,
      roomName, // Name for the Router this Peer joined
      transports: [],
      producers: [],
      consumers: [],
    };
// server


  const createRoom = async (roomName, socketId) => {
    // worker.createRouter(options)
    // options = { mediaCodecs, appData }
    // mediaCodecs -> defined above
    // appData -> custom application data - we are not supplying any
    // none of the two are required
    let router1;
    let peers = [];
    if (rooms[roomName]) {
      router1 = rooms[roomName].router;
      peers = rooms[roomName].peers || [];
    } else {
      router1 = await worker.createRouter({ mediaCodecs });
    }

    console.log(`Router ID: ${router1.id}`, peers.length);

    rooms[roomName] = {
      router: router1,
      peers: [...peers, socketId],
    };

    return router1;
  };

 

라우터의 rtpCapabilites를 전달받았다면 이를 통해 device객체를 생성 및 로드한다.

 

// client

  const createDevice = async () => {
    if (!rtpCapabilities.current) return;
    try {
      //디바이스 생성
      // 서버의 rtpCapabilities로 device load
      device.current = new Device();
      await device.current.load({
        routerRtpCapabilities: rtpCapabilities.current,
      });
      createSendTransport();
    } catch (error: any) {
      if (error.name === "UnsupportedError")
        console.warn("browser not supported");
    }
  };

 

여기까지가 1번의 내용이다.

 

다음으로는 device객체를 생성/로드한 클라이언트는 미디어를 전송할 통로는 생성 및 연결해야한다. 

client의 transport를 생성할 때 우선적으로 라우터의 router.createWebRtcTransport() 메서드로 서버측에서 transport를 생성한 이후 클라이언트의 transport를 생성할 때 라우터의 transport 정보를 넘겨 이를 토대로 transport를 생성해야한다.

 

 

(libmediasoupclient는  C++ 클라이언트 라이브러리이므로 무시해도 된다.)

// server

socket.on("createWebRtcTransport", async ({ consumer }, callback) => {
    // get Room Name from Peer's properties
    if (!peers[socket.id] || !peers[socket.id].roomName) {
      return;
    }
    const roomName = peers[socket.id].roomName;

    // get Router (Room) object this peer is in based on RoomName
    const router = rooms[roomName].router;

    createWebRtcTransport(router).then(
      (transport) => {
        callback({
          params: {
            id: transport.id,
            iceParameters: transport.iceParameters,
            iceCandidates: transport.iceCandidates,
            dtlsParameters: transport.dtlsParameters,
          },
        });

        // add transport to Peer's properties
        addTransport(transport, roomName, consumer);
      },
      (error) => {
        console.log(error);
      }
    );
  });
  
  
  
const createWebRtcTransport = async (router) => {
  return new Promise(async (resolve, reject) => {
    try {
      // https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions
      const webRtcTransport_options = {
        listenIps: [
          {
            ip: "127.0.0.1", // replace with relevant IP address
            // announcedIp: "10.0.0.115",
          },
        ],
        enableUdp: true,
        enableTcp: true,
        preferUdp: true,
      };

      // https://mediasoup.org/documentation/v3/mediasoup/api/#router-createWebRtcTransport
      let transport = await router.createWebRtcTransport(
        webRtcTransport_options
      );
      console.log(`transport id: ${transport.id}`);

      transport.on("dtlsstatechange", (dtlsState) => {
        if (dtlsState === "closed") {
          transport.close();
        }
      });

      transport.on("close", () => {
        console.log("transport closed");
      });

      resolve(transport);
    } catch (error) {
      reject(error);
    }
  });
};
//client 

  const createSendTransport = () => {
    if (!socket.current) return;

    // create sender Transport
    socket.current.emit(
      "createWebRtcTransport",
      { consumer: false },
      ({ params }: any) => {
        // The server sends back params needed
        // to create Send Transport on the client side
        if (params.error) {
          console.error(params.error);
          return;
        }
        if (!device.current) return;

        // creates a new WebRTC Transport to send media
        producerTransport.current = device.current.createSendTransport(params);

        // transport.produce이벤트 처음 호출 시 발동되는 이벤트 등록

        producerTransport.current.on(
          "connect",
          async (
            { dtlsParameters }: any,
            callback: funcType,
            errback: funcType
          ) => {
            if (!socket.current) return;
            try {
              await socket.current.emit("transport-connect", {
                dtlsParameters,
              });
              callback();
            } catch (error) {
              errback(error);
            }
          }
        );

        producerTransport.current.on(
          "produce",
          async (parameters: any, callback: funcType, errback: funcType) => {
            console.log(parameters);
            if (!socket.current) return;
            try {
              // tell the server to create a Producer
              // with the following parameters and produce
              // and expect back a server side producer id
              await socket.current.emit(
                "transport-produce",
                {
                  kind: parameters.kind,
                  rtpParameters: parameters.rtpParameters,
                  appData: parameters.appData,
                },
                ({
                  id,
                  producersExist,
                }: {
                  id: string;
                  producersExist: boolean;
                }) => {
                  callback({ id });
                  if (producersExist) getProducers();
                }
              );
            } catch (error) {
              errback(error);
            }
          }
        );

        connectSendTransport();
      }
    );
  };

 

 

transport는 방금 생성한 device객체의 createSendTransport() 메서드로 생성할 수 있다.

sendTransport()를 생성했다면 클라이언트는 여기에 오디오/비디오 트랙을 생성할 수 있다. 또한, 공식문서에 따라 connect와 produce이벤트를 등록해야한다.

connect 이벤트는 transport가 ICE + DTLS 연결을 설정하려고하고 관련 서버 transport와 정보를 교환할 때 발생한다.

DTLS

즉 connect이벤트는 서버측 transport와 클라이언트 측 transport를 연결하기 위해 등록되는 이벤트이다.

produce이벤트는 라우터에 비디오, 오디오 트랙을 보내도록 tranport에 지시하는 이벤트이다.

 

// server
const addProducer = (producer, roomName) => {
    producers = [...producers, { socketId: socket.id, producer, roomName }];

    peers[socket.id] = {
      ...peers[socket.id],
      producers: [...peers[socket.id].producers, producer.id],
    };
  };

  const addConsumer = (consumer, roomName) => {
    // add the consumer to the consumers list
    consumers = [...consumers, { socketId: socket.id, consumer, roomName }];

    // add the consumer id to the peers list
    peers[socket.id] = {
      ...peers[socket.id],
      consumers: [...peers[socket.id].consumers, consumer.id],
    };
  };

  socket.on("getProducers", (callback) => {
    //return all producer transports
    const { roomName } = peers[socket.id];

    let producerList = [];
    producers.forEach((producerData) => {
      if (
        producerData.socketId !== socket.id &&
        producerData.roomName === roomName
      ) {
        producerList = [...producerList, producerData.producer.id];
      }
    });

    // return the producer list back to the client
    callback(producerList);
  });

  const informConsumers = (roomName, socketId, id) => {
    console.log(`just joined, id ${id} ${roomName}, ${socketId}`);
    // A new producer just joined
    // let all consumers to consume this producer
    producers.forEach((producerData) => {
      if (
        producerData.socketId !== socketId &&
        producerData.roomName === roomName
      ) {
        const producerSocket = peers[producerData.socketId].socket;
        // use socket to send producer id to producer
        producerSocket.emit("new-producer", { producerId: id });
      }
    });
  };
  
 const getTransport = (socketId) => {
    const [producerTransport] = transports.filter(
      (transport) => transport.socketId === socketId && !transport.consumer
    );
    return producerTransport.transport;
  };

 socket.on("transport-connect", ({ dtlsParameters }) => {
    console.log("DTLS PARAMS... ", { dtlsParameters });

    getTransport(socket.id).connect({ dtlsParameters });
  });

  // see client's socket.emit('transport-produce', ...)
  socket.on(
    "transport-produce",
    async ({ kind, rtpParameters, appData }, callback) => {
      // call produce based on the prameters from the client
      const producer = await getTransport(socket.id).produce({
        kind,
        rtpParameters,
      });

      // add producer to the producers array
      const { roomName } = peers[socket.id];

      addProducer(producer, roomName);

      informConsumers(roomName, socket.id, producer.id);

      console.log("Producer ID: ", producer.id, producer.kind);

      producer.on("transportclose", () => {
        console.log("transport for this producer closed ");
        producer.close();
      });

      // Send back to the client the Producer's id
      callback({
        id: producer.id,
        producersExist: producers.length > 1 ? true : false,
      });
    }
  );

 

다음으로는 사용자의 stream track을 각각 produce()메서드를 이용해 보내야한다.

// client

  const connectSendTransport = async () => {
    if (!producerTransport.current) return;
    // procue()호출
    //producer transport로 미디어를 서버로 전송
    //connect produce 트리거
    audioProducer.current = await producerTransport.current.produce(
      audioParams.current
    );
    videoProducer.current = await producerTransport.current.produce(
      videoParams.current
    );

    /* close tracks*/
    audioProducer.current.on("trackended", () => {
      console.log("audio track ended");
    });

    audioProducer.current.on("transportclose", () => {
      console.log("audio transport ended");
    });

    videoProducer.current.on("trackended", () => {
      console.log("video track ended");
    });

    videoProducer.current.on("transportclose", () => {
      console.log("video transport ended");
    });
    /* close tracks*/
  };

 

 

여기까지가 3,4,5번의 내용이다. 이로써 미디어를 보내는 sender 입장에서의 설정은 모두 끝이났다.

 

이제 Receiver의 설정을 끝마쳐보자.

 

우선 첫 번째로, 등록하지 않았던 socket 이벤트를 등록해준다.

// client

 useEffect(() => {
    socket.current = io("https://localhost:8000/mediasoup");

    socket.current.on("new-producer", ({ producerId }: any) => {
      signalNewConsumerTransport(producerId);
    });

    socket.current.on("producer-closed", ({ remoteProducerId }: any) => {
      //when 유저 left
      // clean up
      if (!consumerTransports.current) return;
      const copyUsers = { ...usersRef.current };
      delete copyUsers[remoteProducerId];
      setUsers(copyUsers);

      const producerTocClose: any = consumerTransports.current.find(
        (transportData: any) => transportData.producerId === remoteProducerId
      );
      producerTocClose.consumerTransport.close();

      producerTocClose.consumer.close();

      consumerTransports.current = consumerTransports.current.filter(
        (transportData: any) => transportData.producerId !== remoteProducerId
      );
    });
  }, []);

 

"new-producer" 소켓 이벤트로 새 유저의 알아챈다.

 

이때 "new-producer"이벤트는 앞서 sender가 producer.produce()이벤트를 발생시킬 때 트리거되는 이벤트이다.

(server > informConsumers)

 

// client

  const signalNewConsumerTransport = async (remoteProducerId: string) => {
    //check if we are already consuming the remoteProducerId
    if (!socket.current) return;
    if (consumingTransports.current.includes(remoteProducerId)) return;
    consumingTransports.current.push(remoteProducerId);
    await socket.current.emit(
      "createWebRtcTransport",
      { consumer: true },
      ({ params }: any) => {
        if (params.error) {
          return;
        }
        if (!device.current) return;
        let consumerTransport;
        try {
          consumerTransport = device.current.createRecvTransport(params);
        } catch (error) {
          console.log(error);
          return;
        }

        consumerTransport.on(
          "connect",
          async ({ dtlsParameters }: any, callback: any, errback: any) => {
            if (!socket.current) return;
            try {
              await socket.current.emit("transport-recv-connect", {
                dtlsParameters,
                serverConsumerTransportId: params.id,
              });
              callback();
            } catch (error) {
              errback(error);
            }
          }
        );
        connectRecvTransport(consumerTransport, remoteProducerId, params.id);
      }
    );
  };

 

RecvTransport또한 sendTransport와 마찬가지로 라우터의 transport를 기반으로 createRecvTransport()메서드로 생성되고,

connect/produce 이벤트 또한 등록해야한다.

// server

socket.on(
    "transport-recv-connect",
    async ({ dtlsParameters, serverConsumerTransportId }) => {
      console.log(`DTLS PARAMS: ${dtlsParameters}`);
      const consumerTransport = transports.find(
        (transportData) =>
          transportData.consumer &&
          transportData.transport.id == serverConsumerTransportId
      ).transport;
      await consumerTransport.connect({ dtlsParameters });
    }
  );

  socket.on(
    "consume",
    async (
      { rtpCapabilities, remoteProducerId, serverConsumerTransportId },
      callback
    ) => {
      try {
        const { roomName } = peers[socket.id];
        const router = rooms[roomName].router;
        let consumerTransport = transports.find(
          (transportData) =>
            transportData.consumer &&
            transportData.transport.id == serverConsumerTransportId
        ).transport;

        // check if the router can consume the specified producer
        if (
          router.canConsume({
            producerId: remoteProducerId,
            rtpCapabilities,
          })
        ) {
          // transport can now consume and return a consumer
          const consumer = await consumerTransport.consume({
            producerId: remoteProducerId,
            rtpCapabilities,
            paused: true,
          });

          consumer.on("transportclose", () => {
            console.log("transport close from consumer");
          });

          consumer.on("producerclose", () => {
            console.log("producer of consumer closed");
            socket.emit("producer-closed", { remoteProducerId });

            consumerTransport.close([]);
            transports = transports.filter(
              (transportData) =>
                transportData.transport.id !== consumerTransport.id
            );
            consumer.close();
            consumers = consumers.filter(
              (consumerData) => consumerData.consumer.id !== consumer.id
            );
          });

          addConsumer(consumer, roomName);

          // from the consumer extract the following params
          // to send back to the Client
          const params = {
            id: consumer.id,
            producerId: remoteProducerId,
            kind: consumer.kind,
            rtpParameters: consumer.rtpParameters,
            serverConsumerId: consumer.id,
          };

          // send the parameters to the client
          callback({ params });
        }
      } catch (error) {
        console.log(error.message);
        callback({
          params: {
            error: error,
          },
        });
      }
    }
  );

  socket.on("consumer-resume", async ({ serverConsumerId }) => {
    console.log("consumer resume");
    const { consumer } = consumers.find(
      (consumerData) => consumerData.consumer.id === serverConsumerId
    );
    await consumer.resume();
  });
});

 

 

 

 


 

P2P 토폴로지로 구현한 부분에서는 3인을 방에 참여시킨 후 속도를 체크했습니다. (속도를 체크하는 부분은 Web Audio API를 이용했습니다.)

측정한 조건은 

1. 같은 네트워크 상에 있어야한다.

2. 3-4인이 방에 참여한다.

3. 비디오 및 음성이 연결되어있어야 한다.

등이 있었습니다. SFU를 구현한 이후 완전히 같은 조건은 아니더라도 비슷한 조건을 맞춰 진행했습니다.

requestAnimationFrame이 동작하는 텀, socket으로 메세지를 주고받는 시간을 생각하더라도 SFU의 경우 월등히 무난한 결과를 뽑을 수 있었습니다.

빠르면 0.04s 느려도 0.1을 보장합니다.

 

 

FullCode

https://github.com/junheeLee96/react-mediasoup-sfu

 

REF

https://mediasoup.org/

https://www.youtube.com/watch?v=DOe7GkQgwPo&t=644s