[2025-11-04] Kafka 실행
🦥 본문
- docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.0
hostname: zookeeper
container_name: zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1 # 주키퍼를 식별하는 아이디로 유일한 값, 1개의 주키퍼를 사용할 예정이라 없어도 문제 없음
ZOOKEEPER_CLIENT_PORT: 2181 # 주키퍼 포트, 기본 포트로 2181 사용
ZOOKEEPER_TICK_TIME: 2000 # 클러스터를 구성할 때 동기화를 위한 기본 틱 타임
broker:
image: confluentinc/cp-kafka:7.0.0
container_name: broker
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1 # 카프카의 브로커 아이디로 유일한 값, 1개의 브로커를 사용할 예정이라 없어도 문제 없음
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # 주키퍼에 연결하기 위한 대상 지정 [서비스이름:컨테이너내부포트]
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT # 보안을 위한 프로토콜 매핑. PLAINTEXT는 암호화하지 않은 일반 평문
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 # 외부 클라이언트에 알려주는 리스너 주소
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # 토픽 복제에 대한 설정 값
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 # 트랜잭션 최소 ISR(InSyncReplicas 설정) 수
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 # 트랜잭션 상태에서 복제 수
-
docker-compose
.yml 파일이 있는 디렉토리에 온 후 다음과 같이 실행
docker-compose up -d
-
가상 환경 설정
가상 환경 생성 후 진입
pip install kafka-pythonpython .\producer.py python .\consumer.py -
producer.py
from kafka import KafkaProducer from json import dumps import time producer = KafkaProducer( acks=1, # 메시지 전송 완료에 대한 체크 compression_type='gzip', # 메시지 전달할 때 압축(None, gzip, snappy, lz4 등) bootstrap_servers=['localhost:9092'], # 전달하고자 하는 카프카 브로커의 주소 리스트 value_serializer=lambda x:dumps(x).encode('utf-8') # 메시지의 값 직렬화 ) start = time.time() for i in range(1000): data = {'str' : 'result'+str(i)} producer.send('topic1', value=data) producer.flush() # print('[Done]:', time.time() - start)- 맨 처음에는 Producer의 메시지를 브로커가 받지 않았다가, ack=1로 설정하여 메시지 전송 완료에 대한 보장을 하여 성공하였다.
-
consumer.py
from kafka import KafkaConsumer from json import loads consumer = KafkaConsumer( 'topic1', # 토픽명 bootstrap_servers=['localhost:9092'], # 카프카 브로커 주소 리스트 auto_offset_reset='earliest', # 오프셋 위치(earliest:가장 처음, latest: 가장 최근) enable_auto_commit=True, # 오프셋 자동 커밋 여부 group_id='test-group', # 컨슈머 그룹 식별자 value_deserializer=lambda x: loads(x.decode('utf-8')), # 메시지의 값 역직렬화 consumer_timeout_ms=1000 # 데이터를 기다리는 최대 시간 ) print('[Start] get consumer') for message in consumer: print(f'Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}') print('[End] get consumer')
Leave a comment