使用librdkafka的C++接口实现Kafka生产者和消费者客户端

家电修理 2023-07-16 19:17www.caominkang.com电器维修

1. librdkafka简介

librdkafka 是 Apache Kafka 的 C/C++ 开发包,提供 生产者、消费者 和 管理客户端。

设计理念是可靠以及高性能的消息传输,当前可支持每秒超过100万的消息生产和300万每秒的消息消费。

官方README 文档对librdkafka的介绍
“librdkafka — the Apache Kafka C/C++ client library”

librdkafka/INTRODUCTION.md
https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md

librdkafka/examples/
https://github.com/edenhill/librdkafka/tree/master/examples

Usage
使用时,需要在源程序中包含 "rdkafka.h" 头文件(C 接口);使用 C++ 接口时则包含 "rdkafkacpp.h" 头文件


2. librdkafka的C++接口

2.1 RdKafka::Conf::create()

创建Conf配置实例,用于填充用户指定的各配置项

//namespace RdKafka;

// Creates a configuration object of the requested scope.
// RdKafka::Conf is the configuration interface used to set the properties of
// producers, consumers and brokers.
// The object is returned by pointer; the caller releases it with delete.
static Conf *create(ConfType type);

enum ConfType {
	CONF_GLOBAL,	// Global configuration
	CONF_TOPIC		// Topic specific configuration
};

使用举例

// Global (client-level) configuration object.
RdKafka::Conf *m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == nullptr) {
	std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
}

// Topic-level configuration object; the check must test this object, not the global one.
RdKafka::Conf *m_icConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_icConfig == nullptr) {
	std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
}
2.2 Conf::ConfResult set()

Conf类中的多个set成员函数,用于对不同的配置项进行赋值

// Excerpt of RdKafka::Conf: one set() overload per kind of value.
// Each overload stores `name` -> value and, on failure, writes a description to `errstr`.
class Conf {
public:
	// Plain string property.
	virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
	// Callback objects are passed by pointer.
	virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
	virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
	virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
	//...
	//...
};

enum ConfResult {
	CONF_UNKNOWN = -2,	// Unknown configuration property
	CONF_INVALID = -1,	// Invalid configuration value
	CONF_OK = 0			// Configuration property was successfully set
};

使用举例

RdKafka::Conf::ConfResult  	result;
std::string					error_str;

RdKafka::Conf *m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
// Set the "bootstrap.servers" property (broker list in "ip:port" form).
result = m_config->set("bootstrap.servers", "127.0.0.1:9092", error_str);
if(result != RdKafka::Conf::CONF_OK) {
 std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
}

// Set the "event_cb" property; the callback object is passed by pointer.
RdKafka::EventCb *m_event_cb = new ProducerEventCb;
result = m_config->set("event_cb", m_event_cb, error_str);
if(result != RdKafka::Conf::CONF_OK) {
 std::cout << "Global Conf set 'event_cb' failed: " << error_str << std::endl;
}
2.3 RdKafka::Producer::create()

创建Producer生产者客户端

class Producer : public virtual Handle {
public:
	static Producer create(Conf conf, std::string &errstr);
};

使用举例

RdKfka::Producer m_producer;

m_producer = RdKafka::Producer::create(m_config, error_str);
if(m_producer == nullptr) {
 std::cout << "Create Topic failed: " << error_str << std::endl;
}
2.4 RdKafka::Topic::create()

创建Topic主题对象

// Excerpt of RdKafka::Topic: factory for topic handles.
class Topic {
public:
	// Creates a topic handle named `topic_str` on the client `base`, configured by `conf`;
	// on failure returns a null pointer and fills `errstr`.
	static Topic *create(Handle *base, const std::string &topic_str, const Conf *conf, std::string &errstr);
};

使用举例

RdKafka::Topic *m_ic = nullptr;

