Kafka 및 Spring Boot 테스트

1. 개요

Apache Kafka는 강력하고 분산 된 내결함성 스트림 처리 시스템입니다. 이전 튜토리얼에서 Spring과 Kafka로 작업하는 방법을 배웠습니다.

이 튜토리얼에서는 이전 버전을 기반으로하여 .

먼저 Kafka의 임베디드 인스턴스를 사용하고 구성하는 방법을 살펴 보겠습니다. 그런 다음 테스트에서 널리 사용되는 프레임 워크 Testcontainers를 어떻게 사용할 수 있는지 살펴 보겠습니다.

2. 종속성

물론 표준 spring-kafka 종속성을 pom.xml 에 추가해야합니다 .

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

그런 다음 테스트를 위해 특별히 두 가지 종속성이 더 필요합니다 . 먼저 spring-kafka-test 아티팩트를 추가합니다 .

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

마지막으로 Maven Central에서도 사용할 수있는 Testcontainers Kafka 종속성을 추가합니다.

 org.testcontainers kafka 1.15.0 test 

이제 필요한 모든 종속성을 구성 했으므로 Kafka를 사용하여 간단한 Spring Boot 애플리케이션을 작성할 수 있습니다.

3. 간단한 Kafka 생산자-소비자 애플리케이션

이 튜토리얼 전체에서 테스트의 초점은 간단한 생산자-소비자 Spring Boot Kafka 애플리케이션입니다.

애플리케이션 진입 점을 정의하는 것으로 시작하겠습니다.

@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaProducerConsumerApplication.class, args); } }

보시다시피 이것은 표준 Spring Boot 애플리케이션입니다. 가능한 경우 기본 구성 값을 사용하려고합니다 . 이를 염두에두고 @EnableAutoConfiguration 주석을 사용하여 애플리케이션을 자동 구성합니다.

3.1. 생산자 설정

다음으로 주어진 Kafka 토픽에 메시지를 보내는 데 사용할 생산자 빈을 고려해 보겠습니다.

@Component public class KafkaProducer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplate kafkaTemplate; public void send(String topic, String payload) { LOGGER.info("sending payload="{}" to topic="{}"", payload, topic); kafkaTemplate.send(topic, payload); } }

우리 KafkaProducer의 위에서 정의 된 빈은 단지 래퍼입니다 KafkaTemplate의 클래스입니다. 이 클래스는 제공된 주제로 데이터를 보내는 것과 같은 높은 수준의 스레드로부터 안전한 작업을 제공합니다. 바로 send 메서드 에서 수행하는 작업입니다 .

3.2. 소비자 설정

마찬가지로 이제 Kafka 토픽을 수신하고 메시지를 수신하는 간단한 소비자 빈을 정의합니다.

@Component public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); private CountDownLatch latch = new CountDownLatch(1); private String payload = null; @KafkaListener(topics = "${test.topic}") public void receive(ConsumerRecord consumerRecord) { LOGGER.info("received payload="{}"", consumerRecord.toString()); setPayload(consumerRecord.toString()); latch.countDown(); } public CountDownLatch getLatch() { return latch; } public String getPayload() { return payload; } }

우리의 간단한 소비자는 수신 메서드 에서 @KafkaListener 주석을 사용하여 주어진 주제에 대한 메시지를 수신합니다. 나중에 테스트 에서 test.topic 을 구성하는 방법을 살펴 보겠습니다 .

또한 수신 메소드는 메시지 내용을 빈에 저장하고 래치 변수 의 수를 감소시킵니다 . 이 변수는 메시지를 성공적으로 수신했는지 확인하기 위해 나중에 테스트에서 사용할 간단한 스레드 안전 카운터 필드입니다 .

이제 Spring Boot를 사용하는 간단한 Kafka 애플리케이션이 구현되었으므로 통합 테스트를 작성하는 방법을 살펴 보겠습니다.

4. 테스트에 관한 한마디

일반적으로 깨끗한 통합 테스트를 작성할 때 제어 할 수 없거나 갑자기 작동이 중지 될 수있는 외부 서비스에 의존해서는 안됩니다 . 이는 테스트 결과에 부정적인 영향을 미칠 수 있습니다.

마찬가지로 외부 서비스 (이 경우 실행중인 Kafka 브로커)에 의존하는 경우 테스트에서 원하는 방식으로 설정, 제어 및 해체 할 수 없습니다.

4.1. 응용 프로그램 속성

테스트에서 매우 가벼운 애플리케이션 구성 속성 집합을 사용할 것입니다. src / test / resources / application.yml 파일 에서 이러한 속성을 정의 합니다.

spring: kafka: consumer: auto-offset-reset: earliest group-id: baeldung test: topic: embedded-test-topic

이것은 Kafka 또는 로컬 브로커의 임베디드 인스턴스로 작업 할 때 필요한 최소 속성 집합입니다.

이것들 대부분은 자명 하지만, 우리가 특별히 강조해야 할 것은 소비자 자산 auto-offset-reset : earliest 입니다. 이 속성은 전송이 완료된 후 컨테이너가 시작될 수 있으므로 소비자 그룹이 전송 한 메시지를 받도록합니다.

또한 테스트에서 사용할 주제embedded-test-topic 값을 사용하여 주제 속성을 구성합니다 .

5. 임베디드 Kafka를 사용한 테스트

이 섹션에서는 메모리 내 Kafka 인스턴스를 사용하여 테스트를 실행하는 방법을 살펴 보겠습니다. 이를 Embedded Kafka라고도합니다.

