카카오톡 클론코딩(6) - ChatMessage

2025. 3. 12. 21:52Project

이제 카카오톡의 핵심기능인 채팅부분인 ChatMessage 관련부분을 다뤄보자

 

우선 레디스에 먼저 저장하니까 

레디스 엔티티를 먼저보자

@Getter
@Builder
@AllArgsConstructor
@RedisHash("chat_message")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ChatMessageRedis {
    @Id
    private String id;

    private String roomId;

    private String senderId;

    private String senderName;

    private String message;

    private LocalDateTime created_at;

    private List<String> imageUrls;

    @TimeToLive
    private Long ttl = TimeUnit.DAYS.toSeconds(7);

    public static ChatMessageRedis create(String roomId, String senderId, String senderName, String message, List<String> imageUrls) {
        return ChatMessageRedis.builder()
                .id(UUID.randomUUID().toString())
                .roomId(roomId)
                .senderId(senderId)
                .senderName(senderName)
                .message(message)
                .created_at(LocalDateTime.now())
                .imageUrls(imageUrls)
                .ttl(TimeUnit.DAYS.toSeconds(7))
                .build();
    }
}

ttl 을 설정해서 7일간만 레디스에서 보관하도록 하고 그 이후에는 MySql로 백업한다.

 

그 다음은 MySql에 저장하기위한 엔티티를 보자

@Entity
@Table(name = "chat_message")
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class ChatMessageMySql {
    @Id
    private String id;

    @Column(name = "room_id", nullable = false)
    private Long roomId;

    @Column(name = "sender_id", nullable = false)
    private Long senderId;

    @Column(name = "sender_name", nullable = false)
    private String senderName;

    @Column(name = "message", columnDefinition = "TEXT")
    private String message;


    @Column(name = "created_at", nullable = false)
    private LocalDateTime createdAt;

    @ElementCollection
    @CollectionTable(name = "chat_message_images", joinColumns = @JoinColumn(name = "chat_message_id"))
    @Column(name = "image_url")
    @BatchSize(size = 20)
    private List<String> imageUrls;

    public static ChatMessageMySql createEntity(ChatMessageRedis chatMessageRedis) {
        return ChatMessageMySql.builder()
                .roomId(Long.valueOf(chatMessageRedis.getRoomId()))
                .senderId(Long.valueOf(chatMessageRedis.getId()))
                .senderName(chatMessageRedis.getSenderName())
                .message(chatMessageRedis.getMessage())
                .createdAt(chatMessageRedis.getCreated_at())
                .imageUrls(chatMessageRedis.getImageUrls())
                .build();
    }
}

레디스에서 필요한 필드들을 받아와 저장한다.

 

그다음 서비스 레이어를 2개를 만들었는데

한가지는 채팅을 저장하고 아카이빙, 조회 하기위한 서비스 ChatMessageService

나머지 한가지는 채팅을 보내는 서비스 ChatMessageService 이다.

 

먼저 ChatMessageService를 보자

@Service
@RequiredArgsConstructor
@Slf4j
public class ChatMessageService {
    @Qualifier("chatRoomRedisTemplate")
    private final RedisTemplate<String, ChatMessageRedis> redisTemplate;

    private final ChatMessageMySqlRepository chatMessageMySqlRepository;

    public void saveMessage(String roomId, ChatMessageRedis message) {
        String key = "chat:room:" + roomId;

        log.info("Saving message to Redis. Key: {}, Message: {}", key, message);

        redisTemplate.opsForList().rightPush(key, message);
        redisTemplate.expire(key, 7, TimeUnit.DAYS);
        log.info("Message saved to Redis. Key: {}", key);
    }