// m_producer is the client handle, m_icStr the topic name, m_icConfig the topic-level Conf.
m_ic = RdKafka::Topic::create(m_producer, m_icStr, m_icConfig, error_str);
if(m_ic == nullptr) {
 std::cout << "Create Topic failed: " << error_str << std::endl;
}
2.5 RdKafka::Producer::produce()
// Excerpt of RdKafka::Producer: handing a message to the client.
class Producer : public virtual Handle {
public:
	// Produces one message of `len` bytes at `payload` to `topic`.
	// `partition` may be Topic::PARTITION_UA; `msgflags` is e.g. RK_MSG_COPY;
	// `key` and `msg_opaque` are passed by pointer.
	virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, 
						void *payload, size_t len, const std::string *key, void *msg_opaque);

	//... (further produce() overloads omitted)
};


// Use RdKafka::err2str() to translate an error code to a human readable string.
// Internal (client-side) codes use a double underscore after ERR.
enum ErrorCode {
	// Internal errors to rdkafka:
	ERR__BEGIN = -200,		// Begin internal error codes
	ERR__BAD_MSG = -199,	// Received message is incorrect
	//...
	ERR__END = -100,		// End internal error codes

	// Kafka broker errors:
	ERR_UNKNOWN = -1,		// Unknown broker error
	ERR_NO_ERROR = 0,		// Success
	//...
};

使用举例

// `key` is expected to be a `const std::string *` here (produce() takes the key by pointer).
RdKafka::ErrorCode error_code = m_producer->produce(m_ic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, 
										payload, len, key, NULL);

m_producer->poll(0);	// A timeout of 0 means non-blocking; poll(0) is called mainly to trigger the application's callbacks

if(error_code != RdKafka::ERR_NO_ERROR) {
	std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
	if(error_code == RdKafka::ERR__QUEUE_FULL) {
		m_producer->poll(1000);		// The local queue is full: block for a while before retrying
	}
	else if(error_code == RdKafka::ERR_MSG_SIZE_TOO_LARGE) {
		// The message exceeds the configured maximum size; it has to be trimmed and sent again
	}
	else {
		std::cerr << "ERR_UNKNOWN_PARTITION or ERR_UNKNOWN_TOPIC" << std::endl;
	}
}
2.6 RdKafka::KafkaConsumer::create()

创建Consumer消费者客户端

// Excerpt of RdKafka::KafkaConsumer: factory for consumer clients.
class KafkaConsumer : public virtual Handle {
public:
	// Builds a consumer from `conf`; on failure returns a null pointer and fills `errstr`.
	static KafkaConsumer *create(const Conf *conf, std::string &errstr);
};

使用举例

RdKafka::KafkaConsumer *m_consumer = nullptr;

m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
if(m_consumer == nullptr) {
 std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
}
2.7 RdKafka::KafkaConsumer::subscribe()

Consumer消费者订阅Topic主题

// Excerpt of RdKafka::KafkaConsumer: subscribing to topics.
class KafkaConsumer : public virtual Handle {
public:
	// Subscribes the consumer to the given list of topic names.
	virtual ErrorCode subscribe(const std::vector<std::string> &topics);
};

使用举例

std::vector ics;
ics.push_back(ic_str);

RdKafka::ErrorCode error_code = m_consumer->subscribe(ics);
if(error_code != ERROR_NO_ERROR) {
	std::cerr << "Consumer Subscribe Topics Failed: " << RdKafka::err2str(error_code) << std::endl;
}
2.8 RdKafka::KafkaConsumer::consume()

Consumer消费者拉取消息进行消费

// Excerpt of RdKafka::KafkaConsumer: fetching messages.
class KafkaConsumer : public virtual Handle {
public:
	// Waits up to `timeout_ms` for the next message or event and returns it by pointer.
	virtual Message *consume(int timeout_ms);
};

使用举例

// If no message arrives within 5000 ms the returned message carries RdKafka::ERR__TIMED_OUT.
// The returned object is heap-allocated; delete it once it has been processed.
RdKafka::Message *m_message = m_consumer->consume(5000);

