[2025-11-04] Kafka 실행

🦥 본문

  • docker-compose.yml
version: '3'
 
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    environment:
      ZOOKEEPER_SERVER_ID: 1 # 주키퍼를 식별하는 아이디로 유일한 값, 1개의 주키퍼를 사용할 예정이라 없어도 문제 없음
      ZOOKEEPER_CLIENT_PORT: 2181 # 주키퍼 포트, 기본 포트로 2181 사용
      ZOOKEEPER_TICK_TIME: 2000 # 클러스터를 구성할 때 동기화를 위한 기본 틱 타임
 
  broker:
    image: confluentinc/cp-kafka:7.0.0
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1 # 카프카의 브로커 아이디로 유일한 값, 1개의 브로커를 사용할 예정이라 없어도 문제 없음
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # 주키퍼에 연결하기 위한 대상 지정 [서비스이름:컨테이너내부포트]
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT # 보안을 위한 프로토콜 매핑. PLAINTEXT는 암호화하지 않은 일반 평문
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 # 외부 클라이언트에 알려주는 리스너 주소
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # 토픽 복제에 대한 설정 값
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 # 트랜잭션 최소 ISR(InSyncReplicas 설정) 수
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # 트랜잭션 상태에서 복제 수
  • docker-compose

    .yml 파일이 있는 디렉토리에 온 후 다음과 같이 실행

      docker-compose up -d
    

    image.png

  • 가상 환경 설정

    가상 환경 생성 후 진입

      pip install kafka-python
    
      python .\producer.py
      python .\consumer.py
    
  • producer.py

      from kafka import KafkaProducer
      from json import dumps
      import time
         
      producer = KafkaProducer(
          acks=1, # 메시지 전송 완료에 대한 체크
          compression_type='gzip', # 메시지 전달할 때 압축(None, gzip, snappy, lz4 등)
          bootstrap_servers=['localhost:9092'], # 전달하고자 하는 카프카 브로커의 주소 리스트
          value_serializer=lambda x:dumps(x).encode('utf-8') # 메시지의 값 직렬화
      )
         
      start = time.time()
         
      for i in range(1000):
          data = {'str' : 'result'+str(i)}
          producer.send('topic1', value=data)
          producer.flush() # 
         
      print('[Done]:', time.time() - start)
    
    • 맨 처음에는 Producer의 메시지를 브로커가 받지 않았다가, ack=1로 설정하여 메시지 전송 완료에 대한 보장을 하여 성공하였다.
  • consumer.py

      from kafka import KafkaConsumer
      from json import loads
         
      consumer = KafkaConsumer(
          'topic1', # 토픽명
          bootstrap_servers=['localhost:9092'], # 카프카 브로커 주소 리스트
          auto_offset_reset='earliest', # 오프셋 위치(earliest:가장 처음, latest: 가장 최근)
          enable_auto_commit=True, # 오프셋 자동 커밋 여부
          group_id='test-group', # 컨슈머 그룹 식별자
          value_deserializer=lambda x: loads(x.decode('utf-8')), # 메시지의 값 역직렬화
          consumer_timeout_ms=1000 # 데이터를 기다리는 최대 시간
      )
         
      print('[Start] get consumer')
         
      for message in consumer:
          print(f'Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
         
      print('[End] get consumer')
    

Categories:

Updated:

Leave a comment