application.properties:
spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092#spring.kafka.bootstrap-servers=kafka1:9092# msspring.kafka.consumer.auto-commit-interval=100# What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server.#spring.kafka.consumer.auto-offset-reset=# Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.#spring.kafka.consumer.bootstrap-servers=# Id to pass to the server when making requests; used for server-side logging.spring.kafka.consumer.client-id=s_3a_auth_biz_consumer# If true the consumer's offset will be periodically committed in the background.spring.kafka.consumer.enable-auto-commit=true# Unique string that identifies the consumer group this consumer belongs to.spring.kafka.consumer.group-id=AuthLogCol01spring.kafka.consumer.heartbeat-interval=1000# Maximum number of records returned in a single call to poll().spring.kafka.consumer.max-poll-records=10# Number of threads to run in the listener containers.spring.kafka.listener.concurrency=3# Timeout in milliseconds to use when polling the consumer.spring.kafka.listener.poll-timeout=3000
Consumer代码:
package com.pasenger.kafka.consumer;import lombok.extern.slf4j.Slf4j;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;/** * 认证日志记录采集 * TOPIC: AAA_PLT_AUTH_LOG * Created by Pasenger on 2017/3/20. */@Component@Slf4jpublic class AuthLogConsumer { @KafkaListener(topics = "AAA_PLT_AUTH_LOG", group = "AuthLogColGroup") public void process(String message){ log.info(message); }}