|
@@ -1,7 +1,7 @@
|
|
|
/*
|
|
|
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
|
|
|
*
|
|
|
- * This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
|
|
|
+ * This file is part of ZLToolKit(https://github.com/xia-chu/ZLToolKit).
|
|
|
*
|
|
|
* Use of this source code is governed by MIT license that can be found in the
|
|
|
* LICENSE file in the root of the source tree. All contributing project authors
|
|
@@ -13,15 +13,22 @@
|
|
|
|
|
|
#include <memory>
|
|
|
#include <string>
|
|
|
+#include <deque>
|
|
|
#include <mutex>
|
|
|
+#include <vector>
|
|
|
#include <atomic>
|
|
|
#include <sstream>
|
|
|
#include <functional>
|
|
|
-#include "Util/SpeedStatistic.h"
|
|
|
-#include "sockutil.h"
|
|
|
+#include "Util/util.h"
|
|
|
+#include "Util/onceToken.h"
|
|
|
+#include "Util/uv_errno.h"
|
|
|
+#include "Util/TimeTicker.h"
|
|
|
+#include "Util/ResourcePool.h"
|
|
|
#include "Poller/Timer.h"
|
|
|
#include "Poller/EventPoller.h"
|
|
|
-#include "BufferSock.h"
|
|
|
+#include "Network/sockutil.h"
|
|
|
+#include "Buffer.h"
|
|
|
+using namespace std;
|
|
|
|
|
|
namespace toolkit {
|
|
|
|
|
@@ -61,100 +68,89 @@ typedef enum {
|
|
|
} ErrCode;
|
|
|
|
|
|
//错误信息类
|
|
|
-class SockException : public std::exception {
|
|
|
+class SockException: public std::exception {
|
|
|
public:
|
|
|
- SockException(ErrCode code = Err_success, const std::string &msg = "", int custom_code = 0) {
|
|
|
- _msg = msg;
|
|
|
- _code = code;
|
|
|
- _custom_code = custom_code;
|
|
|
+ SockException(ErrCode errCode = Err_success,
|
|
|
+ const string &errMsg = "",
|
|
|
+ int customCode = 0) {
|
|
|
+ _errMsg = errMsg;
|
|
|
+ _errCode = errCode;
|
|
|
+ _customCode = customCode;
|
|
|
}
|
|
|
|
|
|
//重置错误
|
|
|
- void reset(ErrCode code, const std::string &msg, int custom_code = 0) {
|
|
|
- _msg = msg;
|
|
|
- _code = code;
|
|
|
- _custom_code = custom_code;
|
|
|
+ void reset(ErrCode errCode, const string &errMsg) {
|
|
|
+ _errMsg = errMsg;
|
|
|
+ _errCode = errCode;
|
|
|
}
|
|
|
-
|
|
|
//错误提示
|
|
|
- const char *what() const noexcept override {
|
|
|
- return _msg.c_str();
|
|
|
+ const char* what() const noexcept override{
|
|
|
+ return _errMsg.c_str();
|
|
|
}
|
|
|
-
|
|
|
//错误代码
|
|
|
ErrCode getErrCode() const {
|
|
|
- return _code;
|
|
|
- }
|
|
|
-
|
|
|
- //用户自定义错误代码
|
|
|
- int getCustomCode() const {
|
|
|
- return _custom_code;
|
|
|
+ return _errCode;
|
|
|
}
|
|
|
-
|
|
|
//判断是否真的有错
|
|
|
- operator bool() const {
|
|
|
- return _code != Err_success;
|
|
|
+ operator bool() const{
|
|
|
+ return _errCode != Err_success;
|
|
|
}
|
|
|
-
|
|
|
+ //用户自定义错误代码
|
|
|
+ int getCustomCode () const{
|
|
|
+ return _customCode;
|
|
|
+ }
|
|
|
+ //获取用户自定义错误代码
|
|
|
+ void setCustomCode(int code) {
|
|
|
+ _customCode = code;
|
|
|
+ };
|
|
|
private:
|
|
|
- ErrCode _code;
|
|
|
- int _custom_code = 0;
|
|
|
- std::string _msg;
|
|
|
+ string _errMsg;
|
|
|
+ ErrCode _errCode;
|
|
|
+ int _customCode = 0;
|
|
|
};
|
|
|
|
|
|
-//std::cout等输出流可以直接输出SockException对象
|
|
|
-std::ostream &operator<<(std::ostream &ost, const SockException &err);
|
|
|
-
|
|
|
-class SockNum {
|
|
|
+class SockNum{
|
|
|
public:
|
|
|
- using Ptr = std::shared_ptr<SockNum>;
|
|
|
-
|
|
|
typedef enum {
|
|
|
- Sock_Invalid = -1,
|
|
|
Sock_TCP = 0,
|
|
|
- Sock_UDP = 1,
|
|
|
- Sock_TCP_Server = 2
|
|
|
+ Sock_UDP = 1
|
|
|
} SockType;
|
|
|
|
|
|
- SockNum(int fd, SockType type) {
|
|
|
+ typedef std::shared_ptr<SockNum> Ptr;
|
|
|
+ SockNum(int fd,SockType type){
|
|
|
_fd = fd;
|
|
|
_type = type;
|
|
|
}
|
|
|
-
|
|
|
- ~SockNum() {
|
|
|
+ ~SockNum(){
|
|
|
#if defined (OS_IPHONE)
|
|
|
unsetSocketOfIOS(_fd);
|
|
|
#endif //OS_IPHONE
|
|
|
- // 停止socket收发能力
|
|
|
::shutdown(_fd, SHUT_RDWR);
|
|
|
close(_fd);
|
|
|
}
|
|
|
|
|
|
- int rawFd() const {
|
|
|
+ int rawFd() const{
|
|
|
return _fd;
|
|
|
}
|
|
|
|
|
|
- SockType type() {
|
|
|
+ SockType type(){
|
|
|
return _type;
|
|
|
}
|
|
|
|
|
|
- void setConnected() {
|
|
|
+ void setConnected(){
|
|
|
#if defined (OS_IPHONE)
|
|
|
setSocketOfIOS(_fd);
|
|
|
#endif //OS_IPHONE
|
|
|
}
|
|
|
-
|
|
|
-#if defined (OS_IPHONE)
|
|
|
private:
|
|
|
- void *readStream=nullptr;
|
|
|
- void *writeStream=nullptr;
|
|
|
+ SockType _type;
|
|
|
+ int _fd;
|
|
|
+#if defined (OS_IPHONE)
|
|
|
+ void *readStream=NULL;
|
|
|
+ void *writeStream=NULL;
|
|
|
bool setSocketOfIOS(int socket);
|
|
|
void unsetSocketOfIOS(int socket);
|
|
|
#endif //OS_IPHONE
|
|
|
-
|
|
|
-private:
|
|
|
- int _fd;
|
|
|
- SockType _type;
|
|
|
};
|
|
|
|
|
|
//socket 文件描述符的包装
|
|
@@ -162,15 +158,14 @@ private:
|
|
|
//防止描述符溢出
|
|
|
class SockFD : public noncopyable {
|
|
|
public:
|
|
|
- using Ptr = std::shared_ptr<SockFD>;
|
|
|
-
|
|
|
+ typedef std::shared_ptr<SockFD> Ptr;
|
|
|
/**
|
|
|
* 创建一个fd对象
|
|
|
* @param num 文件描述符,int数字
|
|
|
* @param poller 事件监听器
|
|
|
*/
|
|
|
- SockFD(int num, SockNum::SockType type, const EventPoller::Ptr &poller) {
|
|
|
- _num = std::make_shared<SockNum>(num, type);
|
|
|
+ SockFD(int num,SockNum::SockType type,const EventPoller::Ptr &poller){
|
|
|
+ _num = std::make_shared<SockNum>(num,type);
|
|
|
_poller = poller;
|
|
|
}
|
|
|
|
|
@@ -179,23 +174,17 @@ public:
|
|
|
* @param that 源对象
|
|
|
* @param poller 事件监听器
|
|
|
*/
|
|
|
- SockFD(const SockFD &that, const EventPoller::Ptr &poller) {
|
|
|
+ SockFD(const SockFD &that,const EventPoller::Ptr &poller){
|
|
|
_num = that._num;
|
|
|
_poller = poller;
|
|
|
- if (_poller == that._poller) {
|
|
|
- throw std::invalid_argument("Copy a SockFD with same poller");
|
|
|
+ if(_poller == that._poller){
|
|
|
+ throw invalid_argument("copy a SockFD with same poller!");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- ~SockFD() { delEvent(); }
|
|
|
-
|
|
|
- void delEvent() {
|
|
|
- if (_poller) {
|
|
|
- auto num = _num;
|
|
|
- // 移除io事件成功后再close fd
|
|
|
- _poller->delEvent(num->rawFd(), [num](bool) {});
|
|
|
- _poller = nullptr;
|
|
|
- }
|
|
|
+ ~SockFD() {
|
|
|
+ auto num = _num;
|
|
|
+ _poller->delEvent(_num->rawFd(), [num](bool) {});
|
|
|
}
|
|
|
|
|
|
void setConnected() {
|
|
@@ -215,27 +204,24 @@ private:
|
|
|
EventPoller::Ptr _poller;
|
|
|
};
|
|
|
|
|
|
-template<class Mtx = std::recursive_mutex>
|
|
|
+template <class Mtx = recursive_mutex>
|
|
|
class MutexWrapper {
|
|
|
public:
|
|
|
- MutexWrapper(bool enable) {
|
|
|
+ MutexWrapper(bool enable){
|
|
|
_enable = enable;
|
|
|
}
|
|
|
+ ~MutexWrapper(){}
|
|
|
|
|
|
- ~MutexWrapper() = default;
|
|
|
-
|
|
|
- inline void lock() {
|
|
|
- if (_enable) {
|
|
|
+ inline void lock(){
|
|
|
+ if(_enable){
|
|
|
_mtx.lock();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- inline void unlock() {
|
|
|
- if (_enable) {
|
|
|
+ inline void unlock(){
|
|
|
+ if(_enable){
|
|
|
_mtx.unlock();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
private:
|
|
|
bool _enable;
|
|
|
Mtx _mtx;
|
|
@@ -247,15 +233,15 @@ public:
|
|
|
virtual ~SockInfo() = default;
|
|
|
|
|
|
//获取本机ip
|
|
|
- virtual std::string get_local_ip() = 0;
|
|
|
+ virtual string get_local_ip() = 0;
|
|
|
//获取本机端口号
|
|
|
virtual uint16_t get_local_port() = 0;
|
|
|
//获取对方ip
|
|
|
- virtual std::string get_peer_ip() = 0;
|
|
|
+ virtual string get_peer_ip() = 0;
|
|
|
//获取对方端口号
|
|
|
virtual uint16_t get_peer_port() = 0;
|
|
|
//获取标识符
|
|
|
- virtual std::string getIdentifier() const { return ""; }
|
|
|
+ virtual string getIdentifier() const { return ""; }
|
|
|
};
|
|
|
|
|
|
#define TraceP(ptr) TraceL << ptr->getIdentifier() << "(" << ptr->get_peer_ip() << ":" << ptr->get_peer_port() << ") "
|
|
@@ -269,15 +255,15 @@ class Socket : public std::enable_shared_from_this<Socket>, public noncopyable,
|
|
|
public:
|
|
|
using Ptr = std::shared_ptr<Socket>;
|
|
|
//接收数据回调
|
|
|
- using onReadCB = std::function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
|
|
|
+ using onReadCB = function<void(const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len)>;
|
|
|
//发生错误回调
|
|
|
- using onErrCB = std::function<void(const SockException &err)>;
|
|
|
+ using onErrCB = function<void(const SockException &err)>;
|
|
|
//tcp监听接收到连接请求
|
|
|
- using onAcceptCB = std::function<void(Socket::Ptr &sock, std::shared_ptr<void> &complete)>;
|
|
|
+ using onAcceptCB = function<void(Socket::Ptr &sock, shared_ptr<void> &complete)>;
|
|
|
//socket发送缓存清空事件,返回true代表下次继续监听该事件,否则停止
|
|
|
- using onFlush = std::function<bool()>;
|
|
|
+ using onFlush = function<bool()>;
|
|
|
//在接收到连接请求前,拦截Socket默认生成方式
|
|
|
- using onCreateSocket = std::function<Ptr(const EventPoller::Ptr &poller)>;
|
|
|
+ using onCreateSocket = function<Ptr(const EventPoller::Ptr &poller)>;
|
|
|
//发送buffer成功与否回调
|
|
|
using onSendResult = BufferList::SendResult;
|
|
|
|
|
@@ -288,6 +274,7 @@ public:
|
|
|
*/
|
|
|
static Ptr createSocket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
|
|
|
Socket(const EventPoller::Ptr &poller = nullptr, bool enable_mutex = true);
|
|
|
+
|
|
|
~Socket() override;
|
|
|
|
|
|
/**
|
|
@@ -299,8 +286,8 @@ public:
|
|
|
* @param local_ip 绑定本地网卡ip
|
|
|
* @param local_port 绑定本地网卡端口号
|
|
|
*/
|
|
|
- void connect(const std::string &url, uint16_t port, const onErrCB &con_cb, float timeout_sec = 5, const std::string &local_ip = "::", uint16_t local_port = 0);
|
|
|
-
|
|
|
+ virtual void connect(const string &url, uint16_t port, onErrCB con_cb, float timeout_sec = 5,
|
|
|
+ const string &local_ip = "0.0.0.0", uint16_t local_port = 0);
|
|
|
/**
|
|
|
* 创建tcp监听服务器
|
|
|
* @param port 监听端口,0则随机
|
|
@@ -308,7 +295,7 @@ public:
|
|
|
* @param backlog tcp最大积压数
|
|
|
* @return 是否成功
|
|
|
*/
|
|
|
- bool listen(uint16_t port, const std::string &local_ip = "::", int backlog = 1024);
|
|
|
+ virtual bool listen(uint16_t port, const string &local_ip = "0.0.0.0", int backlog = 1024);
|
|
|
|
|
|
/**
|
|
|
* 创建udp套接字,udp是无连接的,所以可以作为服务器和客户端
|
|
@@ -316,22 +303,7 @@ public:
|
|
|
* @param local_ip 绑定的网卡ip
|
|
|
* @return 是否成功
|
|
|
*/
|
|
|
- bool bindUdpSock(uint16_t port, const std::string &local_ip = "::", bool enable_reuse = true);
|
|
|
-
|
|
|
- /**
|
|
|
- * 包装外部fd,本对象负责close fd
|
|
|
- * 内部会设置fd为NoBlocked,NoSigpipe,CloExec
|
|
|
- * 其他设置需要自行使用SockUtil进行设置
|
|
|
- */
|
|
|
- bool fromSock(int fd, SockNum::SockType type);
|
|
|
-
|
|
|
- /**
|
|
|
- * 从另外一个Socket克隆
|
|
|
- * 目的是一个socket可以被多个poller对象监听,提高性能或实现Socket归属线程的迁移
|
|
|
- * @param other 原始的socket对象
|
|
|
- * @return 是否成功
|
|
|
- */
|
|
|
- bool cloneSocket(const Socket &other);
|
|
|
+ virtual bool bindUdpSock(uint16_t port, const string &local_ip = "0.0.0.0");
|
|
|
|
|
|
////////////设置事件回调////////////
|
|
|
|
|
@@ -339,38 +311,38 @@ public:
|
|
|
* 设置数据接收回调,tcp或udp客户端有效
|
|
|
* @param cb 回调对象
|
|
|
*/
|
|
|
- void setOnRead(onReadCB cb);
|
|
|
+ virtual void setOnRead(onReadCB cb);
|
|
|
|
|
|
/**
|
|
|
* 设置异常事件(包括eof等)回调
|
|
|
* @param cb 回调对象
|
|
|
*/
|
|
|
- void setOnErr(onErrCB cb);
|
|
|
+ virtual void setOnErr(onErrCB cb);
|
|
|
|
|
|
/**
|
|
|
* 设置tcp监听接收到连接回调
|
|
|
* @param cb 回调对象
|
|
|
*/
|
|
|
- void setOnAccept(onAcceptCB cb);
|
|
|
+ virtual void setOnAccept(onAcceptCB cb);
|
|
|
|
|
|
/**
|
|
|
* 设置socket写缓存清空事件回调
|
|
|
* 通过该回调可以实现发送流控
|
|
|
* @param cb 回调对象
|
|
|
*/
|
|
|
- void setOnFlush(onFlush cb);
|
|
|
+ virtual void setOnFlush(onFlush cb);
|
|
|
|
|
|
/**
|
|
|
* 设置accept时,socket构造事件回调
|
|
|
* @param cb 回调
|
|
|
*/
|
|
|
- void setOnBeforeAccept(onCreateSocket cb);
|
|
|
+ virtual void setOnBeforeAccept(onCreateSocket cb);
|
|
|
|
|
|
/**
|
|
|
* 设置发送buffer结果回调
|
|
|
* @param cb 回调
|
|
|
*/
|
|
|
- void setOnSendResult(onSendResult cb);
|
|
|
+ virtual void setOnSendResult(onSendResult cb);
|
|
|
|
|
|
////////////发送数据相关接口////////////
|
|
|
|
|
@@ -388,67 +360,58 @@ public:
|
|
|
/**
|
|
|
* 发送string
|
|
|
*/
|
|
|
- ssize_t send(std::string buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
|
|
|
+ ssize_t send(string buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
|
|
|
|
|
|
/**
|
|
|
* 发送Buffer对象,Socket对象发送数据的统一出口
|
|
|
* socket对象发送数据的统一出口
|
|
|
*/
|
|
|
- ssize_t send(Buffer::Ptr buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
|
|
|
-
|
|
|
- /**
|
|
|
- * 尝试将所有数据写socket
|
|
|
- * @return -1代表失败(socket无效或者发送超时),0代表成功?
|
|
|
- */
|
|
|
- int flushAll();
|
|
|
+ virtual ssize_t send(Buffer::Ptr buf, struct sockaddr *addr = nullptr, socklen_t addr_len = 0, bool try_flush = true);
|
|
|
|
|
|
/**
|
|
|
* 关闭socket且触发onErr回调,onErr回调将在poller线程中进行
|
|
|
* @param err 错误原因
|
|
|
* @return 是否成功触发onErr回调
|
|
|
*/
|
|
|
- bool emitErr(const SockException &err) noexcept;
|
|
|
+ virtual bool emitErr(const SockException &err) noexcept;
|
|
|
|
|
|
/**
|
|
|
* 关闭或开启数据接收
|
|
|
* @param enabled 是否开启
|
|
|
*/
|
|
|
- void enableRecv(bool enabled);
|
|
|
+ virtual void enableRecv(bool enabled);
|
|
|
|
|
|
/**
|
|
|
* 获取裸文件描述符,请勿进行close操作(因为Socket对象会管理其生命周期)
|
|
|
* @return 文件描述符
|
|
|
*/
|
|
|
- int rawFD() const;
|
|
|
-
|
|
|
- /**
|
|
|
- * tcp客户端是否处于连接状态
|
|
|
- * 支持Sock_TCP类型socket
|
|
|
- */
|
|
|
- bool alive() const;
|
|
|
-
|
|
|
- /**
|
|
|
- * 返回socket类型
|
|
|
- */
|
|
|
- SockNum::SockType sockType() const;
|
|
|
+ virtual int rawFD() const;
|
|
|
|
|
|
/**
|
|
|
* 设置发送超时主动断开时间;默认10秒
|
|
|
* @param second 发送超时数据,单位秒
|
|
|
*/
|
|
|
- void setSendTimeOutSecond(uint32_t second);
|
|
|
+ virtual void setSendTimeOutSecond(uint32_t second);
|
|
|
|
|
|
/**
|
|
|
* 套接字是否忙,如果套接字写缓存已满则返回true
|
|
|
* @return 套接字是否忙
|
|
|
*/
|
|
|
- bool isSocketBusy() const;
|
|
|
+ virtual bool isSocketBusy() const;
|
|
|
|
|
|
/**
|
|
|
* 获取poller线程对象
|
|
|
* @return poller线程对象
|
|
|
*/
|
|
|
- const EventPoller::Ptr &getPoller() const;
|
|
|
+ virtual const EventPoller::Ptr &getPoller() const;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从另外一个Socket克隆
|
|
|
+ * 目的是一个socket可以被多个poller对象监听,提高性能
|
|
|
+ * @param other 原始的socket对象
|
|
|
+ * @return 是否成功
|
|
|
+ */
|
|
|
+ virtual bool cloneFromListenSocket(const Socket &other);
|
|
|
|
|
|
/**
|
|
|
* 绑定udp 目标地址,后续发送时就不用再单独指定了
|
|
@@ -456,64 +419,50 @@ public:
|
|
|
* @param addr_len 目标地址长度
|
|
|
* @return 是否成功
|
|
|
*/
|
|
|
- bool bindPeerAddr(const struct sockaddr *dst_addr, socklen_t addr_len = 0);
|
|
|
+ virtual bool bindPeerAddr(const struct sockaddr *dst_addr, socklen_t addr_len = 0);
|
|
|
|
|
|
/**
|
|
|
* 设置发送flags
|
|
|
* @param flags 发送的flag
|
|
|
*/
|
|
|
- void setSendFlags(int flags = SOCKET_DEFAULE_FLAGS);
|
|
|
+ virtual void setSendFlags(int flags = SOCKET_DEFAULE_FLAGS);
|
|
|
|
|
|
/**
|
|
|
* 关闭套接字
|
|
|
- * @param close_fd 是否关闭fd还是只移除io事件监听
|
|
|
*/
|
|
|
- void closeSock(bool close_fd = true);
|
|
|
+ virtual void closeSock();
|
|
|
|
|
|
/**
|
|
|
* 获取发送缓存包个数(不是字节数)
|
|
|
*/
|
|
|
- size_t getSendBufferCount();
|
|
|
+ virtual size_t getSendBufferCount();
|
|
|
|
|
|
/**
|
|
|
* 获取上次socket发送缓存清空至今的毫秒数,单位毫秒
|
|
|
*/
|
|
|
- uint64_t elapsedTimeAfterFlushed();
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取接收速率,单位bytes/s
|
|
|
- */
|
|
|
- int getRecvSpeed();
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取发送速率,单位bytes/s
|
|
|
- */
|
|
|
- int getSendSpeed();
|
|
|
+ virtual uint64_t elapsedTimeAfterFlushed();
|
|
|
|
|
|
////////////SockInfo override////////////
|
|
|
- std::string get_local_ip() override;
|
|
|
+ string get_local_ip() override;
|
|
|
uint16_t get_local_port() override;
|
|
|
- std::string get_peer_ip() override;
|
|
|
+ string get_peer_ip() override;
|
|
|
uint16_t get_peer_port() override;
|
|
|
- std::string getIdentifier() const override;
|
|
|
+ string getIdentifier() const override;
|
|
|
|
|
|
private:
|
|
|
- SockFD::Ptr cloneSockFD(const Socket &other);
|
|
|
+ SockFD::Ptr setPeerSock(int fd);
|
|
|
SockFD::Ptr makeSock(int sock, SockNum::SockType type);
|
|
|
- void setPeerSock(int fd, SockNum::SockType type);
|
|
|
- int onAccept(int sock, int event) noexcept;
|
|
|
- ssize_t onRead(int sock, SockNum::SockType type, const BufferRaw::Ptr &buffer) noexcept;
|
|
|
- void onWriteAble(int sock, SockNum::SockType type);
|
|
|
- void onConnected(int sock, const onErrCB &cb);
|
|
|
- void onFlushed();
|
|
|
- void startWriteAbleEvent(int sock);
|
|
|
- void stopWriteAbleEvent(int sock);
|
|
|
+ int onAccept(const SockFD::Ptr &sock, int event) noexcept;
|
|
|
+ ssize_t onRead(const SockFD::Ptr &sock, bool is_udp = false) noexcept;
|
|
|
+ void onWriteAble(const SockFD::Ptr &sock);
|
|
|
+ void onConnected(const SockFD::Ptr &sock, const onErrCB &cb);
|
|
|
+ void onFlushed(const SockFD::Ptr &pSock);
|
|
|
+ void startWriteAbleEvent(const SockFD::Ptr &sock);
|
|
|
+ void stopWriteAbleEvent(const SockFD::Ptr &sock);
|
|
|
bool listen(const SockFD::Ptr &sock);
|
|
|
- bool flushData(int sock, SockNum::SockType type, bool poller_thread);
|
|
|
- bool attachEvent(int sock, SockNum::SockType type);
|
|
|
+ bool flushData(const SockFD::Ptr &sock, bool poller_thread);
|
|
|
+ bool attachEvent(const SockFD::Ptr &sock, bool is_udp = false);
|
|
|
ssize_t send_l(Buffer::Ptr buf, bool is_buf_sock, bool try_flush = true);
|
|
|
- void connect_l(const std::string &url, uint16_t port, const onErrCB &con_cb_in, float timeout_sec, const std::string &local_ip, uint16_t local_port);
|
|
|
- bool fromSock_l(int fd, SockNum::SockType type);
|
|
|
|
|
|
private:
|
|
|
//send socket时的flag
|
|
@@ -521,31 +470,25 @@ private:
|
|
|
//最大发送缓存,单位毫秒,距上次发送缓存清空时间不能超过该参数
|
|
|
uint32_t _max_send_buffer_ms = SEND_TIME_OUT_SEC * 1000;
|
|
|
//控制是否接收监听socket可读事件,关闭后可用于流量控制
|
|
|
- std::atomic<bool> _enable_recv {true};
|
|
|
+ atomic<bool> _enable_recv {true};
|
|
|
//标记该socket是否可写,socket写缓存满了就不可写
|
|
|
- std::atomic<bool> _sendable {true};
|
|
|
- //是否已经触发err回调了
|
|
|
- bool _err_emit = false;
|
|
|
- //是否启用网速统计
|
|
|
- bool _enable_speed = false;
|
|
|
- //接收速率统计
|
|
|
- BytesSpeed _recv_speed;
|
|
|
- //发送速率统计
|
|
|
- BytesSpeed _send_speed;
|
|
|
+ atomic<bool> _sendable {true};
|
|
|
|
|
|
//tcp连接超时定时器
|
|
|
Timer::Ptr _con_timer;
|
|
|
//tcp连接结果回调对象
|
|
|
- std::shared_ptr<std::function<void(int)> > _async_con_cb;
|
|
|
+ std::shared_ptr<function<void(int)> > _async_con_cb;
|
|
|
|
|
|
//记录上次发送缓存(包括socket写缓存、应用层缓存)清空的计时器
|
|
|
Ticker _send_flush_ticker;
|
|
|
+ //复用的socket读缓存,每次read socket后,数据存放在此
|
|
|
+ BufferRaw::Ptr _read_buffer;
|
|
|
//socket fd的抽象类
|
|
|
SockFD::Ptr _sock_fd;
|
|
|
//本socket绑定的poller线程,事件触发于此线程
|
|
|
EventPoller::Ptr _poller;
|
|
|
//跨线程访问_sock_fd时需要上锁
|
|
|
- mutable MutexWrapper<std::recursive_mutex> _mtx_sock_fd;
|
|
|
+ mutable MutexWrapper<recursive_mutex> _mtx_sock_fd;
|
|
|
|
|
|
//socket异常事件(比如说断开)
|
|
|
onErrCB _on_err;
|
|
@@ -558,16 +501,16 @@ private:
|
|
|
//tcp监听收到accept请求,自定义创建peer Socket事件(可以控制子Socket绑定到其他poller线程)
|
|
|
onCreateSocket _on_before_accept;
|
|
|
//设置上述回调函数的锁
|
|
|
- MutexWrapper<std::recursive_mutex> _mtx_event;
|
|
|
+ MutexWrapper<recursive_mutex> _mtx_event;
|
|
|
|
|
|
//一级发送缓存, socket可写时,会把一级缓存批量送入到二级缓存
|
|
|
List<std::pair<Buffer::Ptr, bool> > _send_buf_waiting;
|
|
|
//一级发送缓存锁
|
|
|
- MutexWrapper<std::recursive_mutex> _mtx_send_buf_waiting;
|
|
|
+ MutexWrapper<recursive_mutex> _mtx_send_buf_waiting;
|
|
|
//二级发送缓存, socket可写时,会把二级缓存批量写入到socket
|
|
|
List<BufferList::Ptr> _send_buf_sending;
|
|
|
//二级发送缓存锁
|
|
|
- MutexWrapper<std::recursive_mutex> _mtx_send_buf_sending;
|
|
|
+ MutexWrapper<recursive_mutex> _mtx_send_buf_sending;
|
|
|
//发送buffer结果回调
|
|
|
BufferList::SendResult _send_result;
|
|
|
//对象个数统计
|
|
@@ -584,31 +527,30 @@ public:
|
|
|
//发送char *
|
|
|
SockSender &operator << (const char *buf);
|
|
|
//发送字符串
|
|
|
- SockSender &operator << (std::string buf);
|
|
|
+ SockSender &operator << (string buf);
|
|
|
//发送Buffer对象
|
|
|
SockSender &operator << (Buffer::Ptr buf);
|
|
|
|
|
|
//发送其他类型是数据
|
|
|
template<typename T>
|
|
|
SockSender &operator << (T &&buf) {
|
|
|
- std::ostringstream ss;
|
|
|
+ ostringstream ss;
|
|
|
ss << std::forward<T>(buf);
|
|
|
send(ss.str());
|
|
|
return *this;
|
|
|
}
|
|
|
|
|
|
- ssize_t send(std::string buf);
|
|
|
+ ssize_t send(string buf);
|
|
|
ssize_t send(const char *buf, size_t size = 0);
|
|
|
};
|
|
|
|
|
|
//Socket对象的包装类
|
|
|
-class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface, public std::enable_shared_from_this<SocketHelper> {
|
|
|
+class SocketHelper : public SockSender, public SockInfo, public TaskExecutorInterface {
|
|
|
public:
|
|
|
- using Ptr = std::shared_ptr<SocketHelper>;
|
|
|
SocketHelper(const Socket::Ptr &sock);
|
|
|
- ~SocketHelper() override = default;
|
|
|
+ ~SocketHelper() override;
|
|
|
|
|
|
- ///////////////////// Socket util std::functions /////////////////////
|
|
|
+ ///////////////////// Socket util functions /////////////////////
|
|
|
/**
|
|
|
* 获取poller线程
|
|
|
*/
|
|
@@ -642,26 +584,10 @@ public:
|
|
|
*/
|
|
|
Socket::Ptr createSocket();
|
|
|
|
|
|
- /**
|
|
|
- * 获取socket对象
|
|
|
- */
|
|
|
- const Socket::Ptr &getSock() const;
|
|
|
-
|
|
|
- /**
|
|
|
- * 尝试将所有数据写socket
|
|
|
- * @return -1代表失败(socket无效或者发送超时),0代表成功?
|
|
|
- */
|
|
|
- int flushAll();
|
|
|
-
|
|
|
- /**
|
|
|
- * 是否ssl加密
|
|
|
- */
|
|
|
- virtual bool overSsl() const { return false; }
|
|
|
-
|
|
|
///////////////////// SockInfo override /////////////////////
|
|
|
- std::string get_local_ip() override;
|
|
|
+ string get_local_ip() override;
|
|
|
uint16_t get_local_port() override;
|
|
|
- std::string get_peer_ip() override;
|
|
|
+ string get_peer_ip() override;
|
|
|
uint16_t get_peer_port() override;
|
|
|
|
|
|
///////////////////// TaskExecutorInterface override /////////////////////
|
|
@@ -684,42 +610,17 @@ public:
|
|
|
*/
|
|
|
void shutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown")) override;
|
|
|
|
|
|
- /**
|
|
|
- * 线程安全的脱离 Server 并触发 onError 事件
|
|
|
- * @param ex 触发 onError 事件的原因
|
|
|
- */
|
|
|
- void safeShutdown(const SockException &ex = SockException(Err_shutdown, "self shutdown"));
|
|
|
-
|
|
|
- ///////////////////// event functions /////////////////////
|
|
|
- /**
|
|
|
- * 接收数据入口
|
|
|
- * @param buf 数据,可以重复使用内存区,不可被缓存使用
|
|
|
- */
|
|
|
- virtual void onRecv(const Buffer::Ptr &buf) = 0;
|
|
|
-
|
|
|
- /**
|
|
|
- * 收到 eof 或其他导致脱离 Server 事件的回调
|
|
|
- * 收到该事件时, 该对象一般将立即被销毁
|
|
|
- * @param err 原因
|
|
|
- */
|
|
|
- virtual void onError(const SockException &err) = 0;
|
|
|
-
|
|
|
- /**
|
|
|
- * 数据全部发送完毕后回调
|
|
|
- */
|
|
|
- virtual void onFlush() {}
|
|
|
-
|
|
|
- /**
|
|
|
- * 每隔一段时间触发, 用来做超时管理
|
|
|
- */
|
|
|
- virtual void onManager() = 0;
|
|
|
-
|
|
|
protected:
|
|
|
void setPoller(const EventPoller::Ptr &poller);
|
|
|
void setSock(const Socket::Ptr &sock);
|
|
|
+ const Socket::Ptr& getSock() const;
|
|
|
|
|
|
private:
|
|
|
bool _try_flush = true;
|
|
|
+ uint16_t _peer_port = 0;
|
|
|
+ uint16_t _local_port = 0;
|
|
|
+ string _peer_ip;
|
|
|
+ string _local_ip;
|
|
|
Socket::Ptr _sock;
|
|
|
EventPoller::Ptr _poller;
|
|
|
Socket::onCreateSocket _on_create_socket;
|