2. 使用librdkafka的C++接口实现生产者客户端

2.1 main_producer.cpp
#include "producer_kafka.h"

#include <chrono>
#include <cstdio>
#include <thread>


// Demo driver: creates a KafkaProducer for topic "ic-demo" on 127.0.0.1:9092
// and pushes ten keyed messages through it.
int main() {
 {
  KafkaProducer producer("127.0.0.1:9092", "ic-demo", 0);

  // Pause before producing; presumably gives the client time to connect -- TODO confirm it is needed.
  std::this_thread::sleep_for(std::chrono::seconds(5));

  for(int i = 0; i < 10; i++) {
   char msg[64] = {0};
   std::snprintf(msg, sizeof(msg), "%s%4d", "Hello Kafka ", i);   // e.g. "Hello Kafka    1" (%4d pads with spaces)

   char key[8] = {0};
   std::snprintf(key, sizeof(key), "%d", i);  // e.g. "1"

   producer.pushMessage(msg, key);
  }
 } // The producer is destroyed here, before wait_destroyed() is asked to wait for that teardown.

 // wait_destroyed() is a free function of the RdKafka namespace, not a KafkaProducer member.
 RdKafka::wait_destroyed(50000);

 return 0;
}
2.2 producer_kafka.h
#ifndef __KAFKAPRODUCER_H_
#define __KAFKAPRODUCER_H_

#include <iostream>
#include <string>
#include "rdkafkacpp.h"


// Wrapper around an RdKafka::Producer bound to a single topic.
// The class owns every librdkafka object it points to (see the destructor),
// so it is made non-copyable.
class KafkaProducer {
public:
 // brokers: broker address list in "ip:port" form; ic: topic name; partition: stored in m_partition.
 // `explicit` forbids implicit conversions into a KafkaProducer.
 explicit KafkaProducer(const std::string& brokers, const std::string& ic, int partition);
 ~KafkaProducer();

 KafkaProducer(const KafkaProducer&) = delete;
 KafkaProducer& operator=(const KafkaProducer&) = delete;

 // Sends `msg` with message key `key`.
 void pushMessage(const std::string& msg, const std::string& key);
protected:
 std::string  m_brokers;
 std::string  m_icStr;
 int    m_partition = 0;

 RdKafka::Conf*   m_config = nullptr;     // Global (client-level) configuration: RdKafka::Conf is the interface for setting producer/consumer/broker properties
 RdKafka::Conf*   m_icConfig = nullptr;   // Topic-level configuration

 RdKafka::Producer*  m_producer = nullptr;
 RdKafka::Topic*  m_ic = nullptr;

 RdKafka::DeliveryReportCb*   m_dr_cb = nullptr;          // Reports the result of each RdKafka::Producer::produce() call; subclass it and implement dr_cb()
 RdKafka::EventCb*      m_event_cb = nullptr;       // Generic channel through which librdkafka passes errors, statistics and logs to the application
 RdKafka::PartitionerCb*     m_partitioner_cb = nullptr; // Custom partitioner


};

