123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630 |
- /*
- * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
- *
- * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
- *
- * Use of this source code is governed by MIT license that can be found in the
- * LICENSE file in the root of the source tree. All contributing project authors
- * may be found in the AUTHORS file in the root of the source tree.
- */
- #ifndef NETWORK_SOCKET_H
- #define NETWORK_SOCKET_H
- #include <memory>
- #include <string>
- #include <deque>
- #include <mutex>
- #include <vector>
- #include <atomic>
- #include <sstream>
- #include <functional>
- #include "Util/util.h"
- #include "Util/onceToken.h"
- #include "Util/uv_errno.h"
- #include "Util/TimeTicker.h"
- #include "Util/ResourcePool.h"
- #include "Poller/Timer.h"
- #include "Poller/EventPoller.h"
- #include "Network/sockutil.h"
- #include "Buffer.h"
- using namespace std;
- namespace toolkit {
- #if defined(MSG_NOSIGNAL)
- #define FLAG_NOSIGNAL MSG_NOSIGNAL
- #else
- #define FLAG_NOSIGNAL 0
- #endif //MSG_NOSIGNAL
- #if defined(MSG_MORE)
- #define FLAG_MORE MSG_MORE
- #else
- #define FLAG_MORE 0
- #endif //MSG_MORE
- #if defined(MSG_DONTWAIT)
- #define FLAG_DONTWAIT MSG_DONTWAIT
- #else
- #define FLAG_DONTWAIT 0
- #endif //MSG_DONTWAIT
- //默认的socket flags:不触发SIGPIPE,非阻塞发送
- #define SOCKET_DEFAULE_FLAGS (FLAG_NOSIGNAL | FLAG_DONTWAIT )
-
- //发送超时时间,如果在规定时间内一直没有发送数据成功,那么将触发onErr事件
- #define SEND_TIME_OUT_SEC 10
-
- //错误类型枚举
- typedef enum {
- Err_success = 0, //成功
- Err_eof, //eof
- Err_timeout, //超时
- Err_refused,//连接被拒绝
- Err_dns,//dns解析失败
- Err_shutdown,//主动关闭
- Err_other = 0xFF,//其他错误
- } ErrCode;
- //错误信息类
- class SockException: public std::exception {
- public:
- SockException(ErrCode errCode = Err_success,
- const string &errMsg = "",
- int customCode = 0) {
- _errMsg = errMsg;
- _errCode = errCode;
- _customCode = customCode;
- }
- //重置错误
- void reset(ErrCode errCode, const string &errMsg) {
- _errMsg = errMsg;
- _errCode = errCode;
- }
- //错误提示
- const char* what() const noexcept override{
- return _errMsg.c_str();
- }
- //错误代码
- ErrCode getErrCode() const {
- return _errCode;
- }
- //判断是否真的有错
- operator bool() const{
- return _errCode != Err_success;
- }
- //用户自定义错误代码
- int getCustomCode () const{
- return _customCode;
- }
- //获取用户自定义错误代码
- void setCustomCode(int code) {
- _customCode = code;
- };
- private:
- string _errMsg;
- ErrCode _errCode;
- int _customCode = 0;
- };
- class SockNum{
- public:
- typedef enum {
- Sock_TCP = 0,
- Sock_UDP = 1
- } SockType;
- typedef std::shared_ptr<SockNum> Ptr;
- SockNum(int fd,SockType type){
- _fd = fd;
- _type = type;
- }
- ~SockNum(){
- #if defined (OS_IPHONE)
- unsetSocketOfIOS(_fd);
- #endif //OS_IPHONE
- ::shutdown(_fd, SHUT_RDWR);
- close(_fd);
- }
- int rawFd() const{
- return _fd;
- }
- SockType type(){
- return _type;
- }
- void setConnected(){
- #if defined (OS_IPHONE)
- setSocketOfIOS(_fd);
- #endif //OS_IPHONE
- }
- private:
- SockType _type;
- int _fd;
- #if defined (OS_IPHONE)
- void *readStream=NULL;
- void *writeStream=NULL;
- bool setSocketOfIOS(int socket);
- void unsetSocketOfIOS(int socket);
- #endif //OS_IPHONE
- };
- //socket 文件描述符的包装
- //在析构时自动溢出监听并close套接字
- //防止描述符溢出
- class SockFD : public noncopyable {
- public:
- typedef std::shared_ptr<SockFD> Ptr;
- /**
- * 创建一个fd对象
- * @param num 文件描述符,int数字
- * @param poller 事件监听器
- */
- SockFD(int num,SockNum::SockType type,const EventPoller::Ptr &poller){
- _num = std::make_shared<SockNum>(num,type);
- _poller = poller;
- }
- /**
- * 复制一个fd对象
- * @param that 源对象
- * @param poller 事件监听器
- */
- SockFD(const SockFD &that,const EventPoller::Ptr &poller){
- _num = that._num;
- _poller = poller;
- if(_poller == that._poller){
- throw invalid_argument("copy a SockFD with same poller!");
- }
- }
- ~SockFD() {
- auto num = _num;
- _poller->delEvent(_num->rawFd(), [num](bool) {});
- }
- void setConnected() {
- _num->setConnected();
- }
- int rawFd() const {
- return _num->rawFd();
- }
- SockNum::SockType type() {
- return _num->type();
- }
- private:
- SockNum::Ptr _num;
- EventPoller::Ptr _poller;
- };
- template <class Mtx = recursive_mutex>
- class MutexWrapper {
- public:
- MutexWrapper(bool enable){
- _enable = enable;
- }
- ~MutexWrapper(){}
- inline void lock(){
- if(_enable){
- _mtx.lock();
- }
- }
- inline void unlock(){
- if(_enable){
- _mtx.unlock();
- }
- }
- private:
- bool _enable;
- Mtx _mtx;
- };
- class SockInfo {
- public:
- SockInfo() = default;
- virtual ~SockInfo() = default;
- //获取本机ip
- virtual string get_local_ip() = 0;
- //获取本机端口号
- virtual uint16_t get_local_port() = 0;
- //获取对方ip
- virtual string get_peer_ip() = 0;
- //获取对方端口号
- virtual uint16_t get_peer_port() = 0;
- //获取标识符
- virtual string getIdentifier() const { return ""; }
- };
- #define TraceP(ptr) TraceL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
- #define DebugP(ptr) DebugL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
- #define InfoP(ptr) InfoL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
- #define WarnP(ptr) WarnL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
- #define ErrorP(ptr) ErrorL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
- //异步IO Socket对象,包括tcp客户端、服务器和udp套接字
- class Socket : public std::enable_shared_from_this<Socket>, public noncopyable, public SockInfo {
- public:
- using Ptr = std::shared_ptr<Socket>;
- //接收数据回调
- using onReadCB = function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
- //发生错误回调
- using onErrCB = function<void(const SockException &err)>;
- //tcp监听接收到连接请求
- using onAcceptCB = function<void(Socket::Ptr &sock, shared_ptr<void> &complete)>;
- //socket发送缓存清空事件,返回true代表下次继续监听该事件,否则停止
- using onFlush = function<bool()>;
- //在接收到连接请求前,拦截Socket默认生成方式
- using onCreateSocket = function<Ptr(const EventPoller::Ptr &poller)>;
- //发送buffer成功与否回调
- using onSendResult = BufferList::SendResult;
- /**
- * 构造socket对象,尚未有实质操作
- * @param poller 绑定的poller线程
- * @param enable_mutex 是否启用互斥锁(接口是否线程安全)
- */
- static Ptr createSocket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
- Socket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
- ~Socket() override;
- /**
- * 创建tcp客户端并异步连接服务器
- * @param url 目标服务器ip或域名
- * @param port 目标服务器端口
- * @param con_cb 结果回调
- * @param timeout_sec 超时时间
- * @param local_ip 绑定本地网卡ip
- * @param local_port 绑定本地网卡端口号
- */
- virtual void connect(const string &url, uint16_t port, onErrCB con_cb, float timeout_sec = 5,
- const string &local_ip = "0.0.0.0", uint16_t local_port = 0);
- /**
- * 创建tcp监听服务器
- * @param port 监听端口,0则随机
- * @param local_ip 监听的网卡ip
- * @param backlog tcp最大积压数
- * @return 是否成功
- */
- virtual bool listen(uint16_t port, const string &local_ip = "0.0.0.0", int backlog = 1024);
- /**
- * 创建udp套接字,udp是无连接的,所以可以作为服务器和客户端
- * @param port 绑定的端口为0则随机
- * @param local_ip 绑定的网卡ip
- * @return 是否成功
- */
- virtual bool bindUdpSock(uint16_t port, const string &local_ip = "0.0.0.0");
- ////////////设置事件回调////////////
- /**
- * 设置数据接收回调,tcp或udp客户端有效
- * @param cb 回调对象
- */
- virtual void setOnRead(onReadCB cb);
- /**
- * 设置异常事件(包括eof等)回调
- * @param cb 回调对象
- */
- virtual void setOnErr(onErrCB cb);
- /**
- * 设置tcp监听接收到连接回调
- * @param cb 回调对象
- */
- virtual void setOnAccept(onAcceptCB cb);
- /**
- * 设置socket写缓存清空事件回调
- * 通过该回调可以实现发送流控
- * @param cb 回调对象
- */
- virtual void setOnFlush(onFlush cb);
- /**
- * 设置accept时,socket构造事件回调
- * @param cb 回调
- */
- virtual void setOnBeforeAccept(onCreateSocket cb);
- /**
- * 设置发送buffer结果回调
- * @param cb 回调
- */
- virtual void setOnSendResult(onSendResult cb);
- ////////////发送数据相关接口////////////
- /**
- * 发送数据指针
- * @param buf 数据指针
- * @param size 数据长度
- * @param addr 目标地址
- * @param addr_len 目标地址长度
- * @param try_flush 是否尝试写socket
- * @return -1代表失败(socket无效),0代表数据长度为0,否则返回数据长度
- */
- ssize_t send(const char *buf, size_t size = 0, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
- /**
- * 发送string
- */
- ssize_t send(string buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
- /**
- * 发送Buffer对象,Socket对象发送数据的统一出口
- * socket对象发送数据的统一出口
- */
- virtual ssize_t send(Buffer::Ptr buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
- /**
- * 关闭socket且触发onErr回调,onErr回调将在poller线程中进行
- * @param err 错误原因
- * @return 是否成功触发onErr回调
- */
- virtual bool emitErr(const SockException &err) noexcept;
- /**
- * 关闭或开启数据接收
- * @param enabled 是否开启
- */
- virtual void enableRecv(bool enabled);
- /**
- * 获取裸文件描述符,请勿进行close操作(因为Socket对象会管理其生命周期)
- * @return 文件描述符
- */
- virtual int rawFD() const;
- /**
- * 设置发送超时主动断开时间;默认10秒
- * @param second 发送超时数据,单位秒
- */
- virtual void setSendTimeOutSecond(uint32_t second);
- /**
- * 套接字是否忙,如果套接字写缓存已满则返回true
- * @return 套接字是否忙
- */
- virtual bool isSocketBusy() const;
- /**
- * 获取poller线程对象
- * @return poller线程对象
- */
- virtual const EventPoller::Ptr &getPoller() const;
- /**
- * 从另外一个Socket克隆
- * 目的是一个socket可以被多个poller对象监听,提高性能
- * @param other 原始的socket对象
- * @return 是否成功
- */
- virtual bool cloneFromListenSocket(const Socket &other);
- /**
- * 绑定udp 目标地址,后续发送时就不用再单独指定了
- * @param dst_addr 目标地址
- * @param addr_len 目标地址长度
- * @return 是否成功
- */
- virtual bool bindPeerAddr(const struct sockaddr *dst_addr, socklen_t addr_len = 0);
- /**
- * 设置发送flags
- * @param flags 发送的flag
- */
- virtual void setSendFlags(int flags = SOCKET_DEFAULE_FLAGS);
- /**
- * 关闭套接字
- */
- virtual void closeSock();
- /**
- * 获取发送缓存包个数(不是字节数)
- */
- virtual size_t getSendBufferCount();
- /**
- * 获取上次socket发送缓存清空至今的毫秒数,单位毫秒
- */
- virtual uint64_t elapsedTimeAfterFlushed();
- ////////////SockInfo override////////////
- string get_local_ip() override;
- uint16_t get_local_port() override;
- string get_peer_ip() override;
- uint16_t get_peer_port() override;
- string getIdentifier() const override;
- private:
- SockFD::Ptr setPeerSock(int fd);
- SockFD::Ptr makeSock(int sock, SockNum::SockType type);
- int onAccept(const SockFD::Ptr &sock, int event) noexcept;
- ssize_t onRead(const SockFD::Ptr &sock, bool is_udp = false) noexcept;
- void onWriteAble(const SockFD::Ptr &sock);
- void onConnected(const SockFD::Ptr &sock, const onErrCB &cb);
- void onFlushed(const SockFD::Ptr &pSock);
- void startWriteAbleEvent(const SockFD::Ptr &sock);
- void stopWriteAbleEvent(const SockFD::Ptr &sock);
- bool listen(const SockFD::Ptr &sock);
- bool flushData(const SockFD::Ptr &sock, bool poller_thread);
- bool attachEvent(const SockFD::Ptr &sock, bool is_udp = false);
- ssize_t send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush = true);
- private:
- //send socket时的flag
- int _sock_flags = SOCKET_DEFAULE_FLAGS;
- //最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数
- uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000;
- //控制是否接收监听socket可读事件,关闭后可用于流量控制
- atomic<bool> _enable_recv {true};
- //标记该socket是否可写,socket写缓存满了就不可写
- atomic<bool> _sendable {true};
- //tcp连接超时定时器
- Timer::Ptr _con_timer;
- //tcp连接结果回调对象
- std::shared_ptr<function<void(int)> > _async_con_cb;
- //记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器
- Ticker _send_flush_ticker;
- //复用的socket读缓存,每次read socket后,数据存放在此
- BufferRaw::Ptr _read_buffer;
- //socket fd的抽象类
- SockFD::Ptr _sock_fd;
- //本socket绑定的poller线程,事件触发于此线程
- EventPoller::Ptr _poller;
- //跨线程访问_sock_fd时需要上锁
- mutable MutexWrapper<recursive_mutex> _mtx_sock_fd;
- //socket异常事件(比如说断开)
- onErrCB _on_err;
- //收到数据事件
- onReadCB _on_read;
- //socket缓存清空事件(可用于发送流速控制)
- onFlush _on_flush;
- //tcp监听收到accept请求事件
- onAcceptCB _on_accept;
- //tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
- onCreateSocket _on_before_accept;
- //设置上述回调函数的锁
- MutexWrapper<recursive_mutex> _mtx_event;
- //一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
- List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
- //一级发送缓存锁
- MutexWrapper<recursive_mutex> _mtx_send_buf_waiting;
- //二级发送缓存, socket可写时,会把二级缓存批量写入到socket
- List<BufferList::Ptr> _send_buf_sending;
- //二级发送缓存锁
- MutexWrapper<recursive_mutex> _mtx_send_buf_sending;
- //发送buffer结果回调
- BufferList::SendResult _send_result;
- //对象个数统计
- ObjectStatistic<Socket> _statistic;
- };
- class SockSender {
- public:
- SockSender() = default;
- virtual ~SockSender() = default;
- virtual ssize_t send(Buffer::Ptr buf) = 0;
- virtual void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) = 0;
- //发送char *
- SockSender &operator << (const char *buf);
- //发送字符串
- SockSender &operator << (string buf);
- //发送Buffer对象
- SockSender &operator << (Buffer::Ptr buf);
- //发送其他类型是数据
- template<typename T>
- SockSender &operator << (T &&buf) {
- ostringstream ss;
- ss << std::forward<T>(buf);
- send(ss.str());
- return *this;
- }
- ssize_t send(string buf);
- ssize_t send(const char *buf, size_t size = 0);
- };
- //Socket对象的包装类
- class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface {
- public:
- SocketHelper(const Socket::Ptr &sock);
- ~SocketHelper() override;
- ///////////////////// Socket util functions /////////////////////
- /**
- * 获取poller线程
- */
- const EventPoller::Ptr& getPoller() const;
- /**
- * 设置批量发送标记,用于提升性能
- * @param try_flush 批量发送标记
- */
- void setSendFlushFlag(bool try_flush);
- /**
- * 设置socket发送flags
- * @param flags socket发送flags
- */
- void setSendFlags(int flags);
- /**
- * 套接字是否忙,如果套接字写缓存已满则返回true
- */
- bool isSocketBusy() const;
- /**
- * 设置Socket创建器,自定义Socket创建方式
- * @param cb 创建器
- */
- void setOnCreateSocket(Socket::onCreateSocket cb);
- /**
- * 创建socket对象
- */
- Socket::Ptr createSocket();
- ///////////////////// SockInfo override /////////////////////
- string get_local_ip() override;
- uint16_t get_local_port() override;
- string get_peer_ip() override;
- uint16_t get_peer_port() override;
- ///////////////////// TaskExecutorInterface override /////////////////////
- /**
- * 任务切换到所属poller线程执行
- * @param task 任务
- * @param may_sync 是否运行同步执行任务
- */
- Task::Ptr async(TaskIn task, bool may_sync = true) override;
- Task::Ptr async_first(TaskIn task, bool may_sync = true) override;
- ///////////////////// SockSender override /////////////////////
- /**
- * 统一发送数据的出口
- */
- ssize_t send(Buffer::Ptr buf) override;
- /**
- * 触发onErr事件
- */
- void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) override;
- protected:
- void setPoller(const EventPoller::Ptr &poller);
- void setSock(const Socket::Ptr &sock);
- const Socket::Ptr& getSock() const;
- private:
- bool _try_flush = true;
- uint16_t _peer_port = 0;
- uint16_t _local_port = 0;
- string _peer_ip;
- string _local_ip;
- Socket::Ptr _sock;
- EventPoller::Ptr _poller;
- Socket::onCreateSocket _on_create_socket;
- };
- } // namespace toolkit
- #endif /* NETWORK_SOCKET_H */
|