rdkafkacpp.h 72 KB


  1. /*
  2. * librdkafka - Apache Kafka C/C++ library
  3. *
  4. * Copyright (c) 2014 Magnus Edenhill
  5. * All rights reserved.
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. *
  10. * 1. Redistributions of source code must retain the above copyright notice,
  11. * this list of conditions and the following disclaimer.
  12. * 2. Redistributions in binary form must reproduce the above copyright notice,
  13. * this list of conditions and the following disclaimer in the documentation
  14. * and/or other materials provided with the distribution.
  15. *
  16. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  17. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  18. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  19. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  20. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  21. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  22. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  23. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  24. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  25. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  26. * POSSIBILITY OF SUCH DAMAGE.
  27. */
  28. #ifndef _RDKAFKACPP_H_
  29. #define _RDKAFKACPP_H_
  30. /**
  31. * @file rdkafkacpp.h
  32. * @brief Apache Kafka C/C++ consumer and producer client library.
  33. *
  34. * rdkafkacpp.h contains the public C++ API for librdkafka.
  35. * The API is documented in this file as comments prefixing the class,
  36. * function, type, enum, define, etc.
  37. * For more information, see the C interface in rdkafka.h and read the
  38. * manual in INTRODUCTION.md.
  39. * The C++ interface is STD C++ '03 compliant and adheres to the
  40. * Google C++ Style Guide.
  41. * @sa For the C interface see rdkafka.h
  42. *
  43. * @tableofcontents
  44. */
  45. /**@cond NO_DOC*/
  46. #include <string>
  47. #include <list>
  48. #include <vector>
  49. #include <stdint.h>
  50. #ifdef _MSC_VER
  51. #undef RD_EXPORT
  52. #ifdef LIBRDKAFKA_STATICLIB
  53. #define RD_EXPORT
  54. #else
  55. #ifdef LIBRDKAFKACPP_EXPORTS
  56. #define RD_EXPORT __declspec(dllexport)
  57. #else
  58. #define RD_EXPORT __declspec(dllimport)
  59. #endif
  60. #endif
  61. #else
  62. #define RD_EXPORT
  63. #endif
  64. /**@endcond*/
  65. extern "C" {
  66. /* Forward declarations */
  67. struct rd_kafka_s;
  68. struct rd_kafka_topic_s;
  69. struct rd_kafka_message_s;
  70. };
  71. namespace RdKafka {
  72. /**
  73. * @name Miscellaneous APIs
  74. * @{
  75. */
  76. /**
  77. * @brief librdkafka version
  78. *
  79. * Interpreted as hex \c MM.mm.rr.xx:
  80. * - MM = Major
  81. * - mm = minor
  82. * - rr = revision
  83. * - xx = pre-release id (0xff is the final release)
  84. *
  85. * E.g.: \c 0x000801ff = 0.8.1
  86. *
  87. * @remark This value should only be used during compile time,
  88. * for runtime checks of version use RdKafka::version()
  89. */
  90. #define RD_KAFKA_VERSION 0x000b04ff
  91. /**
  92. * @brief Returns the librdkafka version as integer.
  93. *
  94. * @sa See RD_KAFKA_VERSION for how to parse the integer format.
  95. */
  96. RD_EXPORT
  97. int version ();
  98. /**
  99. * @brief Returns the librdkafka version as string.
  100. */
  101. RD_EXPORT
  102. std::string version_str();
  103. /**
  104. * @brief Returns a CSV list of the supported debug contexts
  105. * for use with Conf::Set("debug", ..).
  106. */
  107. RD_EXPORT
  108. std::string get_debug_contexts();
  109. /**
  110. * @brief Wait for all rd_kafka_t objects to be destroyed.
  111. *
  112. * @returns 0 if all kafka objects are now destroyed, or -1 if the
  113. * timeout was reached.
  114. * Since RdKafka handle deletion is an asynch operation the
  115. * \p wait_destroyed() function can be used for applications where
  116. * a clean shutdown is required.
  117. */
  118. RD_EXPORT
  119. int wait_destroyed(int timeout_ms);
  120. /**@}*/
  121. /**
  122. * @name Constants, errors, types
  123. * @{
  124. *
  125. *
  126. */
  127. /**
  128. * @brief Error codes.
  129. *
  130. * The negative error codes delimited by two underscores
  131. * (\c _ERR__..) denotes errors internal to librdkafka and are
  132. * displayed as \c \"Local: \<error string..\>\", while the error codes
  133. * delimited by a single underscore (\c ERR_..) denote broker
  134. * errors and are displayed as \c \"Broker: \<error string..\>\".
  135. *
  136. * @sa Use RdKafka::err2str() to translate an error code a human readable string
  137. */
  138. enum ErrorCode {
  139. /* Internal errors to rdkafka: */
  140. /** Begin internal error codes */
  141. ERR__BEGIN = -200,
  142. /** Received message is incorrect */
  143. ERR__BAD_MSG = -199,
  144. /** Bad/unknown compression */
  145. ERR__BAD_COMPRESSION = -198,
  146. /** Broker is going away */
  147. ERR__DESTROY = -197,
  148. /** Generic failure */
  149. ERR__FAIL = -196,
  150. /** Broker transport failure */
  151. ERR__TRANSPORT = -195,
  152. /** Critical system resource */
  153. ERR__CRIT_SYS_RESOURCE = -194,
  154. /** Failed to resolve broker */
  155. ERR__RESOLVE = -193,
  156. /** Produced message timed out*/
  157. ERR__MSG_TIMED_OUT = -192,
  158. /** Reached the end of the topic+partition queue on
  159. * the broker. Not really an error. */
  160. ERR__PARTITION_EOF = -191,
  161. /** Permanent: Partition does not exist in cluster. */
  162. ERR__UNKNOWN_PARTITION = -190,
  163. /** File or filesystem error */
  164. ERR__FS = -189,
  165. /** Permanent: Topic does not exist in cluster. */
  166. ERR__UNKNOWN_TOPIC = -188,
  167. /** All broker connections are down. */
  168. ERR__ALL_BROKERS_DOWN = -187,
  169. /** Invalid argument, or invalid configuration */
  170. ERR__INVALID_ARG = -186,
  171. /** Operation timed out */
  172. ERR__TIMED_OUT = -185,
  173. /** Queue is full */
  174. ERR__QUEUE_FULL = -184,
  175. /** ISR count < required.acks */
  176. ERR__ISR_INSUFF = -183,
  177. /** Broker node update */
  178. ERR__NODE_UPDATE = -182,
  179. /** SSL error */
  180. ERR__SSL = -181,
  181. /** Waiting for coordinator to become available. */
  182. ERR__WAIT_COORD = -180,
  183. /** Unknown client group */
  184. ERR__UNKNOWN_GROUP = -179,
  185. /** Operation in progress */
  186. ERR__IN_PROGRESS = -178,
  187. /** Previous operation in progress, wait for it to finish. */
  188. ERR__PREV_IN_PROGRESS = -177,
  189. /** This operation would interfere with an existing subscription */
  190. ERR__EXISTING_SUBSCRIPTION = -176,
  191. /** Assigned partitions (rebalance_cb) */
  192. ERR__ASSIGN_PARTITIONS = -175,
  193. /** Revoked partitions (rebalance_cb) */
  194. ERR__REVOKE_PARTITIONS = -174,
  195. /** Conflicting use */
  196. ERR__CONFLICT = -173,
  197. /** Wrong state */
  198. ERR__STATE = -172,
  199. /** Unknown protocol */
  200. ERR__UNKNOWN_PROTOCOL = -171,
  201. /** Not implemented */
  202. ERR__NOT_IMPLEMENTED = -170,
  203. /** Authentication failure*/
  204. ERR__AUTHENTICATION = -169,
  205. /** No stored offset */
  206. ERR__NO_OFFSET = -168,
  207. /** Outdated */
  208. ERR__OUTDATED = -167,
  209. /** Timed out in queue */
  210. ERR__TIMED_OUT_QUEUE = -166,
  211. /** Feature not supported by broker */
  212. ERR__UNSUPPORTED_FEATURE = -165,
  213. /** Awaiting cache update */
  214. ERR__WAIT_CACHE = -164,
  215. /** Operation interrupted */
  216. ERR__INTR = -163,
  217. /** Key serialization error */
  218. ERR__KEY_SERIALIZATION = -162,
  219. /** Value serialization error */
  220. ERR__VALUE_SERIALIZATION = -161,
  221. /** Key deserialization error */
  222. ERR__KEY_DESERIALIZATION = -160,
  223. /** Value deserialization error */
  224. ERR__VALUE_DESERIALIZATION = -159,
  225. /** Partial response */
  226. ERR__PARTIAL = -158,
  227. /** Modification attempted on read-only object */
  228. ERR__READ_ONLY = -157,
  229. /** No such entry / item not found */
  230. ERR__NOENT = -156,
  231. /** Read underflow */
  232. ERR__UNDERFLOW = -155,
  233. /** End internal error codes */
  234. ERR__END = -100,
  235. /* Kafka broker errors: */
  236. /** Unknown broker error */
  237. ERR_UNKNOWN = -1,
  238. /** Success */
  239. ERR_NO_ERROR = 0,
  240. /** Offset out of range */
  241. ERR_OFFSET_OUT_OF_RANGE = 1,
  242. /** Invalid message */
  243. ERR_INVALID_MSG = 2,
  244. /** Unknown topic or partition */
  245. ERR_UNKNOWN_TOPIC_OR_PART = 3,
  246. /** Invalid message size */
  247. ERR_INVALID_MSG_SIZE = 4,
  248. /** Leader not available */
  249. ERR_LEADER_NOT_AVAILABLE = 5,
  250. /** Not leader for partition */
  251. ERR_NOT_LEADER_FOR_PARTITION = 6,
  252. /** Request timed out */
  253. ERR_REQUEST_TIMED_OUT = 7,
  254. /** Broker not available */
  255. ERR_BROKER_NOT_AVAILABLE = 8,
  256. /** Replica not available */
  257. ERR_REPLICA_NOT_AVAILABLE = 9,
  258. /** Message size too large */
  259. ERR_MSG_SIZE_TOO_LARGE = 10,
  260. /** StaleControllerEpochCode */
  261. ERR_STALE_CTRL_EPOCH = 11,
  262. /** Offset metadata string too large */
  263. ERR_OFFSET_METADATA_TOO_LARGE = 12,
  264. /** Broker disconnected before response received */
  265. ERR_NETWORK_EXCEPTION = 13,
  266. /** Group coordinator load in progress */
  267. ERR_GROUP_LOAD_IN_PROGRESS = 14,
  268. /** Group coordinator not available */
  269. ERR_GROUP_COORDINATOR_NOT_AVAILABLE = 15,
  270. /** Not coordinator for group */
  271. ERR_NOT_COORDINATOR_FOR_GROUP = 16,
  272. /** Invalid topic */
  273. ERR_TOPIC_EXCEPTION = 17,
  274. /** Message batch larger than configured server segment size */
  275. ERR_RECORD_LIST_TOO_LARGE = 18,
  276. /** Not enough in-sync replicas */
  277. ERR_NOT_ENOUGH_REPLICAS = 19,
  278. /** Message(s) written to insufficient number of in-sync replicas */
  279. ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20,
  280. /** Invalid required acks value */
  281. ERR_INVALID_REQUIRED_ACKS = 21,
  282. /** Specified group generation id is not valid */
  283. ERR_ILLEGAL_GENERATION = 22,
  284. /** Inconsistent group protocol */
  285. ERR_INCONSISTENT_GROUP_PROTOCOL = 23,
  286. /** Invalid group.id */
  287. ERR_INVALID_GROUP_ID = 24,
  288. /** Unknown member */
  289. ERR_UNKNOWN_MEMBER_ID = 25,
  290. /** Invalid session timeout */
  291. ERR_INVALID_SESSION_TIMEOUT = 26,
  292. /** Group rebalance in progress */
  293. ERR_REBALANCE_IN_PROGRESS = 27,
  294. /** Commit offset data size is not valid */
  295. ERR_INVALID_COMMIT_OFFSET_SIZE = 28,
  296. /** Topic authorization failed */
  297. ERR_TOPIC_AUTHORIZATION_FAILED = 29,
  298. /** Group authorization failed */
  299. ERR_GROUP_AUTHORIZATION_FAILED = 30,
  300. /** Cluster authorization failed */
  301. ERR_CLUSTER_AUTHORIZATION_FAILED = 31,
  302. /** Invalid timestamp */
  303. ERR_INVALID_TIMESTAMP = 32,
  304. /** Unsupported SASL mechanism */
  305. ERR_UNSUPPORTED_SASL_MECHANISM = 33,
  306. /** Illegal SASL state */
  307. ERR_ILLEGAL_SASL_STATE = 34,
  308. /** Unuspported version */
  309. ERR_UNSUPPORTED_VERSION = 35,
  310. /** Topic already exists */
  311. ERR_TOPIC_ALREADY_EXISTS = 36,
  312. /** Invalid number of partitions */
  313. ERR_INVALID_PARTITIONS = 37,
  314. /** Invalid replication factor */
  315. ERR_INVALID_REPLICATION_FACTOR = 38,
  316. /** Invalid replica assignment */
  317. ERR_INVALID_REPLICA_ASSIGNMENT = 39,
  318. /** Invalid config */
  319. ERR_INVALID_CONFIG = 40,
  320. /** Not controller for cluster */
  321. ERR_NOT_CONTROLLER = 41,
  322. /** Invalid request */
  323. ERR_INVALID_REQUEST = 42,
  324. /** Message format on broker does not support request */
  325. ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT = 43,
  326. /** Isolation policy volation */
  327. ERR_POLICY_VIOLATION = 44,
  328. /** Broker received an out of order sequence number */
  329. ERR_OUT_OF_ORDER_SEQUENCE_NUMBER = 45,
  330. /** Broker received a duplicate sequence number */
  331. ERR_DUPLICATE_SEQUENCE_NUMBER = 46,
  332. /** Producer attempted an operation with an old epoch */
  333. ERR_INVALID_PRODUCER_EPOCH = 47,
  334. /** Producer attempted a transactional operation in an invalid state */
  335. ERR_INVALID_TXN_STATE = 48,
  336. /** Producer attempted to use a producer id which is not
  337. * currently assigned to its transactional id */
  338. ERR_INVALID_PRODUCER_ID_MAPPING = 49,
  339. /** Transaction timeout is larger than the maximum
  340. * value allowed by the broker's max.transaction.timeout.ms */
  341. ERR_INVALID_TRANSACTION_TIMEOUT = 50,
  342. /** Producer attempted to update a transaction while another
  343. * concurrent operation on the same transaction was ongoing */
  344. ERR_CONCURRENT_TRANSACTIONS = 51,
  345. /** Indicates that the transaction coordinator sending a
  346. * WriteTxnMarker is no longer the current coordinator for a
  347. * given producer */
  348. ERR_TRANSACTION_COORDINATOR_FENCED = 52,
  349. /** Transactional Id authorization failed */
  350. ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53,
  351. /** Security features are disabled */
  352. ERR_SECURITY_DISABLED = 54,
  353. /** Operation not attempted */
  354. ERR_OPERATION_NOT_ATTEMPTED = 55
  355. };
  356. /**
  357. * @brief Returns a human readable representation of a kafka error.
  358. */
  359. RD_EXPORT
  360. std::string err2str(RdKafka::ErrorCode err);
  361. /**@} */
  362. /**@cond NO_DOC*/
  363. /* Forward declarations */
  364. class Producer;
  365. class Message;
  366. class Queue;
  367. class Event;
  368. class Topic;
  369. class TopicPartition;
  370. class Metadata;
  371. class KafkaConsumer;
  372. /**@endcond*/
  373. /**
  374. * @name Callback classes
  375. * @{
  376. *
  377. *
  378. * librdkafka uses (optional) callbacks to propagate information and
  379. * delegate decisions to the application logic.
  380. *
  381. * An application must call RdKafka::poll() at regular intervals to
  382. * serve queued callbacks.
  383. */
  384. /**
  385. * @brief Delivery Report callback class
  386. *
  387. * The delivery report callback will be called once for each message
  388. * accepted by RdKafka::Producer::produce() (et.al) with
  389. * RdKafka::Message::err() set to indicate the result of the produce request.
  390. *
  391. * The callback is called when a message is succesfully produced or
  392. * if librdkafka encountered a permanent failure, or the retry counter for
  393. * temporary errors has been exhausted.
  394. *
  395. * An application must call RdKafka::poll() at regular intervals to
  396. * serve queued delivery report callbacks.
  397. */
  398. class RD_EXPORT DeliveryReportCb {
  399. public:
  400. /**
  401. * @brief Delivery report callback.
  402. */
  403. virtual void dr_cb (Message &message) = 0;
  404. virtual ~DeliveryReportCb() { }
  405. };
  406. /**
  407. * @brief Partitioner callback class
  408. *
  409. * Generic partitioner callback class for implementing custom partitioners.
  410. *
  411. * @sa RdKafka::Conf::set() \c "partitioner_cb"
  412. */
  413. class RD_EXPORT PartitionerCb {
  414. public:
  415. /**
  416. * @brief Partitioner callback
  417. *
  418. * Return the partition to use for \p key in \p topic.
  419. *
  420. * The \p msg_opaque is the same \p msg_opaque provided in the
  421. * RdKafka::Producer::produce() call.
  422. *
  423. * @remark \p key may be NULL or the empty.
  424. *
  425. * @returns Must return a value between 0 and \p partition_cnt (non-inclusive).
  426. * May return RD_KAFKA_PARTITION_UA (-1) if partitioning failed.
  427. *
  428. * @sa The callback may use RdKafka::Topic::partition_available() to check
  429. * if a partition has an active leader broker.
  430. */
  431. virtual int32_t partitioner_cb (const Topic *topic,
  432. const std::string *key,
  433. int32_t partition_cnt,
  434. void *msg_opaque) = 0;
  435. virtual ~PartitionerCb() { }
  436. };
  437. /**
  438. * @brief Variant partitioner with key pointer
  439. *
  440. */
  441. class PartitionerKeyPointerCb {
  442. public:
  443. /**
  444. * @brief Variant partitioner callback that gets \p key as pointer and length
  445. * instead of as a const std::string *.
  446. *
  447. * @remark \p key may be NULL or have \p key_len 0.
  448. *
  449. * @sa See RdKafka::PartitionerCb::partitioner_cb() for exact semantics
  450. */
  451. virtual int32_t partitioner_cb (const Topic *topic,
  452. const void *key,
  453. size_t key_len,
  454. int32_t partition_cnt,
  455. void *msg_opaque) = 0;
  456. virtual ~PartitionerKeyPointerCb() { }
  457. };
  458. /**
  459. * @brief Event callback class
  460. *
  461. * Events are a generic interface for propagating errors, statistics, logs, etc
  462. * from librdkafka to the application.
  463. *
  464. * @sa RdKafka::Event
  465. */
  466. class RD_EXPORT EventCb {
  467. public:
  468. /**
  469. * @brief Event callback
  470. *
  471. * @sa RdKafka::Event
  472. */
  473. virtual void event_cb (Event &event) = 0;
  474. virtual ~EventCb() { }
  475. };
  476. /**
  477. * @brief Event object class as passed to the EventCb callback.
  478. */
  479. class RD_EXPORT Event {
  480. public:
  481. /** @brief Event type */
  482. enum Type {
  483. EVENT_ERROR, /**< Event is an error condition */
  484. EVENT_STATS, /**< Event is a statistics JSON document */
  485. EVENT_LOG, /**< Event is a log message */
  486. EVENT_THROTTLE /**< Event is a throttle level signaling from the broker */
  487. };
  488. /** @brief EVENT_LOG severities (conforms to syslog(3) severities) */
  489. enum Severity {
  490. EVENT_SEVERITY_EMERG = 0,
  491. EVENT_SEVERITY_ALERT = 1,
  492. EVENT_SEVERITY_CRITICAL = 2,
  493. EVENT_SEVERITY_ERROR = 3,
  494. EVENT_SEVERITY_WARNING = 4,
  495. EVENT_SEVERITY_NOTICE = 5,
  496. EVENT_SEVERITY_INFO = 6,
  497. EVENT_SEVERITY_DEBUG = 7
  498. };
  499. virtual ~Event () { }
  500. /*
  501. * Event Accessor methods
  502. */
  503. /**
  504. * @returns The event type
  505. * @remark Applies to all event types
  506. */
  507. virtual Type type () const = 0;
  508. /**
  509. * @returns Event error, if any.
  510. * @remark Applies to all event types except THROTTLE
  511. */
  512. virtual ErrorCode err () const = 0;
  513. /**
  514. * @returns Log severity level.
  515. * @remark Applies to LOG event type.
  516. */
  517. virtual Severity severity () const = 0;
  518. /**
  519. * @returns Log facility string.
  520. * @remark Applies to LOG event type.
  521. */
  522. virtual std::string fac () const = 0;
  523. /**
  524. * @returns Log message string.
  525. *
  526. * \c EVENT_LOG: Log message string.
  527. * \c EVENT_STATS: JSON object (as string).
  528. *
  529. * @remark Applies to LOG event type.
  530. */
  531. virtual std::string str () const = 0;
  532. /**
  533. * @returns Throttle time in milliseconds.
  534. * @remark Applies to THROTTLE event type.
  535. */
  536. virtual int throttle_time () const = 0;
  537. /**
  538. * @returns Throttling broker's name.
  539. * @remark Applies to THROTTLE event type.
  540. */
  541. virtual std::string broker_name () const = 0;
  542. /**
  543. * @returns Throttling broker's id.
  544. * @remark Applies to THROTTLE event type.
  545. */
  546. virtual int broker_id () const = 0;
  547. };
  548. /**
  549. * @brief Consume callback class
  550. */
  551. class RD_EXPORT ConsumeCb {
  552. public:
  553. /**
  554. * @brief The consume callback is used with
  555. * RdKafka::Consumer::consume_callback()
  556. * methods and will be called for each consumed \p message.
  557. *
  558. * The callback interface is optional but provides increased performance.
  559. */
  560. virtual void consume_cb (Message &message, void *opaque) = 0;
  561. virtual ~ConsumeCb() { }
  562. };
  563. /**
  564. * @brief \b KafkaConsunmer: Rebalance callback class
  565. */
  566. class RD_EXPORT RebalanceCb {
  567. public:
  568. /**
  569. * @brief Group rebalance callback for use with RdKafka::KafkaConsunmer
  570. *
  571. * Registering a \p rebalance_cb turns off librdkafka's automatic
  572. * partition assignment/revocation and instead delegates that responsibility
  573. * to the application's \p rebalance_cb.
  574. *
  575. * The rebalance callback is responsible for updating librdkafka's
  576. * assignment set based on the two events: RdKafka::ERR__ASSIGN_PARTITIONS
  577. * and RdKafka::ERR__REVOKE_PARTITIONS but should also be able to handle
  578. * arbitrary rebalancing failures where \p err is neither of those.
  579. * @remark In this latter case (arbitrary error), the application must
  580. * call unassign() to synchronize state.
  581. *
  582. * Without a rebalance callback this is done automatically by librdkafka
  583. * but registering a rebalance callback gives the application flexibility
  584. * in performing other operations along with the assinging/revocation,
  585. * such as fetching offsets from an alternate location (on assign)
  586. * or manually committing offsets (on revoke).
  587. *
  588. * The following example show's the application's responsibilities:
  589. * @code
  590. * class MyRebalanceCb : public RdKafka::RebalanceCb {
  591. * public:
  592. * void rebalance_cb (RdKafka::KafkaConsumer *consumer,
  593. * RdKafka::ErrorCode err,
  594. * std::vector<RdKafka::TopicPartition*> &partitions) {
  595. * if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
  596. * // application may load offets from arbitrary external
  597. * // storage here and update \p partitions
  598. *
  599. * consumer->assign(partitions);
  600. *
  601. * } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) {
  602. * // Application may commit offsets manually here
  603. * // if auto.commit.enable=false
  604. *
  605. * consumer->unassign();
  606. *
  607. * } else {
  608. * std::cerr << "Rebalancing error: <<
  609. * RdKafka::err2str(err) << std::endl;
  610. * consumer->unassign();
  611. * }
  612. * }
  613. * }
  614. * @endcode
  615. */
  616. virtual void rebalance_cb (RdKafka::KafkaConsumer *consumer,
  617. RdKafka::ErrorCode err,
  618. std::vector<TopicPartition*>&partitions) = 0;
  619. virtual ~RebalanceCb() { }
  620. };
  621. /**
  622. * @brief Offset Commit callback class
  623. */
  624. class RD_EXPORT OffsetCommitCb {
  625. public:
  626. /**
  627. * @brief Set offset commit callback for use with consumer groups
  628. *
  629. * The results of automatic or manual offset commits will be scheduled
  630. * for this callback and is served by RdKafka::KafkaConsumer::consume().
  631. *
  632. * If no partitions had valid offsets to commit this callback will be called
  633. * with \p err == ERR__NO_OFFSET which is not to be considered an error.
  634. *
  635. * The \p offsets list contains per-partition information:
  636. * - \c topic The topic committed
  637. * - \c partition The partition committed
  638. * - \c offset: Committed offset (attempted)
  639. * - \c err: Commit error
  640. */
  641. virtual void offset_commit_cb(RdKafka::ErrorCode err,
  642. std::vector<TopicPartition*>&offsets) = 0;
  643. virtual ~OffsetCommitCb() { }
  644. };
  645. /**
  646. * @brief \b Portability: SocketCb callback class
  647. *
  648. */
  649. class RD_EXPORT SocketCb {
  650. public:
  651. /**
  652. * @brief Socket callback
  653. *
  654. * The socket callback is responsible for opening a socket
  655. * according to the supplied \p domain, \p type and \p protocol.
  656. * The socket shall be created with \c CLOEXEC set in a racefree fashion, if
  657. * possible.
  658. *
  659. * It is typically not required to register an alternative socket
  660. * implementation
  661. *
  662. * @returns The socket file descriptor or -1 on error (\c errno must be set)
  663. */
  664. virtual int socket_cb (int domain, int type, int protocol) = 0;
  665. virtual ~SocketCb() { }
  666. };
  667. /**
  668. * @brief \b Portability: OpenCb callback class
  669. *
  670. */
  671. class RD_EXPORT OpenCb {
  672. public:
  673. /**
  674. * @brief Open callback
  675. * The open callback is responsible for opening the file specified by
  676. * \p pathname, using \p flags and \p mode.
  677. * The file shall be opened with \c CLOEXEC set in a racefree fashion, if
  678. * possible.
  679. *
  680. * It is typically not required to register an alternative open implementation
  681. *
  682. * @remark Not currently available on native Win32
  683. */
  684. virtual int open_cb (const std::string &path, int flags, int mode) = 0;
  685. virtual ~OpenCb() { }
  686. };
  687. /**@}*/
  688. /**
  689. * @name Configuration interface
  690. * @{
  691. *
  692. */
  693. /**
  694. * @brief Configuration interface
  695. *
  696. * Holds either global or topic configuration that are passed to
  697. * RdKafka::Consumer::create(), RdKafka::Producer::create(),
  698. * RdKafka::KafkaConsumer::create(), etc.
  699. *
  700. * @sa CONFIGURATION.md for the full list of supported properties.
  701. */
  702. class RD_EXPORT Conf {
  703. public:
  704. /**
  705. * @brief Configuration object type
  706. */
  707. enum ConfType {
  708. CONF_GLOBAL, /**< Global configuration */
  709. CONF_TOPIC /**< Topic specific configuration */
  710. };
  711. /**
  712. * @brief RdKafka::Conf::Set() result code
  713. */
  714. enum ConfResult {
  715. CONF_UNKNOWN = -2, /**< Unknown configuration property */
  716. CONF_INVALID = -1, /**< Invalid configuration value */
  717. CONF_OK = 0 /**< Configuration property was succesfully set */
  718. };
  719. /**
  720. * @brief Create configuration object
  721. */
  722. static Conf *create (ConfType type);
  723. virtual ~Conf () { }
  724. /**
  725. * @brief Set configuration property \p name to value \p value.
  726. *
  727. * Fallthrough:
  728. * Topic-level configuration properties may be set using this interface
  729. * in which case they are applied on the \c default_topic_conf.
  730. * If no \c default_topic_conf has been set one will be created.
  731. * Any sub-sequent set("default_topic_conf", ..) calls will
  732. * replace the current default topic configuration.
  733. * @returns CONF_OK on success, else writes a human readable error
  734. * description to \p errstr on error.
  735. */
  736. virtual Conf::ConfResult set (const std::string &name,
  737. const std::string &value,
  738. std::string &errstr) = 0;
  739. /** @brief Use with \p name = \c \"dr_cb\" */
  740. virtual Conf::ConfResult set (const std::string &name,
  741. DeliveryReportCb *dr_cb,
  742. std::string &errstr) = 0;
  743. /** @brief Use with \p name = \c \"event_cb\" */
  744. virtual Conf::ConfResult set (const std::string &name,
  745. EventCb *event_cb,
  746. std::string &errstr) = 0;
  747. /** @brief Use with \p name = \c \"default_topic_conf\"
  748. *
  749. * Sets the default topic configuration to use for for automatically
  750. * subscribed topics.
  751. *
  752. * @sa RdKafka::KafkaConsumer::subscribe()
  753. */
  754. virtual Conf::ConfResult set (const std::string &name,
  755. const Conf *topic_conf,
  756. std::string &errstr) = 0;
  757. /** @brief Use with \p name = \c \"partitioner_cb\" */
  758. virtual Conf::ConfResult set (const std::string &name,
  759. PartitionerCb *partitioner_cb,
  760. std::string &errstr) = 0;
  761. /** @brief Use with \p name = \c \"partitioner_key_pointer_cb\" */
  762. virtual Conf::ConfResult set (const std::string &name,
  763. PartitionerKeyPointerCb *partitioner_kp_cb,
  764. std::string &errstr) = 0;
  765. /** @brief Use with \p name = \c \"socket_cb\" */
  766. virtual Conf::ConfResult set (const std::string &name, SocketCb *socket_cb,
  767. std::string &errstr) = 0;
  768. /** @brief Use with \p name = \c \"open_cb\" */
  769. virtual Conf::ConfResult set (const std::string &name, OpenCb *open_cb,
  770. std::string &errstr) = 0;
  771. /** @brief Use with \p name = \c \"rebalance_cb\" */
  772. virtual Conf::ConfResult set (const std::string &name,
  773. RebalanceCb *rebalance_cb,
  774. std::string &errstr) = 0;
  775. /** @brief Use with \p name = \c \"offset_commit_cb\" */
  776. virtual Conf::ConfResult set (const std::string &name,
  777. OffsetCommitCb *offset_commit_cb,
  778. std::string &errstr) = 0;
  779. /** @brief Query single configuration value
  780. *
  781. * Do not use this method to get callbacks registered by the configuration file.
  782. * Instead use the specific get() methods with the specific callback parameter in the signature.
  783. *
  784. * Fallthrough:
  785. * Topic-level configuration properties from the \c default_topic_conf
  786. * may be retrieved using this interface.
  787. *
  788. * @returns CONF_OK if the property was set previously set and
  789. * returns the value in \p value. */
  790. virtual Conf::ConfResult get(const std::string &name,
  791. std::string &value) const = 0;
  792. /** @brief Query single configuration value
  793. * @returns CONF_OK if the property was set previously set and
  794. * returns the value in \p dr_cb. */
  795. virtual Conf::ConfResult get(DeliveryReportCb *&dr_cb) const = 0;
  796. /** @brief Query single configuration value
  797. * @returns CONF_OK if the property was set previously set and
  798. * returns the value in \p event_cb. */
  799. virtual Conf::ConfResult get(EventCb *&event_cb) const = 0;
  800. /** @brief Query single configuration value
  801. * @returns CONF_OK if the property was set previously set and
  802. * returns the value in \p partitioner_cb. */
  803. virtual Conf::ConfResult get(PartitionerCb *&partitioner_cb) const = 0;
  804. /** @brief Query single configuration value
  805. * @returns CONF_OK if the property was set previously set and
  806. * returns the value in \p partitioner_kp_cb. */
  807. virtual Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const = 0;
  808. /** @brief Query single configuration value
  809. * @returns CONF_OK if the property was set previously set and
  810. * returns the value in \p socket_cb. */
  811. virtual Conf::ConfResult get(SocketCb *&socket_cb) const = 0;
  812. /** @brief Query single configuration value
  813. * @returns CONF_OK if the property was set previously set and
  814. * returns the value in \p open_cb. */
  815. virtual Conf::ConfResult get(OpenCb *&open_cb) const = 0;
  816. /** @brief Query single configuration value
  817. * @returns CONF_OK if the property was set previously set and
  818. * returns the value in \p rebalance_cb. */
  819. virtual Conf::ConfResult get(RebalanceCb *&rebalance_cb) const = 0;
  820. /** @brief Query single configuration value
  821. * @returns CONF_OK if the property was set previously set and
  822. * returns the value in \p offset_commit_cb. */
  823. virtual Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const = 0;
  824. /** @brief Dump configuration names and values to list containing
  825. * name,value tuples */
  826. virtual std::list<std::string> *dump () = 0;
  827. /** @brief Use with \p name = \c \"consume_cb\" */
  828. virtual Conf::ConfResult set (const std::string &name, ConsumeCb *consume_cb,
  829. std::string &errstr) = 0;
  830. };
  831. /**@}*/
  832. /**
  833. * @name Kafka base client handle
  834. * @{
  835. *
  836. */
  837. /**
  838. * @brief Base handle, super class for specific clients.
  839. */
  840. class RD_EXPORT Handle {
  841. public:
  842. virtual ~Handle() { }
  843. /** @returns the name of the handle */
  844. virtual const std::string name () const = 0;
  845. /**
  846. * @brief Returns the client's broker-assigned group member id
  847. *
  848. * @remark This currently requires the high-level KafkaConsumer
  849. *
  850. * @returns Last assigned member id, or empty string if not currently
  851. * a group member.
  852. */
  853. virtual const std::string memberid () const = 0;
  854. /**
  855. * @brief Polls the provided kafka handle for events.
  856. *
  857. * Events will trigger application provided callbacks to be called.
  858. *
  859. * The \p timeout_ms argument specifies the maximum amount of time
  860. * (in milliseconds) that the call will block waiting for events.
  861. * For non-blocking calls, provide 0 as \p timeout_ms.
  862. * To wait indefinately for events, provide -1.
  863. *
  864. * Events:
  865. * - delivery report callbacks (if an RdKafka::DeliveryCb is configured) [producer]
  866. * - event callbacks (if an RdKafka::EventCb is configured) [producer & consumer]
  867. *
  868. * @remark An application should make sure to call poll() at regular
  869. * intervals to serve any queued callbacks waiting to be called.
  870. *
  871. * @warning This method MUST NOT be used with the RdKafka::KafkaConsumer,
  872. * use its RdKafka::KafkaConsumer::consume() instead.
  873. *
  874. * @returns the number of events served.
  875. */
  876. virtual int poll (int timeout_ms) = 0;
  877. /**
  878. * @brief Returns the current out queue length
  879. *
  880. * The out queue contains messages and requests waiting to be sent to,
  881. * or acknowledged by, the broker.
  882. */
  883. virtual int outq_len () = 0;
  884. /**
  885. * @brief Request Metadata from broker.
  886. *
  887. * Parameters:
  888. * \p all_topics - if non-zero: request info about all topics in cluster,
  889. * if zero: only request info about locally known topics.
  890. * \p only_rkt - only request info about this topic
  891. * \p metadatap - pointer to hold metadata result.
  892. * The \p *metadatap pointer must be released with \c delete.
  893. * \p timeout_ms - maximum response time before failing.
  894. *
  895. * @returns RdKafka::ERR_NO_ERROR on success (in which case \p *metadatap
  896. * will be set), else RdKafka::ERR__TIMED_OUT on timeout or
  897. * other error code on error.
  898. */
  899. virtual ErrorCode metadata (bool all_topics, const Topic *only_rkt,
  900. Metadata **metadatap, int timeout_ms) = 0;
  901. /**
  902. * @brief Pause producing or consumption for the provided list of partitions.
  903. *
  904. * Success or error is returned per-partition in the \p partitions list.
  905. *
  906. * @returns ErrorCode::NO_ERROR
  907. *
  908. * @sa resume()
  909. */
  910. virtual ErrorCode pause (std::vector<TopicPartition*> &partitions) = 0;
  911. /**
  912. * @brief Resume producing or consumption for the provided list of partitions.
  913. *
  914. * Success or error is returned per-partition in the \p partitions list.
  915. *
  916. * @returns ErrorCode::NO_ERROR
  917. *
  918. * @sa pause()
  919. */
  920. virtual ErrorCode resume (std::vector<TopicPartition*> &partitions) = 0;
  921. /**
  922. * @brief Query broker for low (oldest/beginning)
  923. * and high (newest/end) offsets for partition.
  924. *
  925. * Offsets are returned in \p *low and \p *high respectively.
  926. *
  927. * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
  928. */
  929. virtual ErrorCode query_watermark_offsets (const std::string &topic,
  930. int32_t partition,
  931. int64_t *low, int64_t *high,
  932. int timeout_ms) = 0;
  933. /**
  934. * @brief Get last known low (oldest/beginning)
  935. * and high (newest/end) offsets for partition.
  936. *
  937. * The low offset is updated periodically (if statistics.interval.ms is set)
  938. * while the high offset is updated on each fetched message set from the
  939. * broker.
  940. *
  941. * If there is no cached offset (either low or high, or both) then
  942. * OFFSET_INVALID will be returned for the respective offset.
  943. *
  944. * Offsets are returned in \p *low and \p *high respectively.
  945. *
  946. * @returns RdKafka::ERR_NO_ERROR on success or an error code on failure.
  947. *
  948. * @remark Shall only be used with an active consumer instance.
  949. */
  950. virtual ErrorCode get_watermark_offsets (const std::string &topic,
  951. int32_t partition,
  952. int64_t *low, int64_t *high) = 0;
  953. /**
  954. * @brief Look up the offsets for the given partitions by timestamp.
  955. *
  956. * The returned offset for each partition is the earliest offset whose
  957. * timestamp is greater than or equal to the given timestamp in the
  958. * corresponding partition.
  959. *
  960. * The timestamps to query are represented as \c offset in \p offsets
  961. * on input, and \c offset() will return the closest earlier offset
  962. * for the timestamp on output.
  963. *
  964. * The function will block for at most \p timeout_ms milliseconds.
  965. *
  966. * @remark Duplicate Topic+Partitions are not supported.
  967. * @remark Errors are also returned per TopicPartition, see \c err()
  968. *
  969. * @returns an error code for general errors, else RdKafka::ERR_NO_ERROR
  970. * in which case per-partition errors might be set.
  971. */
  972. virtual ErrorCode offsetsForTimes (std::vector<TopicPartition*> &offsets,
  973. int timeout_ms) = 0;
  974. /**
  975. * @brief Retrieve queue for a given partition.
  976. *
  977. * @returns The fetch queue for the given partition if successful. Else,
  978. * NULL is returned.
  979. *
  980. * @remark This function only works on consumers.
  981. */
  982. virtual Queue *get_partition_queue (const TopicPartition *partition) = 0;
  983. /**
  984. * @brief Forward librdkafka logs (and debug) to the specified queue
  985. * for serving with one of the ..poll() calls.
  986. *
  987. * This allows an application to serve log callbacks (\c log_cb)
  988. * in its thread of choice.
  989. *
  990. * @param queue Queue to forward logs to. If the value is NULL the logs
  991. * are forwarded to the main queue.
  992. *
  993. * @remark The configuration property \c log.queue MUST also be set to true.
  994. *
  995. * @remark librdkafka maintains its own reference to the provided queue.
  996. *
  997. * @returns ERR_NO_ERROR on success or an error code on error.
  998. */
  999. virtual ErrorCode set_log_queue (Queue *queue) = 0;
  1000. /**
  1001. * @brief Cancels the current callback dispatcher (Producer::poll(),
  1002. * Consumer::poll(), KafkaConsumer::consume(), etc).
  1003. *
  1004. * A callback may use this to force an immediate return to the calling
  1005. * code (caller of e.g. ..::poll()) without processing any further
  1006. * events.
  1007. *
  1008. * @remark This function MUST ONLY be called from within a
  1009. * librdkafka callback.
  1010. */
  1011. virtual void yield () = 0;
  1012. /**
  1013. * @brief Returns the ClusterId as reported in broker metadata.
  1014. *
  1015. * @param timeout_ms If there is no cached value from metadata retrieval
  1016. * then this specifies the maximum amount of time
  1017. * (in milliseconds) the call will block waiting
  1018. * for metadata to be retrieved.
  1019. * Use 0 for non-blocking calls.
  1020. *
  1021. * @remark Requires broker version >=0.10.0 and api.version.request=true.
  1022. *
  1023. * @returns Last cached ClusterId, or empty string if no ClusterId could be
  1024. * retrieved in the allotted timespan.
  1025. */
  1026. virtual const std::string clusterid (int timeout_ms) = 0;
  1027. /**
  1028. * @brief Returns the underlying librdkafka C rd_kafka_t handle.
  1029. *
  1030. * @warning Calling the C API on this handle is not recommended and there
  1031. * is no official support for it, but for cases where the C++
  1032. * does not provide the proper functionality this C handle can be
  1033. * used to interact directly with the core librdkafka API.
  1034. *
  1035. * @remark The lifetime of the returned pointer is the same as the Topic
  1036. * object this method is called on.
  1037. *
  1038. * @remark Include <rdkafka/rdkafka.h> prior to including
  1039. * <rdkafka/rdkafkacpp.h>
  1040. *
  1041. * @returns \c rd_kafka_t*
  1042. */
  1043. virtual struct rd_kafka_s *c_ptr () = 0;
  1044. };
  1045. /**@}*/
  1046. /**
  1047. * @name Topic and partition objects
  1048. * @{
  1049. *
  1050. */
  1051. /**
  1052. * @brief Topic+Partition
  1053. *
  1054. * This is a generic type to hold a single partition and various
  1055. * information about it.
  1056. *
  1057. * Is typically used with std::vector<RdKafka::TopicPartition*> to provide
  1058. * a list of partitions for different operations.
  1059. */
  1060. class RD_EXPORT TopicPartition {
  1061. public:
  1062. /**
  1063. * Create topic+partition object for \p topic and \p partition
  1064. * and optionally \p offset.
  1065. *
  1066. * Use \c delete to deconstruct.
  1067. */
  1068. static TopicPartition *create (const std::string &topic, int partition);
  1069. static TopicPartition *create (const std::string &topic, int partition,
  1070. int64_t offset);
  1071. virtual ~TopicPartition() = 0;
  1072. /**
  1073. * @brief Destroy/delete the TopicPartitions in \p partitions
  1074. * and clear the vector.
  1075. */
  1076. static void destroy (std::vector<TopicPartition*> &partitions);
  1077. /** @returns topic name */
  1078. virtual const std::string &topic () const = 0;
  1079. /** @returns partition id */
  1080. virtual int partition () const = 0;
  1081. /** @returns offset (if applicable) */
  1082. virtual int64_t offset () const = 0;
  1083. /** @brief Set offset */
  1084. virtual void set_offset (int64_t offset) = 0;
  1085. /** @returns error code (if applicable) */
  1086. virtual ErrorCode err () const = 0;
  1087. };
  1088. /**
  1089. * @brief Topic handle
  1090. *
  1091. */
  1092. class RD_EXPORT Topic {
  1093. public:
  1094. /**
  1095. * @brief Unassigned partition.
  1096. *
  1097. * The unassigned partition is used by the producer API for messages
  1098. * that should be partitioned using the configured or default partitioner.
  1099. */
  1100. static const int32_t PARTITION_UA;
  1101. /** @brief Special offsets */
  1102. static const int64_t OFFSET_BEGINNING; /**< Consume from beginning */
  1103. static const int64_t OFFSET_END; /**< Consume from end */
  1104. static const int64_t OFFSET_STORED; /**< Use offset storage */
  1105. static const int64_t OFFSET_INVALID; /**< Invalid offset */
  1106. /**
  1107. * @brief Creates a new topic handle for topic named \p topic_str
  1108. *
  1109. * \p conf is an optional configuration for the topic that will be used
  1110. * instead of the default topic configuration.
  1111. * The \p conf object is reusable after this call.
  1112. *
  1113. * @returns the new topic handle or NULL on error (see \p errstr).
  1114. */
  1115. static Topic *create (Handle *base, const std::string &topic_str,
  1116. Conf *conf, std::string &errstr);
  1117. virtual ~Topic () = 0;
  1118. /** @returns the topic name */
  1119. virtual const std::string name () const = 0;
  1120. /**
  1121. * @returns true if \p partition is available for the topic (has leader).
  1122. * @warning \b MUST \b ONLY be called from within a
  1123. * RdKafka::PartitionerCb callback.
  1124. */
  1125. virtual bool partition_available (int32_t partition) const = 0;
  1126. /**
  1127. * @brief Store offset \p offset for topic partition \p partition.
  1128. * The offset will be committed (written) to the offset store according
  1129. * to \p auto.commit.interval.ms.
  1130. *
  1131. * @remark \c enable.auto.offset.store must be set to \c false when using this API.
  1132. *
  1133. * @returns RdKafka::ERR_NO_ERROR on success or an error code if none of the
  1134. * offsets could be stored.
  1135. */
  1136. virtual ErrorCode offset_store (int32_t partition, int64_t offset) = 0;
  1137. /**
  1138. * @brief Returns the underlying librdkafka C rd_kafka_topic_t handle.
  1139. *
  1140. * @warning Calling the C API on this handle is not recommended and there
  1141. * is no official support for it, but for cases where the C++ API
  1142. * does not provide the underlying functionality this C handle can be
  1143. * used to interact directly with the core librdkafka API.
  1144. *
  1145. * @remark The lifetime of the returned pointer is the same as the Topic
  1146. * object this method is called on.
  1147. *
  1148. * @remark Include <rdkafka/rdkafka.h> prior to including
  1149. * <rdkafka/rdkafkacpp.h>
  1150. *
  1151. * @returns \c rd_kafka_topic_t*
  1152. */
  1153. virtual struct rd_kafka_topic_s *c_ptr () = 0;
  1154. };
  1155. /**@}*/
  1156. /**
  1157. * @name Message object
  1158. * @{
  1159. *
  1160. */
  1161. /**
  1162. * @brief Message timestamp object
  1163. *
  1164. * Represents the number of milliseconds since the epoch (UTC).
  1165. *
  1166. * The MessageTimestampType dictates the timestamp type or origin.
  1167. *
  1168. * @remark Requires Apache Kafka broker version >= 0.10.0
  1169. *
  1170. */
  1171. class RD_EXPORT MessageTimestamp {
  1172. public:
  1173. enum MessageTimestampType {
  1174. MSG_TIMESTAMP_NOT_AVAILABLE, /**< Timestamp not available */
  1175. MSG_TIMESTAMP_CREATE_TIME, /**< Message creation time (source) */
  1176. MSG_TIMESTAMP_LOG_APPEND_TIME /**< Message log append time (broker) */
  1177. };
  1178. MessageTimestampType type; /**< Timestamp type */
  1179. int64_t timestamp; /**< Milliseconds since epoch (UTC). */
  1180. };
  1181. /**
  1182. * @brief Message object
  1183. *
  1184. * This object represents either a single consumed or produced message,
  1185. * or an event (\p err() is set).
  1186. *
  1187. * An application must check RdKafka::Message::err() to see if the
  1188. * object is a proper message (error is RdKafka::ERR_NO_ERROR) or a
  1189. * an error event.
  1190. *
  1191. */
  1192. class RD_EXPORT Message {
  1193. public:
  1194. /**
  1195. * @brief Accessor functions*
  1196. * @remark Not all fields are present in all types of callbacks.
  1197. */
  1198. /** @returns The error string if object represent an error event,
  1199. * else an empty string. */
  1200. virtual std::string errstr() const = 0;
  1201. /** @returns The error code if object represents an error event, else 0. */
  1202. virtual ErrorCode err () const = 0;
  1203. /** @returns the RdKafka::Topic object for a message (if applicable),
  1204. * or NULL if a corresponding RdKafka::Topic object has not been
  1205. * explicitly created with RdKafka::Topic::create().
  1206. * In this case use topic_name() instead. */
  1207. virtual Topic *topic () const = 0;
  1208. /** @returns Topic name (if applicable, else empty string) */
  1209. virtual std::string topic_name () const = 0;
  1210. /** @returns Partition (if applicable) */
  1211. virtual int32_t partition () const = 0;
  1212. /** @returns Message payload (if applicable) */
  1213. virtual void *payload () const = 0 ;
  1214. /** @returns Message payload length (if applicable) */
  1215. virtual size_t len () const = 0;
  1216. /** @returns Message key as string (if applicable) */
  1217. virtual const std::string *key () const = 0;
  1218. /** @returns Message key as void pointer (if applicable) */
  1219. virtual const void *key_pointer () const = 0 ;
  1220. /** @returns Message key's binary length (if applicable) */
  1221. virtual size_t key_len () const = 0;
  1222. /** @returns Message or error offset (if applicable) */
  1223. virtual int64_t offset () const = 0;
  1224. /** @returns Message timestamp (if applicable) */
  1225. virtual MessageTimestamp timestamp () const = 0;
  1226. /** @returns The \p msg_opaque as provided to RdKafka::Producer::produce() */
  1227. virtual void *msg_opaque () const = 0;
  1228. virtual ~Message () = 0;
  1229. /** @returns the latency in microseconds for a produced message measured
  1230. * from the produce() call, or -1 if latency is not available. */
  1231. virtual int64_t latency () const = 0;
  1232. /**
  1233. * @brief Returns the underlying librdkafka C rd_kafka_message_t handle.
  1234. *
  1235. * @warning Calling the C API on this handle is not recommended and there
  1236. * is no official support for it, but for cases where the C++ API
  1237. * does not provide the underlying functionality this C handle can be
  1238. * used to interact directly with the core librdkafka API.
  1239. *
  1240. * @remark The lifetime of the returned pointer is the same as the Message
  1241. * object this method is called on.
  1242. *
  1243. * @remark Include <rdkafka/rdkafka.h> prior to including
  1244. * <rdkafka/rdkafkacpp.h>
  1245. *
  1246. * @returns \c rd_kafka_message_t*
  1247. */
  1248. virtual struct rd_kafka_message_s *c_ptr () = 0;
  1249. };
  1250. /**@}*/
  1251. /**
  1252. * @name Queue interface
  1253. * @{
  1254. *
  1255. */
  1256. /**
  1257. * @brief Queue interface
  1258. *
  1259. * Create a new message queue. Message queues allows the application
  1260. * to re-route consumed messages from multiple topic+partitions into
  1261. * one single queue point. This queue point, containing messages from
  1262. * a number of topic+partitions, may then be served by a single
  1263. * consume() method, rather than one per topic+partition combination.
  1264. *
  1265. * See the RdKafka::Consumer::start(), RdKafka::Consumer::consume(), and
  1266. * RdKafka::Consumer::consume_callback() methods that take a queue as the first
  1267. * parameter for more information.
  1268. */
  1269. class RD_EXPORT Queue {
  1270. public:
  1271. /**
  1272. * @brief Create Queue object
  1273. */
  1274. static Queue *create (Handle *handle);
  1275. /**
  1276. * @brief Forward/re-route queue to \p dst.
  1277. * If \p dst is \c NULL, the forwarding is removed.
  1278. *
  1279. * The internal refcounts for both queues are increased.
  1280. *
  1281. * @remark Regardless of whether \p dst is NULL or not, after calling this
  1282. * function, \p src will not forward it's fetch queue to the consumer
  1283. * queue.
  1284. */
  1285. virtual ErrorCode forward (Queue *dst) = 0;
  1286. /**
  1287. * @brief Consume message or get error event from the queue.
  1288. *
  1289. * @remark Use \c delete to free the message.
  1290. *
  1291. * @returns One of:
  1292. * - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
  1293. * - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
  1294. * - timeout due to no message or event in \p timeout_ms
  1295. * (RdKafka::Message::err() is ERR__TIMED_OUT)
  1296. */
  1297. virtual Message *consume (int timeout_ms) = 0;
  1298. /**
  1299. * @brief Poll queue, serving any enqueued callbacks.
  1300. *
  1301. * @remark Must NOT be used for queues containing messages.
  1302. *
  1303. * @returns the number of events served or 0 on timeout.
  1304. */
  1305. virtual int poll (int timeout_ms) = 0;
  1306. virtual ~Queue () = 0;
  1307. /**
  1308. * @brief Enable IO event triggering for queue.
  1309. *
  1310. * To ease integration with IO based polling loops this API
  1311. * allows an application to create a separate file-descriptor
  1312. * that librdkafka will write \p payload (of size \p size) to
  1313. * whenever a new element is enqueued on a previously empty queue.
  1314. *
  1315. * To remove event triggering call with \p fd = -1.
  1316. *
  1317. * librdkafka will maintain a copy of the \p payload.
  1318. *
  1319. * @remark When using forwarded queues the IO event must only be enabled
  1320. * on the final forwarded-to (destination) queue.
  1321. */
  1322. virtual void io_event_enable (int fd, const void *payload, size_t size) = 0;
  1323. };
  1324. /**@}*/
  1325. /**
  1326. * @name KafkaConsumer
  1327. * @{
  1328. *
  1329. */
  1330. /**
  1331. * @brief High-level KafkaConsumer (for brokers 0.9 and later)
  1332. *
  1333. * @remark Requires Apache Kafka >= 0.9.0 brokers
  1334. *
  1335. * Currently supports the \c range and \c roundrobin partition assignment
  1336. * strategies (see \c partition.assignment.strategy)
  1337. */
  1338. class RD_EXPORT KafkaConsumer : public virtual Handle {
  1339. public:
  1340. /**
  1341. * @brief Creates a KafkaConsumer.
  1342. *
  1343. * The \p conf object must have \c group.id set to the consumer group to join.
  1344. *
  1345. * Use RdKafka::KafkaConsumer::close() to shut down the consumer.
  1346. *
  1347. * @sa RdKafka::RebalanceCb
  1348. * @sa CONFIGURATION.md for \c group.id, \c session.timeout.ms,
  1349. * \c partition.assignment.strategy, etc.
  1350. */
  1351. static KafkaConsumer *create (Conf *conf, std::string &errstr);
  1352. virtual ~KafkaConsumer () = 0;
  1353. /** @brief Returns the current partition assignment as set by
  1354. * RdKafka::KafkaConsumer::assign() */
  1355. virtual ErrorCode assignment (std::vector<RdKafka::TopicPartition*> &partitions) = 0;
  1356. /** @brief Returns the current subscription as set by
  1357. * RdKafka::KafkaConsumer::subscribe() */
  1358. virtual ErrorCode subscription (std::vector<std::string> &topics) = 0;
  1359. /**
  1360. * @brief Update the subscription set to \p topics.
  1361. *
  1362. * Any previous subscription will be unassigned and unsubscribed first.
  1363. *
  1364. * The subscription set denotes the desired topics to consume and this
  1365. * set is provided to the partition assignor (one of the elected group
  1366. * members) for all clients which then uses the configured
  1367. * \c partition.assignment.strategy to assign the subscription sets's
  1368. * topics's partitions to the consumers, depending on their subscription.
  1369. *
  1370. * The result of such an assignment is a rebalancing which is either
  1371. * handled automatically in librdkafka or can be overriden by the application
  1372. * by providing a RdKafka::RebalanceCb.
  1373. *
  1374. * The rebalancing passes the assigned partition set to
  1375. * RdKafka::KafkaConsumer::assign() to update what partitions are actually
  1376. * being fetched by the KafkaConsumer.
  1377. *
  1378. * Regex pattern matching automatically performed for topics prefixed
  1379. * with \c \"^\" (e.g. \c \"^myPfx[0-9]_.*\"
  1380. *
  1381. * @returns an error if the provided list of topics is invalid.
  1382. */
  1383. virtual ErrorCode subscribe (const std::vector<std::string> &topics) = 0;
  1384. /** @brief Unsubscribe from the current subscription set. */
  1385. virtual ErrorCode unsubscribe () = 0;
  1386. /**
  1387. * @brief Update the assignment set to \p partitions.
  1388. *
  1389. * The assignment set is the set of partitions actually being consumed
  1390. * by the KafkaConsumer.
  1391. */
  1392. virtual ErrorCode assign (const std::vector<TopicPartition*> &partitions) = 0;
  1393. /**
  1394. * @brief Stop consumption and remove the current assignment.
  1395. */
  1396. virtual ErrorCode unassign () = 0;
  1397. /**
  1398. * @brief Consume message or get error event, triggers callbacks.
  1399. *
  1400. * Will automatically call registered callbacks for any such queued events,
  1401. * including RdKafka::RebalanceCb, RdKafka::EventCb, RdKafka::OffsetCommitCb,
  1402. * etc.
  1403. *
  1404. * @remark Use \c delete to free the message.
  1405. *
  1406. * @remark An application should make sure to call consume() at regular
  1407. * intervals, even if no messages are expected, to serve any
  1408. * queued callbacks waiting to be called. This is especially
  1409. * important when a RebalanceCb has been registered as it needs
  1410. * to be called and handled properly to synchronize internal
  1411. * consumer state.
  1412. *
  1413. * @remark Application MUST NOT call \p poll() on KafkaConsumer objects.
  1414. *
  1415. * @returns One of:
  1416. * - proper message (RdKafka::Message::err() is ERR_NO_ERROR)
  1417. * - error event (RdKafka::Message::err() is != ERR_NO_ERROR)
  1418. * - timeout due to no message or event in \p timeout_ms
  1419. * (RdKafka::Message::err() is ERR__TIMED_OUT)
  1420. */
  1421. virtual Message *consume (int timeout_ms) = 0;
  1422. /**
  1423. * @brief Commit offsets for the current assignment.
  1424. *
  1425. * @remark This is the synchronous variant that blocks until offsets
  1426. * are committed or the commit fails (see return value).
  1427. *
  1428. * @remark If a RdKafka::OffsetCommitCb callback is registered it will
  1429. * be called with commit details on a future call to
  1430. * RdKafka::KafkaConsumer::consume()
  1431. *
  1432. * @returns ERR_NO_ERROR or error code.
  1433. */
  1434. virtual ErrorCode commitSync () = 0;
  1435. /**
  1436. * @brief Asynchronous version of RdKafka::KafkaConsumer::CommitSync()
  1437. *
  1438. * @sa RdKafka::KafkaConsummer::commitSync()
  1439. */
  1440. virtual ErrorCode commitAsync () = 0;
  1441. /**
  1442. * @brief Commit offset for a single topic+partition based on \p message
  1443. *
  1444. * @remark This is the synchronous variant.
  1445. *
  1446. * @sa RdKafka::KafkaConsummer::commitSync()
  1447. */
  1448. virtual ErrorCode commitSync (Message *message) = 0;
  1449. /**
  1450. * @brief Commit offset for a single topic+partition based on \p message
  1451. *
  1452. * @remark This is the asynchronous variant.
  1453. *
  1454. * @sa RdKafka::KafkaConsummer::commitSync()
  1455. */
  1456. virtual ErrorCode commitAsync (Message *message) = 0;
  1457. /**
  1458. * @brief Commit offsets for the provided list of partitions.
  1459. *
  1460. * @remark This is the synchronous variant.
  1461. */
  1462. virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
  1463. /**
  1464. * @brief Commit offset for the provided list of partitions.
  1465. *
  1466. * @remark This is the asynchronous variant.
  1467. */
  1468. virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
  1469. /**
  1470. * @brief Commit offsets for the current assignment.
  1471. *
  1472. * @remark This is the synchronous variant that blocks until offsets
  1473. * are committed or the commit fails (see return value).
  1474. *
  1475. * @remark The provided callback will be called from this function.
  1476. *
  1477. * @returns ERR_NO_ERROR or error code.
  1478. */
  1479. virtual ErrorCode commitSync (OffsetCommitCb *offset_commit_cb) = 0;
  1480. /**
  1481. * @brief Commit offsets for the provided list of partitions.
  1482. *
  1483. * @remark This is the synchronous variant that blocks until offsets
  1484. * are committed or the commit fails (see return value).
  1485. *
  1486. * @remark The provided callback will be called from this function.
  1487. *
  1488. * @returns ERR_NO_ERROR or error code.
  1489. */
  1490. virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets,
  1491. OffsetCommitCb *offset_commit_cb) = 0;
  1492. /**
  1493. * @brief Retrieve committed offsets for topics+partitions.
  1494. *
  1495. * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the
  1496. * \p offset or \p err field of each \p partitions' element is filled
  1497. * in with the stored offset, or a partition specific error.
  1498. * Else returns an error code.
  1499. */
  1500. virtual ErrorCode committed (std::vector<TopicPartition*> &partitions,
  1501. int timeout_ms) = 0;
  1502. /**
  1503. * @brief Retrieve current positions (offsets) for topics+partitions.
  1504. *
  1505. * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success in which case the
  1506. * \p offset or \p err field of each \p partitions' element is filled
  1507. * in with the stored offset, or a partition specific error.
  1508. * Else returns an error code.
  1509. */
  1510. virtual ErrorCode position (std::vector<TopicPartition*> &partitions) = 0;
  1511. /**
  1512. * For pausing and resuming consumption, see
  1513. * @sa RdKafka::Handle::pause() and RdKafka::Handle::resume()
  1514. */
  1515. /**
  1516. * @brief Close and shut down the proper.
  1517. *
  1518. * This call will block until the following operations are finished:
  1519. * - Trigger a local rebalance to void the current assignment
  1520. * - Stop consumption for current assignment
  1521. * - Commit offsets
  1522. * - Leave group
  1523. *
  1524. * The maximum blocking time is roughly limited to session.timeout.ms.
  1525. *
  1526. * @remark Callbacks, such as RdKafka::RebalanceCb and
  1527. * RdKafka::OffsetCommitCb, etc, may be called.
  1528. *
  1529. * @remark The consumer object must later be freed with \c delete
  1530. */
  1531. virtual ErrorCode close () = 0;
  1532. /**
  1533. * @brief Seek consumer for topic+partition to offset which is either an
  1534. * absolute or logical offset.
  1535. *
  1536. * If \p timeout_ms is not 0 the call will wait this long for the
  1537. * seek to be performed. If the timeout is reached the internal state
  1538. * will be unknown and this function returns `ERR__TIMED_OUT`.
  1539. * If \p timeout_ms is 0 it will initiate the seek but return
  1540. * immediately without any error reporting (e.g., async).
  1541. *
  1542. * This call triggers a fetch queue barrier flush.
  1543. *
  1544. * @remark Consumtion for the given partition must have started for the
  1545. * seek to work. Use assign() to set the starting offset.
  1546. *
  1547. * @returns an ErrorCode to indicate success or failure.
  1548. */
  1549. virtual ErrorCode seek (const TopicPartition &partition, int timeout_ms) = 0;
  1550. /**
  1551. * @brief Store offset \p offset for topic partition \p partition.
  1552. * The offset will be committed (written) to the offset store according
  1553. * to \p auto.commit.interval.ms or the next manual offset-less commit*()
  1554. *
  1555. * Per-partition success/error status propagated through TopicPartition.err()
  1556. *
  1557. * @remark \c enable.auto.offset.store must be set to \c false when using this API.
  1558. *
  1559. * @returns RdKafka::ERR_NO_ERROR on success, or
  1560. * RdKafka::ERR___UNKNOWN_PARTITION if none of the offsets could
  1561. * be stored, or
  1562. * RdKafka::ERR___INVALID_ARG if \c enable.auto.offset.store is true.
  1563. */
  1564. virtual ErrorCode offsets_store (std::vector<TopicPartition*> &offsets) = 0;
  1565. };
  1566. /**@}*/
  1567. /**
  1568. * @name Simple Consumer (legacy)
  1569. * @{
  1570. *
  1571. */
  1572. /**
  1573. * @brief Simple Consumer (legacy)
  1574. *
  1575. * A simple non-balanced, non-group-aware, consumer.
  1576. */
  1577. class RD_EXPORT Consumer : public virtual Handle {
  1578. public:
  1579. /**
  1580. * @brief Creates a new Kafka consumer handle.
  1581. *
  1582. * \p conf is an optional object that will be used instead of the default
  1583. * configuration.
  1584. * The \p conf object is reusable after this call.
  1585. *
  1586. * @returns the new handle on success or NULL on error in which case
  1587. * \p errstr is set to a human readable error message.
  1588. */
  1589. static Consumer *create (Conf *conf, std::string &errstr);
  1590. virtual ~Consumer () = 0;
  1591. /**
  1592. * @brief Start consuming messages for topic and \p partition
  1593. * at offset \p offset which may either be a proper offset (0..N)
  1594. * or one of the the special offsets: \p OFFSET_BEGINNING or \p OFFSET_END.
  1595. *
  1596. * rdkafka will attempt to keep \p queued.min.messages (config property)
  1597. * messages in the local queue by repeatedly fetching batches of messages
  1598. * from the broker until the threshold is reached.
  1599. *
  1600. * The application shall use one of the \p ..->consume*() functions
  1601. * to consume messages from the local queue, each kafka message being
  1602. * represented as a `RdKafka::Message *` object.
  1603. *
  1604. * \p ..->start() must not be called multiple times for the same
  1605. * topic and partition without stopping consumption first with
  1606. * \p ..->stop().
  1607. *
  1608. * @returns an ErrorCode to indicate success or failure.
  1609. */
  1610. virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset) = 0;
  1611. /**
  1612. * @brief Start consuming messages for topic and \p partition on
  1613. * queue \p queue.
  1614. *
  1615. * @sa RdKafka::Consumer::start()
  1616. */
  1617. virtual ErrorCode start (Topic *topic, int32_t partition, int64_t offset,
  1618. Queue *queue) = 0;
  1619. /**
  1620. * @brief Stop consuming messages for topic and \p partition, purging
  1621. * all messages currently in the local queue.
  1622. *
  1623. * The application needs to be stop all consumers before destroying
  1624. * the Consumer handle.
  1625. *
  1626. * @returns an ErrorCode to indicate success or failure.
  1627. */
  1628. virtual ErrorCode stop (Topic *topic, int32_t partition) = 0;
  1629. /**
  1630. * @brief Seek consumer for topic+partition to \p offset which is either an
  1631. * absolute or logical offset.
  1632. *
  1633. * If \p timeout_ms is not 0 the call will wait this long for the
  1634. * seek to be performed. If the timeout is reached the internal state
  1635. * will be unknown and this function returns `ERR__TIMED_OUT`.
  1636. * If \p timeout_ms is 0 it will initiate the seek but return
  1637. * immediately without any error reporting (e.g., async).
  1638. *
  1639. * This call triggers a fetch queue barrier flush.
  1640. *
  1641. * @returns an ErrorCode to indicate success or failure.
  1642. */
  1643. virtual ErrorCode seek (Topic *topic, int32_t partition, int64_t offset,
  1644. int timeout_ms) = 0;
  1645. /**
  1646. * @brief Consume a single message from \p topic and \p partition.
  1647. *
  1648. * \p timeout_ms is maximum amount of time to wait for a message to be
  1649. * received.
  1650. * Consumer must have been previously started with \p ..->start().
  1651. *
  1652. * @returns a Message object, the application needs to check if message
  1653. * is an error or a proper message RdKafka::Message::err() and checking for
  1654. * \p ERR_NO_ERROR.
  1655. *
  1656. * The message object must be destroyed when the application is done with it.
  1657. *
  1658. * Errors (in RdKafka::Message::err()):
  1659. * - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched.
  1660. * - ERR__PARTITION_EOF - End of partition reached, not an error.
  1661. */
  1662. virtual Message *consume (Topic *topic, int32_t partition,
  1663. int timeout_ms) = 0;
  1664. /**
  1665. * @brief Consume a single message from the specified queue.
  1666. *
  1667. * \p timeout_ms is maximum amount of time to wait for a message to be
  1668. * received.
  1669. * Consumer must have been previously started on the queue with
  1670. * \p ..->start().
  1671. *
  1672. * @returns a Message object, the application needs to check if message
  1673. * is an error or a proper message \p Message->err() and checking for
  1674. * \p ERR_NO_ERROR.
  1675. *
  1676. * The message object must be destroyed when the application is done with it.
  1677. *
  1678. * Errors (in RdKafka::Message::err()):
  1679. * - ERR__TIMED_OUT - \p timeout_ms was reached with no new messages fetched
  1680. *
  1681. * Note that Message->topic() may be nullptr after certain kinds of
  1682. * errors, so applications should check that it isn't null before
  1683. * dereferencing it.
  1684. */
  1685. virtual Message *consume (Queue *queue, int timeout_ms) = 0;
  1686. /**
  1687. * @brief Consumes messages from \p topic and \p partition, calling
  1688. * the provided callback for each consumed messsage.
  1689. *
  1690. * \p consume_callback() provides higher throughput performance
  1691. * than \p consume().
  1692. *
  1693. * \p timeout_ms is the maximum amount of time to wait for one or
  1694. * more messages to arrive.
  1695. *
  1696. * The provided \p consume_cb instance has its \p consume_cb function
  1697. * called for every message received.
  1698. *
  1699. * The \p opaque argument is passed to the \p consume_cb as \p opaque.
  1700. *
  1701. * @returns the number of messages processed or -1 on error.
  1702. *
  1703. * @sa RdKafka::Consumer::consume()
  1704. */
  1705. virtual int consume_callback (Topic *topic, int32_t partition,
  1706. int timeout_ms,
  1707. ConsumeCb *consume_cb,
  1708. void *opaque) = 0;
  1709. /**
  1710. * @brief Consumes messages from \p queue, calling the provided callback for
  1711. * each consumed messsage.
  1712. *
  1713. * @sa RdKafka::Consumer::consume_callback()
  1714. */
  1715. virtual int consume_callback (Queue *queue, int timeout_ms,
  1716. RdKafka::ConsumeCb *consume_cb,
  1717. void *opaque) = 0;
  1718. /**
  1719. * @brief Converts an offset into the logical offset from the tail of a topic.
  1720. *
  1721. * \p offset is the (positive) number of items from the end.
  1722. *
  1723. * @returns the logical offset for message \p offset from the tail, this value
  1724. * may be passed to Consumer::start, et.al.
  1725. * @remark The returned logical offset is specific to librdkafka.
  1726. */
  1727. static int64_t OffsetTail(int64_t offset);
  1728. };
  1729. /**@}*/
  1730. /**
  1731. * @name Producer
  1732. * @{
  1733. *
  1734. */
  1735. /**
  1736. * @brief Producer
  1737. */
  1738. class RD_EXPORT Producer : public virtual Handle {
  1739. public:
  1740. /**
  1741. * @brief Creates a new Kafka producer handle.
  1742. *
  1743. * \p conf is an optional object that will be used instead of the default
  1744. * configuration.
  1745. * The \p conf object is reusable after this call.
  1746. *
  1747. * @returns the new handle on success or NULL on error in which case
  1748. * \p errstr is set to a human readable error message.
  1749. */
  1750. static Producer *create (Conf *conf, std::string &errstr);
  1751. virtual ~Producer () = 0;
  1752. /**
  1753. * @brief RdKafka::Producer::produce() \p msgflags
  1754. *
  1755. * These flags are optional and mutually exclusive.
  1756. */
  1757. enum {
  1758. RK_MSG_FREE = 0x1, /**< rdkafka will free(3) \p payload
  1759. * when it is done with it. */
  1760. RK_MSG_COPY = 0x2, /**< the \p payload data will be copied
  1761. * and the \p payload pointer will not
  1762. * be used by rdkafka after the
  1763. * call returns. */
  1764. RK_MSG_BLOCK = 0x4 /**< Block produce*() on message queue
  1765. * full.
  1766. * WARNING:
  1767. * If a delivery report callback
  1768. * is used the application MUST
  1769. * call rd_kafka_poll() (or equiv.)
  1770. * to make sure delivered messages
  1771. * are drained from the internal
  1772. * delivery report queue.
  1773. * Failure to do so will result
  1774. * in indefinately blocking on
  1775. * the produce() call when the
  1776. * message queue is full.
  1777. */
  1778. /**@cond NO_DOC*/
  1779. /* For backwards compatibility: */
  1780. #ifndef MSG_COPY /* defined in sys/msg.h */
  1781. , /** this comma must exist betwen
  1782. * RK_MSG_BLOCK and MSG_FREE
  1783. */
  1784. MSG_FREE = RK_MSG_FREE,
  1785. MSG_COPY = RK_MSG_COPY
  1786. #endif
  1787. /**@endcond*/
  1788. };
  1789. /**
  1790. * @brief Produce and send a single message to broker.
  1791. *
  1792. * This is an asynch non-blocking API.
  1793. *
  1794. * \p partition is the target partition, either:
  1795. * - RdKafka::Topic::PARTITION_UA (unassigned) for
  1796. * automatic partitioning using the topic's partitioner function, or
  1797. * - a fixed partition (0..N)
  1798. *
  1799. * \p msgflags is zero or more of the following flags OR:ed together:
  1800. * RK_MSG_BLOCK - block \p produce*() call if
  1801. * \p queue.buffering.max.messages or
  1802. * \p queue.buffering.max.kbytes are exceeded.
  1803. * Messages are considered in-queue from the point they
  1804. * are accepted by produce() until their corresponding
  1805. * delivery report callback/event returns.
  1806. * It is thus a requirement to call
  1807. * poll() (or equiv.) from a separate
  1808. * thread when RK_MSG_BLOCK is used.
  1809. * See WARNING on \c RK_MSG_BLOCK above.
  1810. * RK_MSG_FREE - rdkafka will free(3) \p payload when it is done with it.
  1811. * RK_MSG_COPY - the \p payload data will be copied and the \p payload
  1812. * pointer will not be used by rdkafka after the
  1813. * call returns.
  1814. *
  1815. * NOTE: RK_MSG_FREE and RK_MSG_COPY are mutually exclusive.
  1816. *
  1817. * If the function returns an error code and RK_MSG_FREE was specified, then
  1818. * the memory associated with the payload is still the caller's
  1819. * responsibility.
  1820. *
  1821. * \p payload is the message payload of size \p len bytes.
  1822. *
  1823. * \p key is an optional message key, if non-NULL it
  1824. * will be passed to the topic partitioner as well as be sent with the
  1825. * message to the broker and passed on to the consumer.
  1826. *
  1827. * \p msg_opaque is an optional application-provided per-message opaque
  1828. * pointer that will provided in the delivery report callback (\p dr_cb) for
  1829. * referencing this message.
  1830. *
  1831. * @returns an ErrorCode to indicate success or failure:
  1832. * - ERR_NO_ERROR - message successfully enqueued for transmission.
  1833. *
  1834. * - ERR__QUEUE_FULL - maximum number of outstanding messages has been
  1835. * reached: \c queue.buffering.max.message
  1836. *
  1837. * - ERR_MSG_SIZE_TOO_LARGE - message is larger than configured max size:
  1838. * \c messages.max.bytes
  1839. *
  1840. * - ERR__UNKNOWN_PARTITION - requested \p partition is unknown in the
  1841. * Kafka cluster.
  1842. *
  1843. * - ERR__UNKNOWN_TOPIC - topic is unknown in the Kafka cluster.
  1844. */
  1845. virtual ErrorCode produce (Topic *topic, int32_t partition,
  1846. int msgflags,
  1847. void *payload, size_t len,
  1848. const std::string *key,
  1849. void *msg_opaque) = 0;
  1850. /**
  1851. * @brief Variant produce() that passes the key as a pointer and length
  1852. * instead of as a const std::string *.
  1853. */
  1854. virtual ErrorCode produce (Topic *topic, int32_t partition,
  1855. int msgflags,
  1856. void *payload, size_t len,
  1857. const void *key, size_t key_len,
  1858. void *msg_opaque) = 0;
  1859. /**
  1860. * @brief produce() variant that takes topic as a string (no need for
  1861. * creating a Topic object), and also allows providing the
  1862. * message timestamp (microseconds since beginning of epoch, UTC).
  1863. * Otherwise identical to produce() above.
  1864. */
  1865. virtual ErrorCode produce (const std::string topic_name, int32_t partition,
  1866. int msgflags,
  1867. void *payload, size_t len,
  1868. const void *key, size_t key_len,
  1869. int64_t timestamp,
  1870. void *msg_opaque) = 0;
  1871. /**
  1872. * @brief Variant produce() that accepts vectors for key and payload.
  1873. * The vector data will be copied.
  1874. */
  1875. virtual ErrorCode produce (Topic *topic, int32_t partition,
  1876. const std::vector<char> *payload,
  1877. const std::vector<char> *key,
  1878. void *msg_opaque) = 0;
  1879. /**
  1880. * @brief Wait until all outstanding produce requests, et.al, are completed.
  1881. * This should typically be done prior to destroying a producer instance
  1882. * to make sure all queued and in-flight produce requests are completed
  1883. * before terminating.
  1884. *
  1885. * @remark This function will call poll() and thus trigger callbacks.
  1886. *
  1887. * @returns ERR__TIMED_OUT if \p timeout_ms was reached before all
  1888. * outstanding requests were completed, else ERR_NO_ERROR
  1889. */
  1890. virtual ErrorCode flush (int timeout_ms) = 0;
  1891. };
  1892. /**@}*/
  1893. /**
  1894. * @name Metadata interface
  1895. * @{
  1896. *
  1897. */
  1898. /**
  1899. * @brief Metadata: Broker information
  1900. */
  1901. class BrokerMetadata {
  1902. public:
  1903. /** @returns Broker id */
  1904. virtual int32_t id() const = 0;
  1905. /** @returns Broker hostname */
  1906. virtual const std::string host() const = 0;
  1907. /** @returns Broker listening port */
  1908. virtual int port() const = 0;
  1909. virtual ~BrokerMetadata() = 0;
  1910. };
  1911. /**
  1912. * @brief Metadata: Partition information
  1913. */
  1914. class PartitionMetadata {
  1915. public:
  1916. /** @brief Replicas */
  1917. typedef std::vector<int32_t> ReplicasVector;
  1918. /** @brief ISRs (In-Sync-Replicas) */
  1919. typedef std::vector<int32_t> ISRSVector;
  1920. /** @brief Replicas iterator */
  1921. typedef ReplicasVector::const_iterator ReplicasIterator;
  1922. /** @brief ISRs iterator */
  1923. typedef ISRSVector::const_iterator ISRSIterator;
  1924. /** @returns Partition id */
  1925. virtual int32_t id() const = 0;
  1926. /** @returns Partition error reported by broker */
  1927. virtual ErrorCode err() const = 0;
  1928. /** @returns Leader broker (id) for partition */
  1929. virtual int32_t leader() const = 0;
  1930. /** @returns Replica brokers */
  1931. virtual const std::vector<int32_t> *replicas() const = 0;
  1932. /** @returns In-Sync-Replica brokers
  1933. * @warning The broker may return a cached/outdated list of ISRs.
  1934. */
  1935. virtual const std::vector<int32_t> *isrs() const = 0;
  1936. virtual ~PartitionMetadata() = 0;
  1937. };
  1938. /**
  1939. * @brief Metadata: Topic information
  1940. */
  1941. class TopicMetadata {
  1942. public:
  1943. /** @brief Partitions */
  1944. typedef std::vector<const PartitionMetadata*> PartitionMetadataVector;
  1945. /** @brief Partitions iterator */
  1946. typedef PartitionMetadataVector::const_iterator PartitionMetadataIterator;
  1947. /** @returns Topic name */
  1948. virtual const std::string topic() const = 0;
  1949. /** @returns Partition list */
  1950. virtual const PartitionMetadataVector *partitions() const = 0;
  1951. /** @returns Topic error reported by broker */
  1952. virtual ErrorCode err() const = 0;
  1953. virtual ~TopicMetadata() = 0;
  1954. };
  1955. /**
  1956. * @brief Metadata container
  1957. */
  1958. class Metadata {
  1959. public:
  1960. /** @brief Brokers */
  1961. typedef std::vector<const BrokerMetadata*> BrokerMetadataVector;
  1962. /** @brief Topics */
  1963. typedef std::vector<const TopicMetadata*> TopicMetadataVector;
  1964. /** @brief Brokers iterator */
  1965. typedef BrokerMetadataVector::const_iterator BrokerMetadataIterator;
  1966. /** @brief Topics iterator */
  1967. typedef TopicMetadataVector::const_iterator TopicMetadataIterator;
  1968. /** @brief Broker list */
  1969. virtual const BrokerMetadataVector *brokers() const = 0;
  1970. /** @brief Topic list */
  1971. virtual const TopicMetadataVector *topics() const = 0;
  1972. /** @brief Broker (id) originating this metadata */
  1973. virtual int32_t orig_broker_id() const = 0;
  1974. /** @brief Broker (name) originating this metadata */
  1975. virtual const std::string orig_broker_name() const = 0;
  1976. virtual ~Metadata() = 0;
  1977. };
  1978. /**@}*/
  1979. }
  1980. #endif /* _RDKAFKACPP_H_ */