// Delivery report callback: logs to stderr whether each produced message was delivered.
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
 // Overrides the virtual dr_cb() of RdKafka::DeliveryReportCb.
 void dr_cb(RdKafka::Message &message) override {
  if(message.err() != RdKafka::ERR_NO_ERROR) {    // delivery failed
   std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
  } else {        // delivery succeeded
   std::cerr << "Message delivered to ic: " << message.topic_name() 
       << " [" << message.partition() 
       << "] at offset " << message.offset() << std::endl;
  }
 }
};
// Event callback for the producer: prints one line to stdout per error,
// statistics, log or throttle event; other event types are ignored.
class ProducerEventCb : public RdKafka::EventCb {
public:
 void event_cb(RdKafka::Event &event) override {
  switch(event.type()) {
   case RdKafka::Event::EVENT_ERROR:
    std::cout << "RdKafka::EVENT::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
    break;
   case RdKafka::Event::EVENT_STATS:
    std::cout << "RdKafka::EVENT::EVENT_STATS: " << event.str() << std::endl;
    break;
   case RdKafka::Event::EVENT_LOG:
    std::cout << "RdKafka::EVENT::EVENT_LOG: " << event.fac() << std::endl;
    break;
   case RdKafka::Event::EVENT_THROTTLE:
    std::cout << "RdKafka::EVENT::EVENT_THROTTLE: " << event.broker_name() << std::endl;
    break;
   default:
    break;
  }

 }
};
// Custom producer partitioner: returns a partition id derived from a hash of the message key.
class HashPartitionerCb : public RdKafka::PartitionerCb {
public:
 // topic: topic being produced to; key: message key (may be null when no key was given);
 // partition_cnt: number of partitions to choose from; msg_opaque: unused.
 // Returns a value in [0, partition_cnt), or RdKafka::Topic::PARTITION_UA when no
 // partition can be computed.
 int32_t partitioner_cb( const RdKafka::Topic *topic, const std::string *key, 
       int32_t partition_cnt, void *msg_opaque) override
 {
  (void)msg_opaque;
  if(topic == nullptr || key == nullptr || partition_cnt <= 0) {
   return RdKafka::Topic::PARTITION_UA;
  }

  // Trace line only; streamed directly so that long topic names or keys cannot overflow a fixed buffer.
  std::cout << "HashPartitionCb:[" << topic->name() << "][" << *key << "][" << partition_cnt << "]" << std::endl; 

  // The actual partitioning: hash the key and reduce it modulo the partition count.
  const unsigned int hash = generate_hash(key->c_str(), key->size());
  return static_cast<int32_t>(hash % static_cast<unsigned int>(partition_cnt)); 
 }
private:
 // Hash of the first `len` bytes of `str`: starts at 5381 and applies hash = hash * 33 + byte per byte.
 static inline unsigned int generate_hash(const char *str, size_t len) {
  unsigned int hash = 5381;
  for (size_t i = 0; i < len; i++) {
   hash = ( (hash << 5) + hash ) + str[i];
  }
  return hash;
 }
};


#endif
2.3 producer_kafka.cpp
#include "producer_kafka.h"