이전에 추가 한 종속성 spring-kafka-test 에는 애플리케이션 테스트를 지원하는 몇 가지 유용한 유틸리티가 포함되어 있습니다. 특히 EmbeddedKafkaBroker 클래스 가 포함되어 있습니다 .

이를 염두에두고 첫 번째 통합 테스트를 작성해 보겠습니다.

@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" }) class EmbeddedKafkaIntegrationTest { @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own simple KafkaProducer"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

테스트의 핵심 부분을 살펴 보겠습니다. 먼저 두 개의 표준 Spring 주석으로 테스트 클래스를 장식하는 것으로 시작합니다.

  • @SpringBootTest 주석은 우리의 테스트는 Spring 애플리케이션 컨텍스트를 부트 스트랩 보장합니다
  • 또한 @DirtiesContext 주석을 사용 하여이 컨텍스트가 서로 다른 테스트간에 정리되고 재설정되도록합니다.

여기에 중요한 부분이 있습니다. @EmbeddedKafka 주석을 사용하여 EmbeddedKafkaBroker 의 인스턴스를 테스트 에 삽입합니다 . 또한 임베디드 Kafka 노드를 구성하는 데 사용할 수있는 몇 가지 속성이 있습니다.

  • partitions – 주제 당 사용되는 파티션 수입니다. 멋지고 단순하게 유지하기 위해 테스트에서 하나만 사용하기를 원합니다.
  • brokerProperties – Kafka 브로커에 대한 추가 속성입니다. 다시 단순하게 유지하고 일반 텍스트 리스너와 포트 번호를 지정합니다.

다음으로, 소비자생산자 클래스를 자동 연결 하고 application.properties 의 값을 사용하도록 주제를 구성합니다 .

퍼즐의 마지막 부분을 위해 테스트 주제에 메시지를 보내고 메시지가 수신되었으며 테스트 주제의 이름이 포함되어 있는지 확인합니다 .

테스트를 실행하면 자세한 Spring 출력을 볼 수 있습니다.

... 12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer - sending payload="Sending with our own simple KafkaProducer" to topic="embedded-test-topic" ... 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.embedded.KafkaConsumer - received payload= 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialized key size = -1, serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),  key = null, value = 간단한 KafkaProducer로 보내기) ' 

이것은 테스트가 제대로 작동하고 있음을 확인합니다. 대박! 이제 우리는 인 메모리 Kafka 브로커를 사용하여 독립적이고 독립적 인 통합 테스트를 작성할 수 있습니다.

6. TestContainers로 Kafka 테스트

Sometimes we might see small differences between a real external service vs. an embedded in-memory instance of a service that has been specifically provided for testing purposes. Although unlikely, it could also be that the port used from our test might be occupied, causing a failure.

With that in mind, in this section, we'll see a variation on our previous approach to testing using the Testcontainers framework. We'll see how to instantiate and manage an external Apache Kafka broker hosted inside a Docker container from our integration test.

Let's define another integration test which will be quite similar to the one we saw in the previous section:

@RunWith(SpringRunner.class) @Import(com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest(classes = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest { @ClassRule public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3")); @Autowired private KafkaConsumer consumer; @Autowired private KafkaProducer producer; @Value("${test.topic}") private String topic; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived() throws Exception { producer.send(topic, "Sending with own controller"); consumer.getLatch().await(10000, TimeUnit.MILLISECONDS); assertThat(consumer.getLatch().getCount(), equalTo(0L)); assertThat(consumer.getPayload(), containsString("embedded-test-topic")); } }

Let's take a look at the differences this time around. We're declaring the kafka field, which is a standard JUnit @ClassRule. This field is an instance of the KafkaContainer class that will prepare and manage the lifecycle of our container running Kafka.

To avoid port clashes, Testcontainers allocates a port number dynamically when our docker container starts. For this reason, we provide a custom consumer and producer factory configuration using the class KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs() { Map props = new HashMap(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // more standard configuration return props; } @Bean public ProducerFactory producerFactory() { Map configProps = new HashMap(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); // more standard configuration return new DefaultKafkaProducerFactory(configProps); }

We then reference this configuration via the @Import annotation at the beginning of our test.

The reason for this is that we need a way to inject the server address into our application, which as previously mentioned, is generated dynamically. We achieve this by calling the getBootstrapServers() method, which will return the bootstrap server location:

bootstrap.servers = [PLAINTEXT://localhost:32789]

Now when we run our test, we should see that Testcontainers does several things:

  • Checks our local Docker setup.
  • Pulls the confluentinc/cp-kafka:5.4.3 docker image if necessary
  • Starts a new container and waits for it to be ready
  • Finally, shuts down and deletes the container after our test finishes

Again, this is confirmed by inspecting the test output:

13:33:10.396 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Creating container for image: confluentinc/cp-kafka:5.4.3 13:33:10.454 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Starting container with ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13:33:10.785 [main] INFO ? [confluentinc/cp-kafka:5.4.3] - Container confluentinc/cp-kafka:5.4.3 is starting: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! A working integration test using a Kafka docker container.

7. Conclusion

이 기사에서는 Spring Boot로 Kafka 애플리케이션을 테스트하는 몇 가지 접근 방식에 대해 배웠습니다. 첫 번째 접근 방식에서는 로컬 메모리 내 Kafka 브로커를 구성하고 사용하는 방법을 보았습니다.

그런 다음 테스트 컨테이너를 사용하여 테스트에서 도커 컨테이너 내부에서 실행되는 외부 Kafka 브로커를 설정하는 방법을 보았습니다.

항상 그렇듯이 기사의 전체 소스 코드는 GitHub에서 사용할 수 있습니다.