使⽤java客户端, kafkaproducer, kafkaconsumer进⾏kafka的连接
注: 0.10 版本之后, 连接kafka只需要brokerip即可, 不需要zookeeper的信息1, kafka 配置信息
{
\"producer\": {
\"bootstrap.servers\": \"10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093\", \"key.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\", \"value.serializer\": \"org.apache.kafka.common.serialization.StringSerializer\", \"max.request.size\": \"10485760\", \"batch.size\": \"163840\",
\"buffer.memory\": \"536870912\", \"max.block.ms\": \"500\", \"retries\": \"3\", \"acks\": \"1\", },
\"cosumer\": {
\"bootstrap.servers\": \"10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093\", \"group.id\": \"test222\",
\"session.timeout.ms\": \"30000\",
\"key.deserializer\": \"org.apache.kafka.common.serialization.StringDeserializer\", \"value.deserializer\": \"org.apache.kafka.common.serialization.StringDeserializer\" }}
2, kafka utils, ⽤来读取kafka的配置信息
package com.wenbronk.kafka;
import com.alibaba.fastjson.JSON;import com.google.gson.JsonElement;import com.google.gson.JsonObject;import com.google.gson.JsonParser;import org.junit.Test;
import java.io.FileNotFoundException;import java.io.FileReader;import java.util.Map;
import java.util.Properties;
public class KafkaUtils { @Test
public void test() throws FileNotFoundException { getConfig(\"producer\");// fastJSON(); }
public static JsonObject getConfig(String name) throws FileNotFoundException { JsonParser parser = new JsonParser();
JsonElement parse = parser.parse(new FileReader(\"src/main/resources/kafka\")); JsonObject jsonObject = parse.getAsJsonObject().getAsJsonObject(name); System.out.println(jsonObject); return jsonObject; }
public static Properties getProperties(String sourceName) throws FileNotFoundException { JsonObject config = KafkaUtils.getConfig(sourceName); Properties properties = new Properties();
for (Map.Entry return properties; } // public static void fastJSON() throws FileNotFoundException { // Object o = JSON.toJSON(new FileReader(\"src/main/resources/kafka\"));// System.out.println(o);// }} 3, kafka producer package com.wenbronk.kafka; import com.google.gson.JsonElement;import com.google.gson.JsonObject; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import org.junit.Test; import javax.swing.text.StyledEditorKit;import java.io.FileNotFoundException;import java.util.*; import java.util.stream.IntStream;/** * 消息提供者 */ public class KafkaProducerMain { @Test public void send() throws Exception { HashMap map.put(\"http_zhixin\", \"send message to kafka from producer\"); for (int i = 0; i < 3; i++ ) { sendMessage(map); } // sendMessage(map); } /** * 消息发送 */ public void sendMessage(Map KafkaProducer String message = entry.getValue(); ProducerRecord // producer.send(record, new CallBackFuntion(topic, message)); producer.send(record, (recordMetadata, e) -> { if (e != null) { System.err.println(topic + \": \" + message + \"--消息发送失败\"); }else { System.err.println(topic + \": \" + message + \"--消息发送成功\"); } }); } producer.flush(); producer.close(); }} 回掉函数可写匿名内部类, 也可写外部类通过新建的⽅式运⾏ package com.wenbronk.kafka; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata;/** * 回掉函数 */ public class CallBackFuntion implements Callback { private String topic; private String message; public CallBackFuntion(String topic, String message) { this.topic = topic; this.message = message; } @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { System.out.println(topic + \": \" + message + \"--消息发送失败\"); }else { System.out.println(topic + \": \" + message + \"--消息发送成功\"); } } } 4, kafka consumer package com.wenbronk.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import org.junit.Test; import java.io.FileNotFoundException;import java.util.*; public class KafkaConsumerMain { /** * ⾃动提交offset */ public void commitAuto(List props.put(\"auto.commit.interval.ms\", \"1000\"); KafkaConsumer ConsumerRecords System.err.printf(\"offset = %d, key = %s, value = %s%n\", record.offset(), record.key(), record.value()); } } /** * ⼿动提交offset * * @throws FileNotFoundException */ public void commitControl(List KafkaConsumer List ConsumerRecords if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); // 阻塞同步提交 consumer.commitSync(); buffer.clear(); } } } /** * ⼿动设置分区 */ public void setOffSet(List KafkaConsumer while (true) { ConsumerRecords for (TopicPartition partition : records.partitions()) { List long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } /** * ⼿动设置消息offset */ public void setSeek(List KafkaConsumer consumer.seek(new TopicPartition(\"http_zhixin\", 0), 797670770); ConsumerRecords for (ConsumerRecord System.err.printf(\"offset = %d, key = %s, value = %s%n\", record.offset(), record.key(), record.value()); consumer.commitSync(); } } @Test public void test() throws FileNotFoundException { ArrayList /** * doSomethings */ private void insertIntoDb(List } kafka 处于同⼀组的消费者, 不可以重复读取消息, 0.11版本中加⼊了事物控制
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- gamedaodao.net 版权所有 湘ICP备2024080961号-6
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务