//("192.168.0.105:9092", "ic_demo", 0)
// Builds the global and topic configuration, installs the delivery-report, event and
// partitioner callbacks, then creates the producer client and the topic handle.
// brokers: broker list in "ip:port" form; ic: topic name; partition: stored in m_partition.
// Every step logs to stdout on failure. When an object that later steps depend on cannot
// be created, construction stops early and the remaining pointers stay null.
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& ic, int partition)
 : m_brokers(brokers),
   m_icStr(ic),
   m_partition(partition),
   m_config(nullptr),
   m_icConfig(nullptr),
   m_producer(nullptr),
   m_ic(nullptr),
   m_dr_cb(nullptr),
   m_event_cb(nullptr),
   m_partitioner_cb(nullptr)
{
 // Configuration objects come first: everything below writes into them.
 m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
 if(m_config == nullptr) {
  std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
  return;
 }

 m_icConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
 if(m_icConfig == nullptr) {
  std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
  return;
 }

 RdKafka::Conf::ConfResult   result;
 std::string     error_str;
 
 // Broker address list. The property is spelled "bootstrap.servers".
 result = m_config->set("bootstrap.servers", m_brokers, error_str);
 if(result != RdKafka::Conf::CONF_OK) {
  std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
 }

 result = m_config->set("statistics.interval.ms", "10000", error_str);
 if(result != RdKafka::Conf::CONF_OK) {
  std::cout << "Global Conf set ‘statistics.interval.ms’ failed: " << error_str << std::endl;
 }

 // Upper bound on the size of a message the producer will send; larger messages fail.
 result = m_config->set("message.max.bytes", "10240000", error_str);
 if(result != RdKafka::Conf::CONF_OK) {
  std::cout << "Global Conf set 'message.max.bytes' failed: " << error_str << std::endl;
 }


 // Per-message delivery result callback.
 m_dr_cb = new ProducerDeliveryReportCb;
 result = m_config->set("dr_cb", m_dr_cb, error_str);
 if(result != RdKafka::Conf::CONF_OK) {
  std::cout << "Global Conf set ‘dr_cb’ failed: " << error_str << std::endl;
 }

 m_event_cb = new ProducerEventCb;
 result = m_config->set("event_cb", m_event_cb, error_str);
 if(result != RdKafka::Conf::CONF_OK) {
  std::cout << "Global Conf set ‘event_cb’ failed: " << error_str << std::endl;
 }

 // Custom partitioner; it is a topic-level setting.
 m_partitioner_cb = new HashPartitionerCb;
 result = m_icConfig->set("partitioner_cb", m_partitioner_cb, error_str);
 if(result != RdKafka::Conf::CONF_OK) {
  std::cout << "Topic Conf set ‘partitioner_cb’ failed: " << error_str << std::endl;
 }

 // Producer client.
 m_producer = RdKafka::Producer::create(m_config, error_str);
 if(m_producer == nullptr) {
  std::cout << "Create Producer failed: " << error_str << std::endl;
  return;   // the topic handle cannot be created without a client
 }

 // Topic handle, needed later by produce().
 m_ic = RdKafka::Topic::create(m_producer, m_icStr, m_icConfig, error_str);
 if(m_ic == nullptr) {
  std::cout << "Create Topic failed: " << error_str << std::endl;
 }
}

void KafkaProducer::pushMessage(const std::string& msg, const std::string& key) {
 int32_t len = str.length();
 void payload = const_cast(static_cast(str.data()));

 RdKafka::ErrorCode error_code = m_producer->prodce( m_ic, RdKafka::Topic::PARTITION_UA,
              RdKafka::Producer::RK_MSG_COPY,
              payload, len, key, NULL);
 m_producer->poll(0);  //poll()参数为0意味着不阻塞;poll(0)主要是为了触发应用程序提供的回调函数
 if(error_code != ERR_NO_ERROR) {
  std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
  if(error_code == ERR_QUEUE_FULL) {
   m_producer->poll(1000);  //如果发送失败的原因是队列正满,则阻塞等待一段时间
  } else if(error_code == ERR_MSG_SIZE_TOO_LARGE) {
   //如果发送消息过大,超过了max.size,则需要裁减后重新发送
  } else {
   std::cerr << "ERR_UNKNOWN_PARTITION or ERR_UNKNOWN_TOPIC" << std::endl;
  }
 }
}
// Flushes the producer until its out-queue is empty, then releases every object the
// wrapper owns. Null pointers left by a failed construction are tolerated.
KafkaProducer::~KafkaProducer() {
 if(m_producer != nullptr) {
  while(m_producer->outq_len() > 0) {   // entries are still waiting in the client's out-queue
   std::cerr << "Waiting for: " << m_producer->outq_len() << std::endl;
   m_producer->flush(5000);
  }
 }

 // The topic handle goes before the producer it was created on; the callbacks go last.
 delete m_config;
 delete m_icConfig;
 delete m_ic;
 delete m_producer;
 delete m_dr_cb;
 delete m_event_cb;
 delete m_partitioner_cb;
}
3. 使用librdkafka的C++接口实现消费者客户端

3.1 main_consumer.cpp
#include "kafka_consumer.h"

