package cn.idea360.assistant.dev.kafka;
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration; import java.util.Collections; import java.util.Properties;
/**
 * Minimal plain-Java Kafka client demo: builds a String/String producer and consumer,
 * sends one record, then consumes in an endless poll loop with manual offset commits.
 *
 * <p>NOTE(review): the broker address is a hard-coded LAN IP; override it via the
 * {@code bootstrapServers} overloads for any non-demo use.
 */
@Slf4j
public class SimpleKafkaClient {

    /** Broker address used by the no-arg factory methods. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "10.10.10.205:9092";

    public SimpleKafkaClient() {
    }

    /**
     * Creates a producer against the default broker.
     *
     * @return a new {@link KafkaProducer}; the caller is responsible for closing it
     */
    public static KafkaProducer<String, String> createProducer() {
        return createProducer(DEFAULT_BOOTSTRAP_SERVERS);
    }

    /**
     * Creates an idempotent, gzip-compressing String/String producer.
     *
     * @param bootstrapServers comma-separated {@code host:port} broker list
     * @return a new {@link KafkaProducer}; the caller is responsible for closing it
     */
    public static KafkaProducer<String, String> createProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all + retries>0 are required for idempotence (enabled below).
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new KafkaProducer<>(props);
    }

    /**
     * Creates a consumer against the default broker.
     *
     * @return a new {@link KafkaConsumer}; the caller is responsible for closing it
     */
    public static KafkaConsumer<String, String> createConsumer() {
        return createConsumer(DEFAULT_BOOTSTRAP_SERVERS);
    }

    /**
     * Creates a String/String consumer with auto-commit disabled; the caller must
     * commit offsets explicitly (see {@link #main(String[])}).
     *
     * @param bootstrapServers comma-separated {@code host:port} broker list
     * @return a new {@link KafkaConsumer}; the caller is responsible for closing it
     */
    public static KafkaConsumer<String, String> createConsumer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the topic when the group has no committed offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Offsets are committed manually after each processed batch.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        // Heartbeat must be well below session.timeout.ms (10s here).
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        return new KafkaConsumer<>(props);
    }

    /**
     * Demo entry point: sends one record to {@code test_data}, then consumes it
     * (and anything else on the topic) forever.
     */
    public static void main(String[] args) {
        String topic = "test_data";
        String key = "key";
        String value = "idea360.cn";

        // BUG FIX: the original referenced a non-existent CustomKafkaClient class.
        // try-with-resources guarantees the producer is flushed and closed even if
        // send() throws (e.g. max.block.ms exceeded while the broker is unreachable).
        try (KafkaProducer<String, String> producer = SimpleKafkaClient.createProducer()) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    log.info("Sent record [{}] with key [{}] to partition [{}] with offset [{}]",
                            value, key, metadata.partition(), metadata.offset());
                } else {
                    // Log with full stack trace via SLF4J instead of printStackTrace().
                    log.error("Failed to send record with key [{}]", key, exception);
                }
            });
        }

        KafkaConsumer<String, String> consumer = SimpleKafkaClient.createConsumer();
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("Consumed record with key [{}] and value [{}] from partition [{}] with offset [{}]",
                            record.key(), record.value(), record.partition(), record.offset());
                }
                // Manual commit after the batch is processed (auto-commit is disabled).
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}