Kafka Java client connection

Use the Java client classes KafkaProducer and KafkaConsumer to connect to Kafka.

Note: since version 0.10, connecting to Kafka only requires the broker addresses (bootstrap.servers); ZooKeeper connection details are no longer needed.

1, Kafka configuration

{
  "producer": {
    "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "max.request.size": "10485760",
    "batch.size": "163840",
    "buffer.memory": "536870912",
    "max.block.ms": "500",
    "retries": "3",
    "acks": "1"
  },
  "consumer": {
    "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
    "group.id": "test222",
    "session.timeout.ms": "30000",
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
  }
}

2, KafkaUtils, used to read the Kafka configuration

package com.wenbronk.kafka;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import java.util.Properties;

public class KafkaUtils {

    @Test
    public void test() throws FileNotFoundException {
        getConfig("producer");
        // fastJSON();
    }

    /**
     * Read one named section ("producer" or "consumer") from the JSON config file.
     */
    public static JsonObject getConfig(String name) throws FileNotFoundException {
        JsonParser parser = new JsonParser();
        JsonElement parse = parser.parse(new FileReader("src/main/resources/kafka"));
        JsonObject jsonObject = parse.getAsJsonObject().getAsJsonObject(name);
        System.out.println(jsonObject);
        return jsonObject;
    }

    /**
     * Convert a config section into the Properties object the Kafka clients expect.
     */
    public static Properties getProperties(String sourceName) throws FileNotFoundException {
        JsonObject config = KafkaUtils.getConfig(sourceName);
        Properties properties = new Properties();
        for (Map.Entry<String, JsonElement> entry : config.entrySet()) {
            properties.put(entry.getKey(), entry.getValue().getAsString());
        }
        return properties;
    }

    // Alternative using fastjson (requires com.alibaba.fastjson.JSON):
    // public static void fastJSON() throws FileNotFoundException {
    //     Object o = JSON.toJSON(new FileReader("src/main/resources/kafka"));
    //     System.out.println(o);
    // }
}

3, Kafka producer

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Message producer
 */
public class KafkaProducerMain {

    @Test
    public void send() throws Exception {
        HashMap<String, String> map = new HashMap<>();
        map.put("http_zhixin", "send message to kafka from producer");
        for (int i = 0; i < 3; i++) {
            sendMessage(map);
        }
    }

    /**
     * Send one record per (topic, message) entry in the map.
     */
    public void sendMessage(Map<String, String> topicMsg) throws FileNotFoundException {
        Properties properties = KafkaUtils.getProperties("producer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        for (Map.Entry<String, String> entry : topicMsg.entrySet()) {
            String topic = entry.getKey();
            String message = entry.getValue();
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            // asynchronous send; the callback fires when the broker responds
            // producer.send(record, new CallBackFuntion(topic, message));
            producer.send(record, (recordMetadata, e) -> {
                if (e != null) {
                    System.err.println(topic + ": " + message + " -- send failed");
                } else {
                    System.err.println(topic + ": " + message + " -- send succeeded");
                }
            });
        }
        producer.flush();
        producer.close();
    }
}
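send() is asynchronous and returns a Future&lt;RecordMetadata&gt; (org.apache.kafka.clients.producer.RecordMetadata), so a caller that needs to block until the broker acknowledges can wait on the future instead of, or in addition to, passing a callback. A minimal sketch, reusing the producer and record variables from the method above:

// synchronous send: get() blocks until the broker acks or the request fails
RecordMetadata metadata = producer.send(record).get();
System.out.println("stored at partition " + metadata.partition() + ", offset " + metadata.offset());

Note that get() throws InterruptedException and ExecutionException, so the enclosing method must handle or declare them.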

The callback can be written as an anonymous inner class (or, as above, a lambda), or as a standalone class instantiated with new:

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Producer send callback
 */
public class CallBackFuntion implements Callback {

    private String topic;
    private String message;

    public CallBackFuntion(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            System.out.println(topic + ": " + message + " -- send failed");
        } else {
            System.out.println(topic + ": " + message + " -- send succeeded");
        }
    }
}

4, Kafka consumer

package com.wenbronk.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.*;

public class KafkaConsumerMain {

    /**
     * Auto-commit offsets
     */
    public void commitAuto(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }

    /**
     * Manually commit offsets
     */
    public void commitControl(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        final int minBatchSize = 2;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                // blocking synchronous commit
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    /**
     * Commit offsets manually, per partition
     */
    public void setOffSet(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // after processing each partition's messages, commit that partition's offset
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // the committed offset is the offset of the NEXT message to read, hence +1
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

    /**
     * Manually set the consumption offset with seek()
     */
    public void setSeek(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // seek() only works on partitions assigned to this consumer, so assign
        // the partition explicitly instead of subscribing
        TopicPartition partition = new TopicPartition("http_zhixin", 0);
        consumer.assign(Collections.singletonList(partition));
        consumer.seek(partition, 797670770);
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.err.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
            consumer.commitSync();
        }
    }

    @Test
    public void test() throws FileNotFoundException {
        ArrayList<String> topics = new ArrayList<>();
        topics.add("http_zhixin");
        // commitAuto(topics);
        // commitControl(topics);
        // setOffSet(topics);
        setSeek(topics);
    }

    /**
     * Placeholder for real processing: just print the buffered values
     */
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        buffer.stream().map(ConsumerRecord::value).forEach(System.err::println);
    }
}
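The poll loops above never exit. For a clean shutdown, KafkaConsumer.wakeup() is the one thread-safe method on the consumer: calling it from another thread makes a blocked poll() throw WakeupException, which the loop can treat as the stop signal. A minimal sketch of that pattern; the class name and the shutdown-hook placement are illustrative assumptions, not part of the original code:

package com.wenbronk.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerShutdownSketch {
    public static void main(String[] args) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("consumer");
        props.put("enable.auto.commit", "true");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("http_zhixin"));
        // wakeup() is thread-safe: calling it from a shutdown hook makes the
        // blocked poll() throw WakeupException, our exit signal
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to close
        } finally {
            consumer.close();
        }
    }
}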

Kafka consumers in the same consumer group share a topic's partitions, so each message is delivered to only one consumer in the group; to re-read the same messages, consume with a different group.id. Version 0.11 added transaction support.
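Transactions are driven from the producer side: a producer configured with a transactional.id can publish several records atomically, and consumers running with isolation.level=read_committed only see committed records. Below is a minimal sketch of the 0.11+ transactional API; the class name, the transactional.id value, and the reuse of KafkaUtils.getProperties("producer") are this article's assumptions, not part of the original code:

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

import java.io.FileNotFoundException;
import java.util.Properties;

public class KafkaTxProducerSketch {
    public static void main(String[] args) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("producer");
        props.put("transactional.id", "tx-producer-1"); // hypothetical id; must be stable and unique per producer
        props.put("enable.idempotence", "true");        // transactions require the idempotent producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("http_zhixin", "msg-1"));
            producer.send(new ProducerRecord<>("http_zhixin", "msg-2"));
            producer.commitTransaction();  // both records become visible atomically
        } catch (KafkaException e) {
            producer.abortTransaction();   // read_committed consumers never see aborted records
        } finally {
            producer.close();
        }
    }
}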