    public List<ChatMessageScrollDto> getMessage(String roomId, LocalDateTime lastCreatedAt, int size) {
        String redisKey = "chat:room:" + roomId;

        List<ChatMessageRedis> redisMessages = redisTemplate.opsForList().range(redisKey, 0, -1);
        List<ChatMessageScrollDto> redisResult = new ArrayList<>();

        if (redisMessages != null || !redisMessages.isEmpty()) {
            redisResult = redisMessages.stream()
                    .filter(msg -> msg.getCreated_at().isBefore(lastCreatedAt))
                    .sorted(Comparator.comparing(ChatMessageRedis::getCreated_at).reversed())
                    .limit(size)
                    .map(msg -> ChatMessageScrollDto.createDto(
                            msg.getSenderName(), msg.getMessage(), msg.getCreated_at(), msg.getImageUrls()
                    )).collect(Collectors.toList());
        }
        if (redisResult.size() < size) {
            int remainingSize = size - redisResult.size();
            List<ChatMessageMySql> olderMessages = chatMessageMySqlRepository.findOlderMessages(Long.valueOf(roomId), lastCreatedAt, remainingSize);

            List<ChatMessageScrollDto> mySqlResult = olderMessages.stream()
                    .map(msg -> ChatMessageScrollDto.createDto(
                            msg.getSenderName(), msg.getMessage(), msg.getCreatedAt(), msg.getImageUrls()
                    ))
                    .collect(Collectors.toList());

            redisResult.addAll(mySqlResult);
        }

        return redisResult;
    }

    public void archiveChatMessages() {
        Set<String> keys = redisTemplate.keys("chat:room:*");

        if (keys == null || keys.isEmpty()) {
            log.info("레디스 키 없음");
            return;
        }

        keys.forEach(key -> {
            List<ChatMessageRedis> messages = redisTemplate.opsForList().range(key, 0, -1);
            if (messages != null && !messages.isEmpty()) {
                saveMessagesToDataBase(messages);
            }
        });
    }

    @Transactional
    protected void saveMessagesToDataBase(List<ChatMessageRedis> messages) {
        List<ChatMessageMySql> messageEntities = messages.stream()
                .map(ChatMessageMySql::createEntity)
                .collect(Collectors.toList());

        chatMessageMySqlRepository.saveAll(messageEntities);
    }

}

지난번에 리프레시 토큰을 CRUD 레포지토리를 상속받아 이용할때 문제를 겪었기 때문에

RedisTemplate를 이용해 CRUD를 조작했다.

여기서 만들면서 가장 힘들었던 부분은 역시 레디스에서 일주일이 지나면 데이터가 날라가는데 일주일 이전의 채팅내역을 무한 스크롤 방식으로 어떻게 가져올까 였다. 답은 뻔하지만 20개씩 무한 스크롤로 가져오고 남는 갯수만큼 MySql로 조회해오면 된다.

레디스에 남은갯수가 0이면 MySql 에서 20개 가져오면 끝이다.

 

다음은 채팅을 보내는 과정인 ChatService를 봐보자

@Service
@RequiredArgsConstructor
public class ChatService {

    private final ChatMessageService chatMessageService;
    private final MemberRepository memberRepository;
    private final RedisPublisher redisPublisher;
    private final ChatRoomRepository chatRoomRepository;
    private final S3Service s3Service;


    public List<String> uploadFiles(List<MultipartFile> multipartFiles) {
        return multipartFiles.stream()
                .map(s3Service::uploadFileToTemp)
                .collect(Collectors.toList());
    }


    public List<String> moveFilesToFinalLocation(List<String> tempUrls) {
        return tempUrls.stream()
                .map(s3Service::moveFileToFinalLocation)
                .collect(Collectors.toList());
    }


    public void sendMessage(Long kokoaId, String roomId, String message, List<String> tempUrls) {
        Optional<Member> memberOptional = memberRepository.findById(kokoaId);
        Member member = memberOptional.orElseThrow(() -> new CustomException(ExceptionCode.MEMBER_NOT_FOUND));

        Optional<ChatRoom> chatRoomOptional = chatRoomRepository.findById(Long.valueOf(roomId));
        if(chatRoomOptional.isEmpty()) {
            throw new CustomException(ExceptionCode.CHAT_ROOM_NOT_FOUND);
        }
        List<String> finalImageUrls = moveFilesToFinalLocation(tempUrls);

        ChatMessageRedis chatMessageRedis = ChatMessageRedis.create(roomId,
                String.valueOf(member.getKokoaId()),
                member.getNickname(),
                message,
                finalImageUrls);

        chatMessageService.saveMessage(roomId, chatMessageRedis);

        ChannelTopic channelTopic = new ChannelTopic("chat.room." + roomId);
        redisPublisher.publish(channelTopic, chatMessageRedis);
    }

}

 

