Rabbitmq

[Rabbitmq] NestJS 에서 사용하는 Rabbitmq 1탄 - Data Comsume

개발조하 2023. 10. 17. 19:15
본 포스트에서는 Nestjs에서 Rabbitmq 사용하기 위한 기본적인 셋팅 방법, 데이터 consume 하는 방법에 대해서 기술할 예정이다.

NestJS 에서 Rabbitmq 사용 방법.

NestJS 에서 Rabbitmq 를 사용하기 위해서는 먼저 아래의 라이브러리를 설치해야 한다.

npm i --save amqplib amqp-connection-manager @nestjs/microservices

Rabbitmq 를 사용할 때 아래 조건에 따라서 main.ts 설정 부분이 아주 조금 달라진다.

내가 만든 프로젝트에서 port binding 필요 유무

이에 대한 각각의 main.ts 는 아래와 같이 설정하면 된다.

  • port binding 필요 없이 오로지 Rabbitmq 만을 위한 프로젝트면 아래와 같이 작성한다.
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

function rabbitmqURL() {
  return `amqp://${process.env.RABBITMQ_ID}:${process.env.RABBITMQ_PASSWORD}@${process.env.RABBITMQ_HOST}:${process.env.RABBITMQ_PORT}`;
}

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.RMQ,
      options: {
        urls: [rabbitmqURL()], // <-- 연결할 Rabbitmq URL 
        queue: 'big_log_queue', // <-- queue 설정
        queueOptions: {         // <-- queue Option 설정. 연결할 Queue 와 옵션이 동일해야만 제대로 연결된다. 안그러면 에러 발생
          durable: true,       // rabbitmq 의 durable 옵션으로 Rabbitmq 의 Queue 가 어떠한 이상으로 죽거나 restart 될 때 데이터 유실 방지를 위한 옵션
        },
      },
    },
  );
  await app.listen();
}
bootstrap();
  • port binding 또는 그 외 다른 옵션을 사용해야 될 경우
function rabbitmqURL() {
  // Rabbitmq 기본 계정을 다음과 같이 설정
  // id: admin, password: admin, host: localhost, port: 5672
  const id = process.env.RABBITMQ_ID
    ? (process.env.RABBITMQ_ID as string)
    : 'admin';
  const password = process.env.RABBITMQ_PASSWORD
    ? (process.env.RABBITMQ_PASSWORD as string)
    : 'admin';
  const host = process.env.RABBITMQ_HOST
    ? (process.env.RABBITMQ_HOST as string)
    : 'localhost';
  const port = process.env.RABBITMQ_PORT
    ? (process.env.RABBITMQ_PORT as string)
    : '5672';
  const rabbitmqURL = `amqp://${id}:${password}@${host}:${port}`;
  console.debug(`rabbitmq url: ${rabbitmqURL}`);
  return rabbitmqURL;
}

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
	// NestFactory 객체 생성 후에 해당 객체에 마이크로서비스를 연결한다.
  // 이와 같이 사용하면 port binding 외에 cors 설정, pipe 설정 등을 할 수 있다.
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.RMQ,
    options: {
      urls: [rabbitmqURL()],
      queue: 'big_log_queue',
      queueOptions: {
        durable: true,
      },
    },
  });
  // 등록한 마이크로 서비스 실행 부분
  await app.startAllMicroservices();
  // port listen 부분
  await app.listen(3333);
}

위와 같이 설정 했다면 이제 Rabbitmq 와 연결이 된 것이다. 이제 데이터를 consume 해보자. 데이터를 consume 하는 예제 코드는 다음과 같다.

import { Controller } from '@nestjs/common';
import { AppService } from './app.service';
import {
  Ctx,
  MessagePattern,
  Payload,
  RmqContext,
} from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern('big_log_test')
  getNotifications(@Payload() data: any, @Ctx() context: RmqContext) {
    console.log(
      `Pattern: ${context.getPattern()} data: ${JSON.stringify(data)}`,
    );
  }

  @MessagePattern('big_log_test_2')
  getBigLogTest2(@Payload() data: any, @Ctx() context: RmqContext) {
    console.log(
      `Pattern: ${context.getPattern()} data: ${JSON.stringify(data)}`,
    );
  }

  @MessagePattern('big_log_test_3')
  getBigLogTest3(@Payload() data: any, @Ctx() context: RmqContext) {
    console.log(
      `Pattern: ${context.getPattern()} data: ${JSON.stringify(data)}`,
    );
  }
}

자 이제 위 코드에서 사용된 Rabbitmq 관련 데코레이터들을 살펴보자.

  • @MessagePattern : 특정 패턴의 메시지를 처리하는 데코레이터. 지정한 패턴과 일치한 메시지가 수신되면 컨트롤러 메소드를 실행합니다.
    • ex. 위 코드에서 {pattern: 'big_log_test3', data: 'test'} 을 consume 했다면 getNotifications 메소드를 호출한다.
    • nestjs 의 Rabbitmq 에서는 저 pattern 필드를 보고 구분해서 controller 로직을 수행할 수 있다.
  • @Payload : 실제로 Rabbitmq 에서 consume 한 데이터를 가져오는 데코레이터.
  • @Ctx: Rabbitmq를 통해서 받은 데이터의 컨텍스트 데이터를 받아온다. 메시지 정보, 채널 정보, 메시지 속성, 전달 관련 정보가 있다.

위와 같이 설정 하면 우리가 저장한 queue 에서 데이터를 consume 할 수 있다.

참고