// Demo driver: creates a KafkaConsumer in group "consumer-group-demo" for topic "ic-demo"
// on 127.0.0.1:9092 and runs its pullMessage().
int main()
{
 std::string brokers = "127.0.0.1:9092";
 
 std::vector<std::string> ics;		// topics to consume
 ics.push_back("ic-demo");

 std::string group = "consumer-group-demo";	// consumer group
 
 {
  // NOTE(review): the constructor names its last parameter `partition`, yet an offset constant is passed -- confirm the intent.
  KafkaConsumer consumer(brokers, group, ics, RdKafka::Topic::OFFSET_BEGINNING);
  
  consumer.pullMessage();
 } // The consumer is destroyed here, before wait_destroyed() is asked to wait for that teardown.

 // wait_destroyed() lives in the RdKafka namespace.
 RdKafka::wait_destroyed(5000);
 
 return 0;
}


3.2 kafka_consumer.h
#ifndef __KAFKACONSUMER_H_
#define __KAFKACONSUMER_H_

#include <cstdio>
#include <iostream>
#include <string>
#include <vector>
#include "rdkafkacpp.h"

// Wrapper around an RdKafka::KafkaConsumer for a consumer group and a list of topics.
// It holds raw pointers to librdkafka objects, so it is made non-copyable.
class KafkaConsumer {
public:
 // brokers: broker list in "ip:port" form; groupID: consumer group name;
 // ics: topics to consume; partition: stored in m_partition.
 explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
         const std::vector<std::string>& ics, int partition);
 ~KafkaConsumer();

 KafkaConsumer(const KafkaConsumer&) = delete;
 KafkaConsumer& operator=(const KafkaConsumer&) = delete;
 
 void pullMessage();
 
protected:
 std::string 				m_brokers;
 std::string 				m_groupId;
 std::vector<std::string> 	m_icVector;		// a consumer may subscribe to several topics, hence a vector
 int 						m_partition = 0;
 
 RdKafka::Conf* 			m_config = nullptr;			// GLOBAL-level configuration (consumer client level)
 RdKafka::Conf* 			m_icConfig = nullptr;		// TOPIC-level configuration
 
 RdKafka::KafkaConsumer* 	m_consumer = nullptr;		// consumer client instance
 
 RdKafka::EventCb* 			m_event_cb = nullptr;		// event callback
 RdKafka::RebalanceCb* 		m_rebalance_cb = nullptr;	// rebalance callback
};


// Event callback for the consumer: writes one line to stderr per event.
class ConsumerEventCb : public RdKafka::EventCb {
public:
 void event_cb(RdKafka::Event& event) override {
  switch (event.type())
  {
	  case RdKafka::Event::EVENT_ERROR:
	   if (event.fatal()) 				// prefix fatal errors
	    std::cerr << "FATAL ";
	   std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
	   break;
	  case RdKafka::Event::EVENT_STATS:
	   std::cerr << "\"STATS\": " << event.str() << std::endl;
	   break;
	  case RdKafka::Event::EVENT_LOG:
	   // Same text as the "LOG-%i-%s: %s\n" format, written through the stream already used above.
	   std::cerr << "LOG-" << static_cast<int>(event.severity()) << "-" << event.fac() << ": " << event.str() << "\n";
	   break;
	  case RdKafka::Event::EVENT_THROTTLE:
	   std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << static_cast<int>(event.broker_id()) << std::endl;
	   break;
	  default:
	   std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " <<  event.str() << std::endl;
	   break;
  }
 }
};

// Rebalance callback: logs the event and the affected partitions, then assigns or
// unassigns them on the consumer.
class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
public:
 // err says which phase of the rebalance this is; partitions is the set being assigned or revoked.
 void rebalance_cb( RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
        std::vector<RdKafka::TopicPartition*> &partitions) override
 {
  std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
  printTopicPartition(partitions);   // returns void, so it cannot be part of the stream expression
  
  if(err == RdKafka::ERR__ASSIGN_PARTITIONS) {		// after the rebalance, before consumption starts
   consumer->assign(partitions);
   partition_count = static_cast<int>(partitions.size());
  } else if(err == RdKafka::ERR__REVOKE_PARTITIONS) {		// after consumption stopped, before the rebalance; offsets could be committed here
   consumer->unassign();
   partition_count = 0;
  } else {
   std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
   consumer->unassign();   // drop the assignment on error as well, as the librdkafka rebalance_cb example does
   partition_count = 0;
  }
 }
 