프로필 이미지, 배경 이미지와 달리 여러개의 이미지를 보낼 수 있으니 그걸 처리해주기 위한 메서드들을 작성해주고

chatMessageRedis를 만들어 발행해주는 역할이다. 채팅을 보내기 위해 이미지 업로드를 해서 임시적인 경로로 만들어주고 해당 경로를 포함해서 메시지를 보낼때 해당 경로를 최종경로로 바꿔줘서 저장하고 메시지를 발행한다.

 

@RequiredArgsConstructor
@Service
@Slf4j
public class RedisPublisher {
    @Qualifier("chatRoomRedisTemplate")
    private final RedisTemplate<String, ChatMessageRedis> redisTemplate;

    public void publish(ChannelTopic channelTopic, ChatMessageRedis message) {
        log.info("published topic = {}", channelTopic.getTopic());
        redisTemplate.convertAndSend(channelTopic.getTopic(), message);
    }
}
@RequiredArgsConstructor
@Service
@Slf4j
public class RedisSubscriber implements MessageListener {

    @Qualifier("chatRoomRedisTemplate")
    private final RedisTemplate<String, ChatMessageRedis> redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;



    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // Redis에서 데이터 역직렬화
            ChatMessageRedis roomMessage = (ChatMessageRedis) redisTemplate.getValueSerializer().deserialize(message.getBody());
            log.info("Deserialized message: {}", roomMessage.getMessage());

            String formattedMessage = roomMessage.getSenderName() + " : " + roomMessage.getMessage();

            // WebSocket으로 메시지 전송
            messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), formattedMessage);
        } catch (Exception e) {
            log.error("Error during message processing: ", e);
        }
    }
}

 

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
@Slf4j
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private final JwtTokenizer jwtTokenizer;
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/sub");
        registry.setApplicationDestinationPrefixes("/pub");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-stomp")
                .setAllowedOriginPatterns("*")
                .addInterceptors(new AuthHandshakeInterceptor(jwtTokenizer))
                .setHandshakeHandler(new CustomPrincipalHandshakeHandler());
    }

}

 

위의 3개의 코드는 채팅을 가능하게 하는 코드인데

쉽게 채팅의 과정을 설명해보자면

 

1. 클라이언트(STOMP) -> 서버로 메시지 전송

클라이언트가 STOMP 프로토콜을 통해 서버에 메시지를 보낸다.

/pub/chat/send 경로로 send() 요청을 보내는 것이다.

 

2. 서버가 메시지를 받아 Redis에 저장하고 WebSocket으로 전송

레디스에 메시지를 저장한후

저장된 메시지를 RedisPublisher.publish()로 Redis의 Pub/Sub 을 통해 발행한다.

 

3. Redis가 메시지를 구독하는 모든 서버에 전파

RedisSubscriber.onMessage() 가 메시지를 수신

 

4. WebSocket이 메시지를 구독 중인 클라이언트들에게 전송

메시지를 역직렬화 하여 WebSocket에 전달

/sub/chat/room/{roomId} 경로의 WebSocket 구독자들에게 메시지 전송

클라이언트들이 해당 채널을 구독하고 있다 실시간으로 메시지를 받음.

 

더 쉽게 설명하자면 클라이언트1이 채팅방1, 채팅방2 구독을 하고 있고 클라이언트2가 채팅방3, 채팅방4를 구독하고 있으면

레디스는 채팅방1~4를 모두 구독하고 있고 해당 클라이언트가 구독한 채널들에 적절하게 메시지를 뿌려주는 것이다.

 

이제 채팅이 진행되는 과정을 알았으니

 

채팅에 적용한 Scheduler 들에 대해 알아보자

 

