Socket.h 18 KB


  1. /*
  2. * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
  3. *
  4. * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
  5. *
  6. * Use of this source code is governed by MIT license that can be found in the
  7. * LICENSE file in the root of the source tree. All contributing project authors
  8. * may be found in the AUTHORS file in the root of the source tree.
  9. */
  10. #ifndef NETWORK_SOCKET_H
  11. #define NETWORK_SOCKET_H
  12. #include <memory>
  13. #include <string>
  14. #include <deque>
  15. #include <mutex>
  16. #include <vector>
  17. #include <atomic>
  18. #include <sstream>
  19. #include <functional>
  20. #include "Util/util.h"
  21. #include "Util/onceToken.h"
  22. #include "Util/uv_errno.h"
  23. #include "Util/TimeTicker.h"
  24. #include "Util/ResourcePool.h"
  25. #include "Poller/Timer.h"
  26. #include "Poller/EventPoller.h"
  27. #include "Network/sockutil.h"
  28. #include "Buffer.h"
  29. using namespace std;
  30. namespace toolkit {
  31. #if defined(MSG_NOSIGNAL)
  32. #define FLAG_NOSIGNAL MSG_NOSIGNAL
  33. #else
  34. #define FLAG_NOSIGNAL 0
  35. #endif //MSG_NOSIGNAL
  36. #if defined(MSG_MORE)
  37. #define FLAG_MORE MSG_MORE
  38. #else
  39. #define FLAG_MORE 0
  40. #endif //MSG_MORE
  41. #if defined(MSG_DONTWAIT)
  42. #define FLAG_DONTWAIT MSG_DONTWAIT
  43. #else
  44. #define FLAG_DONTWAIT 0
  45. #endif //MSG_DONTWAIT
  46. //默认的socket flags:不触发SIGPIPE,非阻塞发送
  47. #define SOCKET_DEFAULE_FLAGS (FLAG_NOSIGNAL | FLAG_DONTWAIT )
  48. //发送超时时间,如果在规定时间内一直没有发送数据成功,那么将触发onErr事件
  49. #define SEND_TIME_OUT_SEC 10
  50. //错误类型枚举
  51. typedef enum {
  52. Err_success = 0, //成功
  53. Err_eof, //eof
  54. Err_timeout, //超时
  55. Err_refused,//连接被拒绝
  56. Err_dns,//dns解析失败
  57. Err_shutdown,//主动关闭
  58. Err_other = 0xFF,//其他错误
  59. } ErrCode;
  60. //错误信息类
  61. class SockException: public std::exception {
  62. public:
  63. SockException(ErrCode errCode = Err_success,
  64. const string &errMsg = "",
  65. int customCode = 0) {
  66. _errMsg = errMsg;
  67. _errCode = errCode;
  68. _customCode = customCode;
  69. }
  70. //重置错误
  71. void reset(ErrCode errCode, const string &errMsg) {
  72. _errMsg = errMsg;
  73. _errCode = errCode;
  74. }
  75. //错误提示
  76. const char* what() const noexcept override{
  77. return _errMsg.c_str();
  78. }
  79. //错误代码
  80. ErrCode getErrCode() const {
  81. return _errCode;
  82. }
  83. //判断是否真的有错
  84. operator bool() const{
  85. return _errCode != Err_success;
  86. }
  87. //用户自定义错误代码
  88. int getCustomCode () const{
  89. return _customCode;
  90. }
  91. //获取用户自定义错误代码
  92. void setCustomCode(int code) {
  93. _customCode = code;
  94. };
  95. private:
  96. string _errMsg;
  97. ErrCode _errCode;
  98. int _customCode = 0;
  99. };
  100. class SockNum{
  101. public:
  102. typedef enum {
  103. Sock_TCP = 0,
  104. Sock_UDP = 1
  105. } SockType;
  106. typedef std::shared_ptr<SockNum> Ptr;
  107. SockNum(int fd,SockType type){
  108. _fd = fd;
  109. _type = type;
  110. }
  111. ~SockNum(){
  112. #if defined (OS_IPHONE)
  113. unsetSocketOfIOS(_fd);
  114. #endif //OS_IPHONE
  115. ::shutdown(_fd, SHUT_RDWR);
  116. close(_fd);
  117. }
  118. int rawFd() const{
  119. return _fd;
  120. }
  121. SockType type(){
  122. return _type;
  123. }
  124. void setConnected(){
  125. #if defined (OS_IPHONE)
  126. setSocketOfIOS(_fd);
  127. #endif //OS_IPHONE
  128. }
  129. private:
  130. SockType _type;
  131. int _fd;
  132. #if defined (OS_IPHONE)
  133. void *readStream=NULL;
  134. void *writeStream=NULL;
  135. bool setSocketOfIOS(int socket);
  136. void unsetSocketOfIOS(int socket);
  137. #endif //OS_IPHONE
  138. };
  139. //socket 文件描述符的包装
  140. //在析构时自动溢出监听并close套接字
  141. //防止描述符溢出
  142. class SockFD : public noncopyable {
  143. public:
  144. typedef std::shared_ptr<SockFD> Ptr;
  145. /**
  146. * 创建一个fd对象
  147. * @param num 文件描述符,int数字
  148. * @param poller 事件监听器
  149. */
  150. SockFD(int num,SockNum::SockType type,const EventPoller::Ptr &poller){
  151. _num = std::make_shared<SockNum>(num,type);
  152. _poller = poller;
  153. }
  154. /**
  155. * 复制一个fd对象
  156. * @param that 源对象
  157. * @param poller 事件监听器
  158. */
  159. SockFD(const SockFD &that,const EventPoller::Ptr &poller){
  160. _num = that._num;
  161. _poller = poller;
  162. if(_poller == that._poller){
  163. throw invalid_argument("copy a SockFD with same poller!");
  164. }
  165. }
  166. ~SockFD() {
  167. auto num = _num;
  168. _poller->delEvent(_num->rawFd(), [num](bool) {});
  169. }
  170. void setConnected() {
  171. _num->setConnected();
  172. }
  173. int rawFd() const {
  174. return _num->rawFd();
  175. }
  176. SockNum::SockType type() {
  177. return _num->type();
  178. }
  179. private:
  180. SockNum::Ptr _num;
  181. EventPoller::Ptr _poller;
  182. };
  183. template <class Mtx = recursive_mutex>
  184. class MutexWrapper {
  185. public:
  186. MutexWrapper(bool enable){
  187. _enable = enable;
  188. }
  189. ~MutexWrapper(){}
  190. inline void lock(){
  191. if(_enable){
  192. _mtx.lock();
  193. }
  194. }
  195. inline void unlock(){
  196. if(_enable){
  197. _mtx.unlock();
  198. }
  199. }
  200. private:
  201. bool _enable;
  202. Mtx _mtx;
  203. };
  204. class SockInfo {
  205. public:
  206. SockInfo() = default;
  207. virtual ~SockInfo() = default;
  208. //获取本机ip
  209. virtual string get_local_ip() = 0;
  210. //获取本机端口号
  211. virtual uint16_t get_local_port() = 0;
  212. //获取对方ip
  213. virtual string get_peer_ip() = 0;
  214. //获取对方端口号
  215. virtual uint16_t get_peer_port() = 0;
  216. //获取标识符
  217. virtual string getIdentifier() const { return ""; }
  218. };
  219. #define TraceP(ptr) TraceL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
  220. #define DebugP(ptr) DebugL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
  221. #define InfoP(ptr) InfoL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
  222. #define WarnP(ptr) WarnL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
  223. #define ErrorP(ptr) ErrorL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
  224. //异步IO Socket对象,包括tcp客户端、服务器和udp套接字
  225. class Socket : public std::enable_shared_from_this<Socket>, public noncopyable, public SockInfo {
  226. public:
  227. using Ptr = std::shared_ptr<Socket>;
  228. //接收数据回调
  229. using onReadCB = function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
  230. //发生错误回调
  231. using onErrCB = function<void(const SockException &err)>;
  232. //tcp监听接收到连接请求
  233. using onAcceptCB = function<void(Socket::Ptr &sock, shared_ptr<void> &complete)>;
  234. //socket发送缓存清空事件,返回true代表下次继续监听该事件,否则停止
  235. using onFlush = function<bool()>;
  236. //在接收到连接请求前,拦截Socket默认生成方式
  237. using onCreateSocket = function<Ptr(const EventPoller::Ptr &poller)>;
  238. //发送buffer成功与否回调
  239. using onSendResult = BufferList::SendResult;
  240. /**
  241. * 构造socket对象,尚未有实质操作
  242. * @param poller 绑定的poller线程
  243. * @param enable_mutex 是否启用互斥锁(接口是否线程安全)
  244. */
  245. static Ptr createSocket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
  246. Socket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
  247. ~Socket() override;
  248. /**
  249. * 创建tcp客户端并异步连接服务器
  250. * @param url 目标服务器ip或域名
  251. * @param port 目标服务器端口
  252. * @param con_cb 结果回调
  253. * @param timeout_sec 超时时间
  254. * @param local_ip 绑定本地网卡ip
  255. * @param local_port 绑定本地网卡端口号
  256. */
  257. virtual void connect(const string &url, uint16_t port, onErrCB con_cb, float timeout_sec = 5,
  258. const string &local_ip = "0.0.0.0", uint16_t local_port = 0);
  259. /**
  260. * 创建tcp监听服务器
  261. * @param port 监听端口,0则随机
  262. * @param local_ip 监听的网卡ip
  263. * @param backlog tcp最大积压数
  264. * @return 是否成功
  265. */
  266. virtual bool listen(uint16_t port, const string &local_ip = "0.0.0.0", int backlog = 1024);
  267. /**
  268. * 创建udp套接字,udp是无连接的,所以可以作为服务器和客户端
  269. * @param port 绑定的端口为0则随机
  270. * @param local_ip 绑定的网卡ip
  271. * @return 是否成功
  272. */
  273. virtual bool bindUdpSock(uint16_t port, const string &local_ip = "0.0.0.0");
  274. ////////////设置事件回调////////////
  275. /**
  276. * 设置数据接收回调,tcp或udp客户端有效
  277. * @param cb 回调对象
  278. */
  279. virtual void setOnRead(onReadCB cb);
  280. /**
  281. * 设置异常事件(包括eof等)回调
  282. * @param cb 回调对象
  283. */
  284. virtual void setOnErr(onErrCB cb);
  285. /**
  286. * 设置tcp监听接收到连接回调
  287. * @param cb 回调对象
  288. */
  289. virtual void setOnAccept(onAcceptCB cb);
  290. /**
  291. * 设置socket写缓存清空事件回调
  292. * 通过该回调可以实现发送流控
  293. * @param cb 回调对象
  294. */
  295. virtual void setOnFlush(onFlush cb);
  296. /**
  297. * 设置accept时,socket构造事件回调
  298. * @param cb 回调
  299. */
  300. virtual void setOnBeforeAccept(onCreateSocket cb);
  301. /**
  302. * 设置发送buffer结果回调
  303. * @param cb 回调
  304. */
  305. virtual void setOnSendResult(onSendResult cb);
  306. ////////////发送数据相关接口////////////
  307. /**
  308. * 发送数据指针
  309. * @param buf 数据指针
  310. * @param size 数据长度
  311. * @param addr 目标地址
  312. * @param addr_len 目标地址长度
  313. * @param try_flush 是否尝试写socket
  314. * @return -1代表失败(socket无效),0代表数据长度为0,否则返回数据长度
  315. */
  316. ssize_t send(const char *buf, size_t size = 0, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
  317. /**
  318. * 发送string
  319. */
  320. ssize_t send(string buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
  321. /**
  322. * 发送Buffer对象,Socket对象发送数据的统一出口
  323. * socket对象发送数据的统一出口
  324. */
  325. virtual ssize_t send(Buffer::Ptr buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
  326. /**
  327. * 关闭socket且触发onErr回调,onErr回调将在poller线程中进行
  328. * @param err 错误原因
  329. * @return 是否成功触发onErr回调
  330. */
  331. virtual bool emitErr(const SockException &err) noexcept;
  332. /**
  333. * 关闭或开启数据接收
  334. * @param enabled 是否开启
  335. */
  336. virtual void enableRecv(bool enabled);
  337. /**
  338. * 获取裸文件描述符,请勿进行close操作(因为Socket对象会管理其生命周期)
  339. * @return 文件描述符
  340. */
  341. virtual int rawFD() const;
  342. /**
  343. * 设置发送超时主动断开时间;默认10秒
  344. * @param second 发送超时数据,单位秒
  345. */
  346. virtual void setSendTimeOutSecond(uint32_t second);
  347. /**
  348. * 套接字是否忙,如果套接字写缓存已满则返回true
  349. * @return 套接字是否忙
  350. */
  351. virtual bool isSocketBusy() const;
  352. /**
  353. * 获取poller线程对象
  354. * @return poller线程对象
  355. */
  356. virtual const EventPoller::Ptr &getPoller() const;
  357. /**
  358. * 从另外一个Socket克隆
  359. * 目的是一个socket可以被多个poller对象监听,提高性能
  360. * @param other 原始的socket对象
  361. * @return 是否成功
  362. */
  363. virtual bool cloneFromListenSocket(const Socket &other);
  364. /**
  365. * 绑定udp 目标地址,后续发送时就不用再单独指定了
  366. * @param dst_addr 目标地址
  367. * @param addr_len 目标地址长度
  368. * @return 是否成功
  369. */
  370. virtual bool bindPeerAddr(const struct sockaddr *dst_addr, socklen_t addr_len = 0);
  371. /**
  372. * 设置发送flags
  373. * @param flags 发送的flag
  374. */
  375. virtual void setSendFlags(int flags = SOCKET_DEFAULE_FLAGS);
  376. /**
  377. * 关闭套接字
  378. */
  379. virtual void closeSock();
  380. /**
  381. * 获取发送缓存包个数(不是字节数)
  382. */
  383. virtual size_t getSendBufferCount();
  384. /**
  385. * 获取上次socket发送缓存清空至今的毫秒数,单位毫秒
  386. */
  387. virtual uint64_t elapsedTimeAfterFlushed();
  388. ////////////SockInfo override////////////
  389. string get_local_ip() override;
  390. uint16_t get_local_port() override;
  391. string get_peer_ip() override;
  392. uint16_t get_peer_port() override;
  393. string getIdentifier() const override;
  394. private:
  395. SockFD::Ptr setPeerSock(int fd);
  396. SockFD::Ptr makeSock(int sock, SockNum::SockType type);
  397. int onAccept(const SockFD::Ptr &sock, int event) noexcept;
  398. ssize_t onRead(const SockFD::Ptr &sock, bool is_udp = false) noexcept;
  399. void onWriteAble(const SockFD::Ptr &sock);
  400. void onConnected(const SockFD::Ptr &sock, const onErrCB &cb);
  401. void onFlushed(const SockFD::Ptr &pSock);
  402. void startWriteAbleEvent(const SockFD::Ptr &sock);
  403. void stopWriteAbleEvent(const SockFD::Ptr &sock);
  404. bool listen(const SockFD::Ptr &sock);
  405. bool flushData(const SockFD::Ptr &sock, bool poller_thread);
  406. bool attachEvent(const SockFD::Ptr &sock, bool is_udp = false);
  407. ssize_t send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush = true);
  408. private:
  409. //send socket时的flag
  410. int _sock_flags = SOCKET_DEFAULE_FLAGS;
  411. //最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数
  412. uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000;
  413. //控制是否接收监听socket可读事件,关闭后可用于流量控制
  414. atomic<bool> _enable_recv {true};
  415. //标记该socket是否可写,socket写缓存满了就不可写
  416. atomic<bool> _sendable {true};
  417. //tcp连接超时定时器
  418. Timer::Ptr _con_timer;
  419. //tcp连接结果回调对象
  420. std::shared_ptr<function<void(int)> > _async_con_cb;
  421. //记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器
  422. Ticker _send_flush_ticker;
  423. //复用的socket读缓存,每次read socket后,数据存放在此
  424. BufferRaw::Ptr _read_buffer;
  425. //socket fd的抽象类
  426. SockFD::Ptr _sock_fd;
  427. //本socket绑定的poller线程,事件触发于此线程
  428. EventPoller::Ptr _poller;
  429. //跨线程访问_sock_fd时需要上锁
  430. mutable MutexWrapper<recursive_mutex> _mtx_sock_fd;
  431. //socket异常事件(比如说断开)
  432. onErrCB _on_err;
  433. //收到数据事件
  434. onReadCB _on_read;
  435. //socket缓存清空事件(可用于发送流速控制)
  436. onFlush _on_flush;
  437. //tcp监听收到accept请求事件
  438. onAcceptCB _on_accept;
  439. //tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
  440. onCreateSocket _on_before_accept;
  441. //设置上述回调函数的锁
  442. MutexWrapper<recursive_mutex> _mtx_event;
  443. //一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
  444. List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
  445. //一级发送缓存锁
  446. MutexWrapper<recursive_mutex> _mtx_send_buf_waiting;
  447. //二级发送缓存, socket可写时,会把二级缓存批量写入到socket
  448. List<BufferList::Ptr> _send_buf_sending;
  449. //二级发送缓存锁
  450. MutexWrapper<recursive_mutex> _mtx_send_buf_sending;
  451. //发送buffer结果回调
  452. BufferList::SendResult _send_result;
  453. //对象个数统计
  454. ObjectStatistic<Socket> _statistic;
  455. };
  456. class SockSender {
  457. public:
  458. SockSender() = default;
  459. virtual ~SockSender() = default;
  460. virtual ssize_t send(Buffer::Ptr buf) = 0;
  461. virtual void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) = 0;
  462. //发送char *
  463. SockSender &operator << (const char *buf);
  464. //发送字符串
  465. SockSender &operator << (string buf);
  466. //发送Buffer对象
  467. SockSender &operator << (Buffer::Ptr buf);
  468. //发送其他类型是数据
  469. template<typename T>
  470. SockSender &operator << (T &&buf) {
  471. ostringstream ss;
  472. ss << std::forward<T>(buf);
  473. send(ss.str());
  474. return *this;
  475. }
  476. ssize_t send(string buf);
  477. ssize_t send(const char *buf, size_t size = 0);
  478. };
  479. //Socket对象的包装类
  480. class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface {
  481. public:
  482. SocketHelper(const Socket::Ptr &sock);
  483. ~SocketHelper() override;
  484. ///////////////////// Socket util functions /////////////////////
  485. /**
  486. * 获取poller线程
  487. */
  488. const EventPoller::Ptr& getPoller() const;
  489. /**
  490. * 设置批量发送标记,用于提升性能
  491. * @param try_flush 批量发送标记
  492. */
  493. void setSendFlushFlag(bool try_flush);
  494. /**
  495. * 设置socket发送flags
  496. * @param flags socket发送flags
  497. */
  498. void setSendFlags(int flags);
  499. /**
  500. * 套接字是否忙,如果套接字写缓存已满则返回true
  501. */
  502. bool isSocketBusy() const;
  503. /**
  504. * 设置Socket创建器,自定义Socket创建方式
  505. * @param cb 创建器
  506. */
  507. void setOnCreateSocket(Socket::onCreateSocket cb);
  508. /**
  509. * 创建socket对象
  510. */
  511. Socket::Ptr createSocket();
  512. ///////////////////// SockInfo override /////////////////////
  513. string get_local_ip() override;
  514. uint16_t get_local_port() override;
  515. string get_peer_ip() override;
  516. uint16_t get_peer_port() override;
  517. ///////////////////// TaskExecutorInterface override /////////////////////
  518. /**
  519. * 任务切换到所属poller线程执行
  520. * @param task 任务
  521. * @param may_sync 是否运行同步执行任务
  522. */
  523. Task::Ptr async(TaskIn task, bool may_sync = true) override;
  524. Task::Ptr async_first(TaskIn task, bool may_sync = true) override;
  525. ///////////////////// SockSender override /////////////////////
  526. /**
  527. * 统一发送数据的出口
  528. */
  529. ssize_t send(Buffer::Ptr buf) override;
  530. /**
  531. * 触发onErr事件
  532. */
  533. void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) override;
  534. protected:
  535. void setPoller(const EventPoller::Ptr &poller);
  536. void setSock(const Socket::Ptr &sock);
  537. const Socket::Ptr& getSock() const;
  538. private:
  539. bool _try_flush = true;
  540. uint16_t _peer_port = 0;
  541. uint16_t _local_port = 0;
  542. string _peer_ip;
  543. string _local_ip;
  544. Socket::Ptr _sock;
  545. EventPoller::Ptr _poller;
  546. Socket::onCreateSocket _on_create_socket;
  547. };
  548. } // namespace toolkit
  549. #endif /* NETWORK_SOCKET_H */