private:
 // Prints every "topic[partition], " pair of `partitions` to stderr, followed by a newline.
 static void printTopicPartition(const std::vector<RdKafka::TopicPartition*> &partitions) {
  for(const RdKafka::TopicPartition *tp : partitions) {
   std::cerr << tp->topic() << "[" << tp->partition() << "], ";
  }
  std::cerr << "\n";
 }
private:
 int partition_count = 0;			// number of partitions currently assigned to the consumer
};
#endif
3.3 kafka_consumer.cpp
#include "kafka_consumer.h"

// Builds the global and topic configuration, installs the event and rebalance callbacks,
// then creates the consumer client.
// brokers: broker list in "ip:port" form; groupId: consumer group name;
// ics: topics to consume; partition: stored in m_partition.
// Every step logs to stdout on failure. When a Conf object or the consumer cannot be
// created, construction stops early and the remaining pointers stay null.
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupId, 
							 const std::vector<std::string>& ics, int partition) 
	: m_brokers(brokers),
	  m_groupId(groupId),
	  m_icVector(ics),
	  m_partition(partition),
	  m_config(nullptr),
	  m_icConfig(nullptr),
	  m_consumer(nullptr),
	  m_event_cb(nullptr),
	  m_rebalance_cb(nullptr)
{
	// Configuration objects.
	m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
	if(m_config == nullptr) {
		std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
		return;
	}

	m_icConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
	if(m_icConfig == nullptr) {
		std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
		return;
	}

	RdKafka::Conf::ConfResult   result;
	std::string     error_str;

	result = m_config->set("bootstrap.servers", m_brokers, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
	}

	// Consumer group name.
	result = m_config->set("group.id", m_groupId, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'group.id' failed: " << error_str << std::endl;
	}

	// Upper bound on the bytes fetched per partition.
	result = m_config->set("max.partition.fetch.bytes", "1024000", error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'max.partition.fetch.bytes' failed: " << error_str << std::endl;
	}

	// enable.partition.eof controls whether RD_KAFKA_RESP_ERR__PARTITION_EOF is emitted when the
	// consumer reaches the end of a partition; it is switched off here.
	result = m_config->set("enable.partition.eof", "false", error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'enable.partition.eof' failed: " << error_str << std::endl;
	}


	m_event_cb = new ConsumerEventCb;
	result = m_config->set("event_cb", m_event_cb, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'event_cb' failed: " << error_str << std::endl;
	}

	m_rebalance_cb = new ConsumerRebalanceCb;
	result = m_config->set("rebalance_cb", m_rebalance_cb, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'rebalance_cb' failed: " << error_str << std::endl;
	}


	// Topic-level settings.
	result = m_icConfig->set("auto.offset.reset", "latest", error_str);
	if(result != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Topic Conf set 'auto.offset.reset' failed: " << error_str << std::endl;
	}

	// Attach the topic configuration to the global one; the property is named "default_topic_conf".
	result = m_config->set("default_topic_conf", m_icConfig, error_str);
	if(result != RdKafka::Conf::CONF_OK)
	{
		std::cout << "Conf set 'default_topic_conf' failed: " << error_str << std::endl;
	}


	// Consumer client.
	m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
	if(m_consumer == nullptr) {
		std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
		return;   // m_consumer must not be dereferenced below
	}
	std::cout << "Create KafkaConsumer succeeded, consumer name : " << m_consumer->name() << std::endl;
}


void RdKafkaConsumer::pullMessage() {

}

Copyright © 2016-2025 www.caominkang.com 曹敏电脑维修网 版权所有 Power by