우선 7일마다 자동으로 백업해야 하기 때문에

백업을 위한 코드가 필요하다

chatMessageService의 archiveChatMessage() 메서드를 사용하면 끝이다.

근데 왜 7일마다 백업이라면서 6일 마다 하고 있냐면 7일이면 ttl이 만료인데 시간이 어긋나

데이터가 유실될 가능성을 고려해서 6일마다 백업을 할 수 있게 설정해두었다.

@Component
@RequiredArgsConstructor
public class ChatMessageScheduler {

    private final ChatMessageService chatMessageService;

    @Scheduled(fixedRate = 6 * 24 * 60 * 60 * 1000)
    public void backupChatMessages() {
        chatMessageService.archiveChatMessages();
    }
}

 

다음은 S3Scheduler 이다.

@Component
@RequiredArgsConstructor
@Slf4j
public class S3Scheduler {
    private final AmazonS3 amazonS3;
    @Value("${cloud.aws.s3.bucket}")
    private String bucket;

    @Scheduled(cron = "0 0 3 * * *")
    public void deleteUnusedTempFiles() {
        log.info("임시 파일 삭제");

        ListObjectsV2Request request = new ListObjectsV2Request()
                .withBucketName(bucket)
                .withPrefix("temp/");

        ListObjectsV2Result result = amazonS3.listObjectsV2(request);
        result.getObjectSummaries().forEach(s3object -> {
            Date lastModified = s3object.getLastModified();
            if (isOlderThan24Hours(lastModified)) {
                amazonS3.deleteObject(bucket, s3object.getKey());
                log.info("24시간 동안 사용되지 않은 임시파일 삭제 : {}", s3object.getKey());
            }
        });
    }

    private boolean isOlderThan24Hours(Date lastModified) {
        long currentTime = System.currentTimeMillis();
        long fileTime = lastModified.getTime();
        return (currentTime - fileTime) > (24 * 60 * 60 * 1000);
    }
}

 

이 S3Scheduler가 필요한 이유가 무엇일까? 만약에 채팅에 이미지를 포함시켜 보내고싶다 해보자 일단 임시로 s3에 저장하고 나중에 최종경로로 수정한다고 위에서 언급했었다. 근데 이미지를 업로드해서 임시로 s3에 저장해두고 갑자기 보내기 싫어져서 메시지를 안보낸다면? 그러면 영원히 s3 임시경로에 남아있는것이다. 그렇게 데이터가 쌓이는걸 방지해주기 위해서 새벽 3시마다 임시 경로를 비워주기로 결정했다.

 

이 코드들을 작성할때 처음에는 굉장히 애먹으면서 작성했었다. 채팅부분은 직접 처음부터 끝까지 구현한건 처음이기도 했고 레디스도 익숙하지 않았기 때문이다. 프로젝트를 진행하면서 많은 오류와 많은 시행착오들을 겪었었다. 대부분이 레디스 키값을 잘못 지정해서 생긴일이 많았지만.. 이번에 확실히 겪었으니 다음에는 실수하지 않을 수 있을것이다. 또한 생각없이 코드 작성을 하다

여기서도 S3 서비스를 만들기 이전에 중복된 코드들이 많이 나오기도 했었다. S3서비스로 중복된 부분을 줄이고 이미지 관련 관리 부분을 넘겨주니까 책임도 확실해지고 코드가 깔끔해진것도 느낄 수 있었다. ChatService는 S3 서비스로 인해서 이미지 관리 말고 메시지를 발행하는것에 집중 할 수 있게 된 것이다. ChatMessage 관련부분은 여기까지이고 사실 진작 먼저 보여줬어야 할거 같은 S3 서비스를 마지막으로 대략적인 프로젝트 기능 소개 및 진행하면서 겪었던 이슈들을 마칠것이다. 이 이후에는 테스트코드를 도메인 단위로 순수 자바/코틀린 코드로만 작성해볼 것이고 그 이후에 가능하다면 mySql 이중화 작업 (읽기전용/쓰기전용)으로 시도 해 볼 것이다.