I/O多路转接——epoll服务器代码编写

这篇具有很好参考价值的文章主要介绍了I/O多路转接——epoll服务器代码编写。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、poll​

二、epoll

1.epoll

2.epoll的函数接口

①epoll_create

②epoll_ctl

③epoll_wait

3.操作原理

三、epoll服务器编写

1.日志打印

2.TCP服务器

3.Epoll

①雏形

②InitEpollServer 与 RunServer

③HandlerEvent

四、Epoll的工作模式

1.LT模式与ET模式

2.基于LT模式的epoll服务器

①整体框架

②处理BUG

③优化结构            

④异常处理

④序列化与反序列化

⑤优化

⑥Reactor模式与Proactor模式


一、poll

        该函数的作用也是如同select一样在IO中负责等,但是它不用每次将参数重设,并且没有上限。

        这需要参数的帮助。

        第一个参数的类型是一个指针,其中存放的类型是一个结构体。

I/O多路转接——epoll服务器代码编写

        fd为文件描述符,events是负责告诉内核关注什么事件,revents是负责告诉用户关注的事件是否就绪。替代了select中输入输出参数为同一个的问题。我们还可以将参数稍作调整更为直观。

int poll(struct pollfd fds[],nfds_t nfds,int timeout);  

        我们要关注读事件写事件怎么传呢?

I/O多路转接——epoll服务器代码编写

        这些都是宏,我们想关注多个事件可以将他们对应的宏按位与在一起,再传入。  

        第二个参数的类型是nfds_t,实则是long int。

I/O多路转接——epoll服务器代码编写

        该参数传递的是前一个参数作为数组中的元素个数。

        第三个参数,与select中的timeout不同的是 ,这里的timeout的参数不是结构体,直接表示的是微秒。

        关于poll的服务器编写就不多阐述了,我们将上篇文章的select稍微修改下,就能使用。大家可以移步先去观看上篇文章。

代码:

#include <iostream>
#include <poll.h>
#include "Sock.hpp"

using namespace std;
#define NUM 1024
struct pollfd fdsArray[NUM]; // 辅助数组 里面存放历史文件描述符
#define DEFAUIT -1           // 默认

void Usage(string process)
{
    cout << "Please entry" << process << " port" << endl;
}

static void ShowArray()
{
    cout << "当前的文件描述符为: ";
    for (int i = 0; i < NUM; i++)
    {
        if (fdsArray[i].fd == DEFAUIT)
            continue;
        cout << fdsArray[i].fd << ' ';
    }
    cout << endl;
}

static void HandlerEvent(int listensock)
{
    for (int j = 0; j < NUM; j++)
    {
        if (fdsArray[j].fd == DEFAUIT)
            continue;
        if (j == 0 && fdsArray[j].fd == listensock)
        {

            if (fdsArray[j].revents & POLLIN)
            {
                cout << "新连接到来,需要处理" << endl;

                string clientip;
                uint16_t clientport = 0;
                int sock = Sock::Accept(listensock, &clientip, &clientport); // 这里不会阻塞
                if (sock < 0)
                    return;
                cout << "获取新连接成功 " << clientip << ":" << clientport << " Sock:" << sock << endl;
                
                int i = 0;
                for (; i < NUM; i++)
                {
                    if (fdsArray[i].fd == DEFAUIT)
                        break;
                }
                if (i == NUM)
                {
                    cerr << "服务器已经到达了上限" << endl;
                    close(sock);
                }
                else
                {
                    // 将文件描述符放入fdsArray中
                    fdsArray[i].fd = sock;
                    fdsArray[i].events = POLLIN;
                    fdsArray[i].revents = 0;
                    // debug
                    ShowArray();
                }
            }
        }
        else
        {
            // 处理其他的文件描述符的IO事件
            if (fdsArray[j].revents & POLLIN)
            {
                char buffer[1024];
                ssize_t s = recv(fdsArray[j].fd, buffer, sizeof(buffer), 0);
                // 这里的阻塞读取真的会阻塞住吗?并不会,因为走到这里select已经帮我们等了,并且此时事件就绪。
                if (s > 0)
                {
                    buffer[s] = 0;
                    cout << "client[" << fdsArray[j].fd << "]"
                         << " # " << buffer << endl;
                }
                else if (s == 0)
                {
                    cout << "client[" << fdsArray[j].fd << "] "
                         << "quit"
                         << " server will close " << fdsArray[j].fd << endl;
                    fdsArray[j].fd = DEFAUIT; // 恢复默认
                    fdsArray[j].events = 0;
                    fdsArray[j].revents = 0;
                    close(fdsArray[j].fd);    // 关闭sock
                    ShowArray();           // debug
                }
                else
                {
                    cerr << "recv error" << endl;
                    fdsArray[j].fd = DEFAUIT; // 恢复默认
                    fdsArray[j].events = 0;
                    fdsArray[j].revents = 0;
                    close(fdsArray[j].fd);    // 关闭sock
                    ShowArray();           // debug
                }
            }
        }
    }
}

int main(int argc, char **argv)
{
    if (argc != 2)
    {
        Usage(argv[0]);
        exit(1);
    }
    int listensocket = Sock::Socket();
    Sock::Bind(listensocket, atoi(argv[1]));
    Sock::Listen(listensocket);

    for (int i = 0; i < NUM; i++)
    {
        fdsArray[i].fd = DEFAUIT;
        fdsArray[i].events = 0;
        fdsArray[i].revents = 0;
    }
    fdsArray[0].fd = listensocket; // 默认fdsArray第一个元素存放
    fdsArray[0].events = POLLIN;
    int timeout = 100000;
    while (1)
    {
        int n = poll(fdsArray, NUM, timeout);
        switch (n)
        {
        case 0:
            // timeout
            cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
            break;
            // error
            cout << "select error : " << strerror(errno) << endl;
        case -1:
            break;
        default:
            // 等待成功
            HandlerEvent(listensocket);
            break;
        }
    }
}

现象:

I/O多路转接——epoll服务器代码编写

二、epoll

1.epoll

        按照man手册中说法为:是为处理大批量句柄而做了改进的poll。

        epoll被公认为性能最好的多路I/O就绪通知方法。

 2.epoll的函数接口

①epoll_createI/O多路转接——epoll服务器代码编写

        该函数的目的是创建一个epoll句柄。

        自从linux2.6.8之后,size参数是被忽略的。

        返回值:成功时返回一个文件描述符,失败时返回-1。

②epoll_ctlI/O多路转接——epoll服务器代码编写

        该函数的作用是为对epoll句柄中添加特点的文件描述符对应的事件。也就是用户告诉内核要关注哪些事件。

        epfd:传入epoll_create的返回值。

        op:选项,有EPOLL_CTL_ADD、 EPOLL_CTL_MOD、EPOLL_CTL_DEL。

        fd:文件描述符。

        event:对应的事件。

        其中的结构体为:

I/O多路转接——epoll服务器代码编写

③epoll_waitI/O多路转接——epoll服务器代码编写

        作用是等待文件描述符对应的事件是否就绪。

        epfd:传入epoll_create的返回值。

        events:对应的事件。

        maxevents:当前关注的文件描述符的最大值。

        timeout:等待时间。

3.操作原理

        了解了上文这么多函数,其实仅仅是看并没有真正理解到epoll是怎么工作的,下面来讲讲操作原理。

        操作系统如何得知网络中的数据到来了?

        网卡中得到数据,会向CPU发送硬件中断,调用OS预设的中断函数,负责从外设进行数据拷贝,从外设拷贝内核缓冲区。

        epoll_create创建的epoll句柄是什么?

        epoll句柄可以理解为epoll模型。

        epoll_create会创建一个空的红黑树,一个就绪队列,创建对应的回调函数。

        这里的红黑树中的节点存放着要关注的文件描述符和事件等信息,属于用户告诉内核。这里的树等价于当初写poll、select维护的数组。

I/O多路转接——epoll服务器代码编写

         就绪队列存放着已经就绪的事件。属于内核告诉用户。I/O多路转接——epoll服务器代码编写        回调函数是在当数据到来发生硬件中断,os调用中断函数中拷贝之后使用。

I/O多路转接——epoll服务器代码编写

         该回调函数会依据就绪的数据,获取到对应的文件描述符和对应的事件,依据这两个内容构建一个fd_queue节点,插入到就绪队列中。I/O多路转接——epoll服务器代码编写

        这三个合起来为epoll模型。

        这个epoll模型是调用epoll_create会创建的,但是为什么返回值是一个文件描述符呢?

        我们知道文件描述符其实就是数组的下标,该数组存放着指向的管理文件的结构体的指针。具体的大家可以看我这篇文章。I/O多路转接——epoll服务器代码编写

        struct file中就存放着epoll的数据结构。

        由文件描述符找到文件的结构体,就可以找到红黑树,以及就绪队列等关于epoll的数据。        

        epoll_ctl则是来维护这个红黑树的,负责增加节点删除节点,也就是维护用户告诉内核信息的函数。

        epoll_wait则是来从内核中的就绪队列拿数据,有数据则证明有事件就绪了,以前poll需要便利数组去查看是否有文件描述符对应的事件就绪,现在只用通过检查就绪队列是否有数据就知道是否有文件描述符对应的事件就绪。时间复杂度为O(1)。

       

三、epoll服务器编写

1.日志打印

Log.hpp:
#include <cstdio>
#include <cstdarg>
#include <cassert>
#include <stdlib.h>
#include <time.h>

#define DEBUG 0
#define NOTICE 1
#define WARNING 2
#define FATAL 3

const char *log_level[] = {"DEBUG", "NOTICE", "WARNING", "FATAL"};

void logMessage(int level, const char *format, ...)
{
    assert(level >= DEBUG);
    assert(level <= FATAL);
    char logInfor[1024];
    char *name = getenv("USER");
    va_list ap;
    va_start(ap, format);

    vsnprintf(logInfor, sizeof(logInfor) - 1, format, ap);

    va_end(ap);
    
    FILE * out = (level == FATAL) ? stderr : stdout;

    fprintf(out,"%s | %u | %s | %s\n",\
        log_level[level],\
        (unsigned int)time(nullptr),\
        name == nullptr ? "Unkown" : name,\
        logInfor
    );
}

2.TCP服务器

Sock.hpp:
#pragma once

#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>

class Sock
{
public:
    static const int gbacklog = 20;

    static int Socket()
    {
        int listenSock = socket(PF_INET, SOCK_STREAM, 0);
        if (listenSock < 0)
        {
            exit(1);
        }
        int opt = 1;
        setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        return listenSock;
    }
    static void Bind(int socket, uint16_t port)
    {
        struct sockaddr_in local; // 用户栈
        memset(&local, 0, sizeof local);
        local.sin_family = PF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;

        // 2.2 本地socket信息,写入sock_对应的内核区域
        if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0)
        {
            exit(2);
        }
    }
    static void Listen(int socket)
    {
        if (listen(socket, gbacklog) < 0)
        {
            exit(3);
        }
    }

    static int Accept(int socket, std::string *clientip, uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);

        int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);
        if (serviceSock < 0)
        {
            // 获取链接失败
            return -1;
        }
        if (clientport)
            *clientport = ntohs(peer.sin_port);
        if (clientip)
            *clientip = inet_ntoa(peer.sin_addr);
        return serviceSock;
    }
};

3.Epoll

①雏形

EpollServer.hpp:
#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include "log.hpp"
using namespace std;

class EpollServer
{
public:
    EpollServer(uint16_t port)
        : port_(port), listensock_(-1), epfd_(-1)
    {
    }
    void InitEpollServer()
    {

    }
    void RunServer()
    {

    }

    ~EpollServer()
    {
        if(listensock_ != -1) close(listensock_);
        if(epfd_ != -1) close(epfd_);
    }
private:
    int listensock_;
    int epfd_;
    uint16_t port_;
};
epoll.cc:
#include "EpollServer.hpp"
#include "Sock.hpp"
#include "Log.hpp"
#include <memory>
void Usage(string process)
{
    cout << "Please entry" << process << " port" << endl;
}

int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }   
    unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1])));
    epoll->InitEpollServer();
    epoll->RunServer();

    return 0;
}

②InitEpollServer 与 RunServer

代码:

    void InitEpollServer()
    {
        listensock_ = Sock::Socket();
        Sock::Bind(listensock_, port_);
        Sock::Listen(listensock_);

        epfd_ = epoll_create(gsize);
        if (epfd_ < 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            exit(1);
        }
        logMessage(DEBUG, "epoll_creatr success,epoll模型创建成功,epfd: %d", epfd_);
    }
    void RunServer()
    {
        // 1.先添加listensock_到epoll模型中
        struct epoll_event ev;
        ev.events = EPOLLIN;
        ev.data.fd = listensock_;
        int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, listensock_, &ev);
        if (a != 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            exit(1);
        }
        struct epoll_event revs[num];
        int timeout = 10000;
        while (1)
        {
            int n = epoll_wait(epfd_, revs, num, timeout);
            switch (n)
            {
            case 0:
                // timeout
                cout << "timeout ... : " << (unsigned int)time(nullptr) << endl;
                break;
                // error
                cout << "epoll_wait error : " << strerror(errno) << endl;
            case -1:
                break;
            default:
                // 等待成功
                cout<<"event 到来"<<endl;
                break;
            }
        }
    }

现象:I/O多路转接——epoll服务器代码编写

③HandlerEvent

        下面开始处理就绪事件。首先,我们定义一个成员变量它的类型是function<int(int)>,并初始化,

I/O多路转接——epoll服务器代码编写

I/O多路转接——epoll服务器代码编写

         我们将自己想要对文件描述符处理的函数传入在类的实例化的过程中,并在HandlerEvent函数中调用该函数。

int myfuc(int sock)
{
    // 这里bug,TCP基于流式,如何保证一次将数据读取完毕,一会解决
    char buff[1024];
    int sz = recv(sock, buff, sizeof buff -1, 0);
    if (sz > 0)
    {
        buff[sz] = 0;
        logMessage(DEBUG, "client[%d]:%s", sock, buff);
        return sz;
    }
    return sz;
}

int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }
    unique_ptr<EpollServer> epoll(new EpollServer(atoi(argc[1]), myfuc));
    epoll->InitEpollServer();
    epoll->RunServer();
    return 0;
}

        

    void HandlerEvent(struct epoll_event *revs, int n)
    {
        for (int i = 0; i < n; i++)
        {
            int sock = revs[i].data.fd;
            uint32_t revent = revs[i].events;
            if (revent & EPOLLIN)
            {
                // IO
                if (sock == listensock_)
                {
                    // 监听套接字
                    string clientip;
                    uint16_t clientport = 0;
                    int iosock = Sock::Accept(listensock_, &clientip, &clientport);
                    if (iosock < 0)
                    {
                        logMessage(FATAL, "Sock error , errno : %d :%s", errno, strerror(errno));
                        continue;
                    }
                    // 托管给epoll
                    struct epoll_event ev;
                    ev.data.fd = iosock;
                    ev.events = EPOLLIN;
                    int a = epoll_ctl(epfd_, EPOLL_CTL_ADD, iosock, &ev);
                    assert(a == 0);
                    (void)a;
                }
                else
                {
                    // 其他套接字
                    int n = fuc_(sock);
                    if (n == 0 || n < 0)
                    {
                        int x = epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr);
                        assert(x == 0);
                        (void)x;
                        close(sock);
                        logMessage(DEBUG,"clinet[%d] exit",sock);
                    }
                }
            }
            else
            {
                // epollout 后续处理
            }
        }
    }

 现象:

I/O多路转接——epoll服务器代码编写

四、Epoll的工作模式

1.LT模式与ET模式

        LT(level trigger)水平触发,ET(edge trigger)边缘触发。

        LT模式通俗来讲,只要底层有数据,就一直通知上层。

        ET模式,只要底层有数据且是从无到有,从有到多,发生变化时才会通知上层。

        例如:你对妈妈说今天午饭LT模式,当妈妈将饭做好,等你吃饭时,并且一直在喊你,宝贝儿子快来吃饭,直到你去吃饭。这次你对妈妈说今天午饭ET模式,当妈妈将饭做好,饭从无到有,此时妈妈叫你吃饭,你没有去,此后饭的量恒定没有变化,妈妈就再也不会通知你。

        这两个模式哪一个更为高效呢?ET模式更为高效。以TCP通信来讲,并没有多少数据的到来,底层却一直在提醒上层,多数的提醒的都是在提醒同一个数据就绪。

        LT模式,只要有数据就会提醒,我们可以先去处理其他业务,等某次提醒时再处理底层的数据。但ET模式,数据到来之后没有变化,就再也不会提醒你,所以只能在提醒时就处理数据,要不后面再也没有提醒,这就倒逼程序员一次将数据读完。

        如何知道底层数据已经读完了?只有不断的去调用recv、read函数,如果报错了证明数据已经读完了。数据已经读完了,但最后一次,没数据了,read却阻塞住了。

        所以在ET模式下,所有的文件描述符都要设置成非阻塞。

        epoll默认就是LT模式。

2.基于LT模式的epoll服务器

①整体框架

        整体代码已经上传到gitee上,配合整体代码观看,更加直观便捷。

        下面我们基于上文的epoll服务器,但不同与上文,大家请往下看。        

        首先先建立一个类,该类将文件描述符和回调方法结合。

Tcpserver.hpp:
class Connection;
using fuc_t = function<int(Connection *)>;

class Connection
{

public:
    // 文件描述符
    int _sock;

    Tcpserver *_ptr;
    //  自己的接受和发送缓冲区
    string _inbuff;
    string _outbuff;

    // 回调函数
    fuc_t _readfuc;
    fuc_t _writefuc;
    fuc_t _exceptfuc;

public:
    Connection(int sock, Tcpserver *ptr) : _sock(sock), _ptr(ptr)
    {
    }
    ~Connection()
    {
    }
    void SetReadfuc(fuc_t fuc)
    {
        _readfuc = fuc;
    }
    void SetWritefuc(fuc_t fuc)
    {
        _writefuc = fuc;
    }
    void SetExceptfuc(fuc_t fuc)
    {
        _exceptfuc = fuc;
    }
};

        

Tcpserver.hpp:
class Tcpserver
{
public:
    Tcpserver(int port)
    {

        // 网络
        _listensock = Sock::Socket();
        Sock::Bind(_listensock, port);
        Sock::Listen(_listensock);

        // epoll
        _epfd = Epoller::CreateEpoll();

        // add事件
        Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);

        // 将listensock匹配的connection方法添加到unordered_map中
        auto iter = new Connection(_listensock, this);
        iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
        _conn.insert({_listensock, iter});

        // 初始化就绪队列
        _revs = new struct epoll_event[_revs_num];
    }
    int Accepter(Connection *conn)
    {
        string clientip;
        uint16_t clientport;
        int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
        if (sockfd < 0)
        {
            logMessage(FATAL, "accept error");
            return -1;
        }
        logMessage(DEBUG, "Get a new connect : %d", sockfd);
        AddConn(sockfd, EPOLLIN | EPOLLET);
        return 0;
    }
    bool SockinConn(int sock)
    {
        auto iter = _conn.find(sock);
        if (iter == _conn.end())
        {
            return false;
        }
        else
        {
            return true;
        }
    }

    void AddConn(int sock, uint32_t event)
    {
        // 将文件描述符加入epoll模型中
        Epoller::Addevent(_epfd, sock, event);
        // 将文件描述符匹配的connection,也加入map中
        _conn.insert({sock, new Connection(sock, this)});
        logMessage(DEBUG, "将文件描述符匹配的connection加入map成功");

    }

    void Dispatcher()
    {
        // 获取就绪事件
        int n = Epoller::GetReadyFd(_epfd, _revs, _revs_num);
        // logMessage(DEBUG, "GetReadyFd,epoll_wait");
        // 事件派发
        for (int i = 0; i < n; i++)
        {
            int sock = _revs[i].data.fd;
            uint32_t revent = _revs[i].events;
            if (EPOLLIN & revent)
            {
                // 先判空
                if (SockinConn(sock) && _conn[sock]->_readfuc)
                {
                    // 该文件描述符对应的读方法
                    _conn[sock]->_readfuc(_conn[sock]);
                }
            }
            if (EPOLLOUT & revent)
            {
                // 先判空
                if (SockinConn(sock) && _conn[sock]->_writefuc)
                {
                    // 该文件描述符对应的写方法
                    _conn[sock]->_writefuc(_conn[sock]);
                }
            }
        }
    }

    void Run()
    {
        while (1)
        {
            Dispatcher();
        }
    }
    ~Tcpserver()
    {
        if (_listensock != -1)
            close(_listensock);
        if (_epfd != -1)
            close(_epfd);
        delete[] _revs;
    }

private:
    // 1.网络sock
    int _listensock;
    // 2.epoll
    int _epfd;
    // 3.将epoll与上层代码结合
    unordered_map<int, Connection *> _conn;
    // 4.就绪事件列表
    struct epoll_event *_revs;
    // 5.就绪事件列表大小
    static const int _revs_num = 64;
};
Epoller.hpp:
#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstring>
#include <sys/epoll.h>
#include "Log.hpp"
using namespace std;

class Epoller
{
public:
    static int CreateEpoll()
    {
        int size = 128;
        int epfd = epoll_create(size);
        if (epfd < 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            exit(1);
        }
        return epfd;
    }
    static bool Addevent(int epfd, int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.data.fd = sock;
        ev.events = event;
        int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
        if (n != 0)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
            return false;
        }
        return true;
    }
    static int GetReadyFd(int epfd, struct epoll_event evs[], int num)
    {
        // 阻塞式
        int n = epoll_wait(epfd, evs, num, -1);
        if (n == -1)
        {
            logMessage(FATAL, "%d:%s", errno, strerror(errno));
        }
        return n;
    }
};

       

Epoll.cc:
#include "Tcpserver.hpp"
#include "Sock.hpp"
#include <memory>

using namespace std;


void Usage(string process)
{
    cout << "Please entry" << process << " port" << endl;
}



int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }
    unique_ptr<Tcpserver> ep(new Tcpserver(atoi(argc[1])));
    ep->Run();
    return 0;
}

现象: I/O多路转接——epoll服务器代码编写

        当然还没有写完。

        结合整体代码看:①通过Tcpserver的构造函数,先创建网络套接字,建立epoll模型②监听listen套接字,并为listen设置对应的connect类设置读方法③将listen套接字添加到epoll模型中④通过epoll_wait等待事件就绪⑤当listen套接字就绪时,调用回调函数其中accept来获取新连接的到来。

        同时,因为我们今天写的是ET模式,所以要将文件描述符设置为非阻塞式,即使用fcntl。

        目前代码只写到了这里,想纵观全貌更加的详细请看我的gitee。

        下面继续增加内容,新的连接到来,要为新的连接增加对应的方法。

        并且要进行序列化与反序列化,因为我们的服务器是基于TCP的流式读取,每次读取我们确保不了读上来的数据是完整的数据,所以要做处理。

        当前的处理为我们读上的数据如同:112233X1213Xadasd。‘X’作为分隔符,我们需要读上来的数据,将数据进行分割。

        同样建立一个回调方法,在read之后对数据进行分割通过分割,在此之前对它进行初始化。

using callbcak_t = function<int(Connection *, string &)>;

I/O多路转接——epoll服务器代码编写

int HandlerPro(Connection *conn, string &message)
{
    // 我们能保证走到这里一定是完整的报文,已经解码
    // 接下来是反序列化
    cout << "获取request : " << message <<"剩余的信息是"<<conn->_inbuff<<endl;
}

int main(int argv, char **argc)
{
    if (argv != 2)
    {
        Usage(argc[0]);
        exit(1);
    }
    unique_ptr<Tcpserver> ep(new Tcpserver(HandlerPro, atoi(argc[1])));
    ep->Run();
    return 0;
}
    int TcpRecver(Connection *conn)
    {
        // 对普通套接字读取
        while (true)
        {
            char buff[1024];
            ssize_t sz = recv(conn->_sock, buff, sizeof(buff) - 1, 0);
            if (sz > 0)
            {
                buff[sz] = '\0';
                conn->_inbuff += buff;
            }
            else if (sz == 0)
            {
                logMessage(DEBUG, "client quit");
            }
            else if (sz < 0)
            {
                if (errno == EINTR)
                {
                    // 因为信号导致IO关闭,但数据还没有读完
                    continue;
                }
                else if (errno == EAGAIN || errno == EWOULDBLOCK)
                {
                    // 读完了
                    break;
                }
                else
                {
                    // 读取出错
                }
            }
        }
        // 本轮读取完毕
        // 将读取上来的 如:xxxxx/3xxxxx/3xxx/3
        // 分为 xxxxx 、xxxxx、xxx
        vector<string> result;
        PackageSplit(conn->_inbuff, &result);
        for (auto &message : result)
        {
            _cb(conn, message);
        }

        return 0;
    }

现象:

I/O多路转接——epoll服务器代码编写

②处理BUG

        我们要解决Accept函数的BUG,因为我们当前是ET模式,如果当前有大量的连接来,系统只会通知上层一次,而只进行一次调用显然是不对的,会导致读不上其他到来的链接。所以此时我们要进行循环读取,那循环读取时什么时候停止呢?要根据Accept函数中的accept函数调用失败时设置的errno来判别。我们来看下究竟errno会被设置成什么。I/O多路转接——epoll服务器代码编写

        EAGAIN和EWOULDBLOCK,意思为,当前的文件描述符被置为非阻塞的且当前没有可接受的连接。意味着已经将当前到来的连接读完了。

        EINTR表示当前的系统调用被一个捕捉到的信号中断在一个有效的连接到来之前。

        这三个宏值得我们关注,剩下的宏都是表明accept出错了,所以我们可以这样修改函数。

    int Accepter(Connection *conn)
    {
        while (1)
        {
            string clientip;
            uint16_t clientport;
            int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
            if (sockfd < 0)
            {
                if (errno == EINTR) // 被信号中断
                    continue;
                else if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取结束
                    break;
                else
                {
                    // 出错
                    logMessage(FATAL, "accept error");
                    return -1;
                }
            }
            logMessage(DEBUG, "Get a new connect : %d", sockfd);
            AddConn(sockfd, EPOLLIN | EPOLLET);
        }
        return 0;
    }

        文章来源地址https://www.toymoban.com/news/detail-432425.html

③优化结构        I/O多路转接——epoll服务器代码编写         

        我们会发现被标注的代码,与下文的AddConn函数功能具有相似性,为了避免耦合性更高,我们将这段代码移到AddConn函数中。

    void AddConn(int sock, uint32_t event, fuc_t readfuc, fuc_t writefuc, fuc_t exceptfuc)
    {
        if (event & EPOLLET)
            Util::SetNonBlock(sock);

        // 将文件描述符加入epoll模型中
        Epoller::Addevent(_epfd, sock, event);
        // 将文件描述符匹配的connection,也加入map中
        Connection *conn = new Connection(sock, this);

        conn->SetReadfuc(readfuc);
        conn->SetWritefuc(writefuc);
        conn->SetExceptfuc(exceptfuc);

        // conn->SetReadfuc(std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1));
        // conn->SetWritefuc(std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1));
        // conn->SetExceptfuc(std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));

        _conn.insert({sock, conn});
        logMessage(DEBUG, "将文件描述符匹配的connection加入map成功");
    }

         改了AddConn函数,还需对其他涉及到此函数的地方大动干戈。

    Tcpserver(callbcak_t cb, int port) : _cb(cb)
    {

        // 网络
        _listensock = Sock::Socket();
        Util::SetNonBlock(_listensock);
        Sock::Bind(_listensock, port);
        Sock::Listen(_listensock);

        // epoll
        _epfd = Epoller::CreateEpoll();

        //  添加listen事件
        AddConn(_listensock, EPOLLIN | EPOLLET,
                std::bind(&Tcpserver::Accepter, this, std::placeholders::_1), nullptr, nullptr);

        // // add事件
        // Epoller::Addevent(_epfd, _listensock, EPOLLIN | EPOLLET);

        // // 将listensock匹配的connection方法添加到unordered_map中
        // auto iter = new Connection(_listensock, this);
        // iter->SetReadfuc(std::bind(&Tcpserver::Accepter, this, std::placeholders::_1));
        // _conn.insert({_listensock, iter});

        // 初始化就绪队列
        _revs = new struct epoll_event[_revs_num];
    }
    int Accepter(Connection *conn)
    {
        while (1)
        {
            string clientip;
            uint16_t clientport;
            int sockfd = Sock::Accept(conn->_sock, &clientip, &clientport);
            if (sockfd < 0)
            {
                if (errno == EINTR) // 被信号中断
                    continue;
                else if (errno == EAGAIN || errno == EWOULDBLOCK) // 读取结束
                    break;
                else
                {
                    // 出错
                    logMessage(FATAL, "accept error");
                    return -1;
                }
            }
            logMessage(DEBUG, "Get a new connect : %d", sockfd);
            AddConn(sockfd, EPOLLIN | EPOLLET,
                    std::bind(&Tcpserver::TcpRecver, this, std::placeholders::_1),
                    std::bind(&Tcpserver::TcpSender, this, std::placeholders::_1),
                    std::bind(&Tcpserver::TcpExcepter, this, std::placeholders::_1));
        }
        return 0;
    }

④异常处理

        就绪的文件描述符挂断了,就绪的文件描述符出错了怎么办。我们统一起来转为文件描述符

读事件或写事件就绪,必定在读时或写时出错,我们转而去那时处理异常。I/O多路转接——epoll服务器代码编写

 I/O多路转接——epoll服务器代码编写

         接下来,我们呢要对异常处理函数编写。

代码:

    int TcpExcepter(Connection *conn)
    {
        // 处理普通套接字异常
        // 0.检测
        if (!SockinConn(conn->_sock))
            return -1;

        // 1.移除事件
        Epoller::DelEvent(_epfd, conn->_sock);
        logMessage(DEBUG, "remove epoll event");

        // 2.关闭文件描述符
        close(conn->_sock);
        logMessage(DEBUG, "close fd :%d", conn->_sock);

        // 3.删除map中的sock对应的conn
        // delete conn;
        delete _conn[conn->_sock];
        logMessage(DEBUG, "delete conn object success");

        // 4.去掉sock和conn的映射关系 上一步只是delete掉了对象,但是映射关系还在
        _conn.erase(conn->_sock);
        logMessage(DEBUG, "erase conn from map success");
    }

现象:

④序列化与反序列化

        何为序列化?何为反序列化?

        我们在网络传输时以字符串形式发送,传输时以二进制形式。那我想要给对方发送结构体怎么办,则需要将结构体转换为字符串形式,这个过程称为序列化;对方收到字符串,通过反序列化就可以得到结构体。

        我们今天的结构体如图所示:

struct Request 
{
    int _x;
    int _y;
    char _op;
};

struct Response
{
    int _result;
    int _exitcode;
};

         客户端发送一串字符串我们接受到之后,通过反序列化转换为结构体。

bool Parser(string &in, Request *out)
{
    // 反序列化
    // 1 + 1, 2 * 4, 5 * 9, 6 *1
    std::size_t spaceOne = in.find(SPACE);
    if (std::string::npos == spaceOne)
        return false;
    std::size_t spaceTwo = in.rfind(SPACE);
    if (std::string::npos == spaceTwo)
        return false;

    std::string dataOne = in.substr(0, spaceOne);
    std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);
    std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));
    if (oper.size() != 1)
        return false;

    // 转成内部成员
    out->_x = atoi(dataOne.c_str());
    out->_y = atoi(dataTwo.c_str());
    out->_op = oper[0];

    return true;
}

void Serialize(Response &in, string *out)
{
    // 序列化
    // "exitCode_ result_"
    std::string ec = std::to_string(in._exitcode);
    std::string res = std::to_string(in._result);

    *out = ec;
    *out += SPACE;
    *out += res;
    *out += CRLF;
}

        接下来,去文件描述符对应的读方法,调用该函数。

Response Calculate(Request &req)
{

    Response resp = {0,0};
    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
        {
            resp._exitcode = 1; // 1 除零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x / req._y;
        break;
    }
    case '%':
    {
        if (req._y == 0)
        {
            resp._exitcode = 2; // 2 模零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x % req._y;
        break;
    }
    default:
        resp._exitcode = 3; // 非法输入
        break;
    }
    return resp;
}

int HandlerPro(Connection *conn, string &message)
{
    // 我们能保证走到这里一定是完整的报文,已经解码
    cout << "获取request : " << message << endl;
    // 1 * 1
    // 接下来是反序列化
    Request req;
    if (Parser(message, &req) == false)
    {
        return -1;
    }

    // 业务处理
    Response resp = Calculate(req);

    // 序列化
    string out;
    Serialize(resp, &out);

    // 发送给client
    conn->_outbuff += out;
    // 发送 
}

        能不能直接调用send方法无脑的直接向客户端发送呢?

        首先要知道写的缓冲区是否已满,如何检测缓冲区已满?

        在LT模式中,当我们想写时只需要将对应的文件描述符所对应的EPOLLOUT添加事件中,在我们今天的编写代码中,在whlie循环中的事件检测中,当检测到是EPOLLOUT事件时,我们就可以在回调函数中调用send函数。

        在ET模式中,我们也可以使用上面的方法,但是ET模式追求高效,所以一般会直接发送数据,如果数据发送完了,那就可以结束了;如果没有发送完,缓冲区已满,就会选择去拜托EPOLL去完成后续任务。


int HandlerPro(Connection *conn, string &message)
{

    // 我们能保证走到这里一定是完整的报文,已经解码
    cout << "---------------" << endl;
    // 1 * 1
    // 接下来是反序列化
    cout << "获取request : " << message << endl;
    Request req;
    if (Parser(message, &req) == false)
    {
        return -1;
    }

    // 业务处理
    Response resp = Calculate(req);

    // 序列化
    string out;
    Serialize(resp, &out);

    // 发送给client
    conn->_outbuff += out;
    conn->_writefuc(conn);
    if (conn->_outbuff.empty())
    {
        if (conn->_outbuff.empty() == 0)
            conn->_ptr->ModSockEvent(conn->_sock, true, false);
        else
            conn->_ptr->ModSockEvent(conn->_sock, true, true);
    }
    // // 发送
    // // conn->_ptr->ModSockEvent(conn->_sock, true, true);

    cout << "---------------" << endl;
}
    void ModSockEvent(int sock, bool read, bool write)
    {
        uint32_t event = 0;
        event |= read ? EPOLLIN : 0;
        event |= write ? EPOLLOUT : 0;
        Epoller::ModEvent(_epfd, sock, event);
    }
    static bool ModEvent(int epfd, int sock, uint32_t event)
    {
        struct epoll_event ev;
        ev.data.fd = sock;
        ev.events = event;
        int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);
        return n == 0;
    }

        因为,我们截取字符串的函数没有找到我们规定的分隔符的话,就会将最后的一部字符串归到下一次读取时。原来的代码如下所示,所以就会导致每次输入一段字符串,只会输出最后一段的结果,我们只需要稍作修改。

void PackageSplit(string &buff, vector<string> *result)
{
    // asdasXdasdaXda
    // asdas dasda da
    while (true)
    {
        size_t pos = buff.find(SEP);
        if (pos == string::npos)
        {
            break;
        }
        result->push_back(buff.substr(0, pos));
        buff.erase(0, pos + SEP_SZ);
    }
}

修改后:

void PackageSplit(string &buff, vector<string> *result)
{
    // asdasXdasdaXda
    // asdas dasda da
    while (true)
    {
        size_t pos = buff.find(SEP);
        if (pos == string::npos)
        {
            if (buff.size() < 5)
            {
                buff.clear();
                break;
            }
            result->push_back(buff.substr(0, buff.size()));
            buff.clear();
            break;
        }
        result->push_back(buff.substr(0, pos));
        buff.erase(0, pos + SEP_SZ);
    }
}

现象:I/O多路转接——epoll服务器代码编写

⑤优化

        我们的代码耦合度太高了,数据处理的函数放在了源文件中,我们另起一个头文件,将处理函数放到该头文件中,这样业务处理是业务处理,网络服务是网络服务。

Service.hpp:
#pragma once 

#include "Protocol.hpp"
#include <functional>

using service_t = function<Response (Request &req)>;


Response Calculate(Request &req)
{

    Response resp = {0, 0};
    switch (req._op)
    {
    case '+':
        resp._result = req._x + req._y;
        break;
    case '-':
        resp._result = req._x - req._y;
        break;
    case '*':
        resp._result = req._x * req._y;
        break;
    case '/':
    {
        if (req._y == 0)
        {
            resp._exitcode = 1; // 1 除零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x / req._y;
        break;
    }
    case '%':
    {
        if (req._y == 0)
        {
            resp._exitcode = 2; // 2 模零错误
            resp._result = INT32_MAX;
        }
        else
            resp._result = req._x % req._y;
        break;
    }
    default:
        resp._exitcode = 3; // 非法输入
        break;
    }
    return resp;
}

    

epoll.cc:
int HandlerProHelp(Connection *conn, string &message,service_t service)
{
// 我们能保证走到这里一定是完整的报文,已经解码
    cout << "---------------" << endl;
    // 1 * 1
    // 接下来是反序列化
    cout << "获取request : " << message << endl;
    Request req;
    if (Parser(message, &req) == false)
    {
        return -1;
    }

    // 业务处理
    Response resp = service(req);

    // 序列化
    string out;
    Serialize(resp, &out);

    // 发送给client
    conn->_outbuff += out;
    conn->_writefuc(conn);
    if (conn->_outbuff.empty())
    {
        if (conn->_outbuff.empty() == 0)
            conn->_ptr->ModSockEvent(conn->_sock, true, false);
        else
            conn->_ptr->ModSockEvent(conn->_sock, true, true);
    }
    // // 发送
    // // conn->_ptr->ModSockEvent(conn->_sock, true, true);

    cout << "---------------" << endl;

    return 0;
}


int HandlerPro(Connection *conn, string &message)
{
    return HandlerProHelp(conn,message,Calculate);
}

        这样在想处理其他业务的时候,只需要将业务处理函数放入Service.hpp中,然后将源文件中调用就行。

⑥Reactor模式与Proactor模式

        我们今天使用的是Reactor模式。

Reactor模式:

        Linux系统中最常用的反应器模式。

        半同步半异步。

        即负责事件的派发,有否则IO,或者说业务处理。

Proactor模式:

        只负责事件的派发,就绪的事件推送给后台的进程、线程池,不关心处理的细节。

        到这里,epoll服务器的编写,应该就告一段落了,尽管还有数不清的BUG,和没有说清楚的知识点,但是总体还是挺完美的,那么感谢观看,我们下次再见。

        

到了这里,关于I/O多路转接——epoll服务器代码编写的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 计算机网络编程 | 多路I/O转接服务器

    欢迎关注博主 Mindtechnist 或加入【Linux C/C++/Python社区】一起学习和分享Linux、C、C++、Python、Matlab,机器人运动控制、多机器人协作,智能优化算法,滤波估计、多传感器信息融合,机器学习,人工智能等相关领域的知识和技术。 专栏:《网络编程》 多路IO转接服务器也叫做多

    2024年02月12日
    浏览(17)
  • 多路IO—POll函数,epoll服务器开发流程

    \\\"在计算机网络编程中,多路IO技术是非常常见的一种技术。其中,Poll函数和Epoll函数是最为常用的两种多路IO技术。这两种技术可以帮助服务器端处理多个客户端的并发请求,提高了服务器的性能。本文将介绍Poll和Epoll函数的使用方法,并探讨了在服务器开发中使用这两种技

    2024年02月06日
    浏览(13)
  • 【TCP服务器的演变过程】使用IO多路复用器epoll实现TCP服务器

    手把手教你从0开始编写TCP服务器程序,体验开局一块砖,大厦全靠垒。 为了避免篇幅过长使读者感到乏味,对【TCP服务器的开发】进行分阶段实现,一步步进行优化升级。 本节,在上一章节的基础上,将IO多路复用机制select改为更高效的IO多路复用机制epoll,使用epoll管理每

    2024年01月17日
    浏览(23)
  • Linux多路IO复用技术——epoll详解与一对多服务器实现

    本文详细介绍了Linux中epoll模型的优化原理和使用方法,以及如何利用epoll模型实现简易的一对多服务器。通过对epoll模型的优化和相关接口的解释,帮助读者理解epoll模型的工作原理和优缺点,同时附带代码实现和图解说明。

    2024年02月05日
    浏览(21)
  • 【Linux】多路转接 -- epoll

    epoll系统调用和select以及poll是一样的,都是可以让我们的程序同时监视多个文件描述符上的事件是否就绪。 epoll在命名上比poll多了一个poll,这个e可以理解为extend, epoll就是为了同时处理大量文件描述符而改进的poll。 epoll在2.5.44内核中被引进,它几乎具备了select和poll的所有

    2024年02月14日
    浏览(25)
  • 【网络】多路转接——poll | epoll

    🐱作者:一只大喵咪1201 🐱专栏:《网络》 🔥格言: 你只管努力,剩下的交给时间! 书接上文五种IO模型 | select。 poll 也是一种多路转接的方案,它专门用来解决 select 的两个问题: 等待fd有上限的问题。 每次调用都需要重新设置 fd_set 的问题。 如上图所示便是 poll 系统调

    2024年02月10日
    浏览(18)
  • 多路转接-epoll/Reactor(2)

    上次说到了poll,它存在效率问题,因此出现了改进的poll----epoll。 目前epoll是公认的效率最高的多路转接的方案。  epoll_create: 这个参数其实已经被废弃了。 这个值只要大于0就可以了。  这是用来创建一个epoll模型的。 创建成功了就返回一个文件描述符。失败了返回-1 epo

    2024年04月13日
    浏览(9)
  • 【Linux】IO多路转接技术Epoll的使用

    ​ 在学习 epoll 之前,我们首先了解一下Linux中的多路复用技术: 在Linux系统中, IO多路复用 是一种重要的技术,它允许一个进程同时监视多个文件描述符,一旦某个描述符准备好进行读取(通常是读就绪或写就绪),内核会通知该进程进行相应的读写操作。这样,我们可以

    2024年04月27日
    浏览(10)
  • 多路转接方案:select poll epoll 介绍和对比

    内存和外设的交互叫做IO,网络IO就是将数据在内存和网卡间拷贝。 IO本质就是等待和拷贝,一般等待耗时往往远高于拷贝耗时。所以提高IO效率就是尽可能减少等待时间的比重。 IO模型 简单对比解释 阻塞IO 阻塞等待数据到来 非阻塞IO 轮询等待数据到来 信号驱动 信号递达时

    2024年02月08日
    浏览(20)
  • 【Linux】高级IO --- 多路转接,select,poll,epoll

    所有通过捷径所获取的快乐,无论是金钱、性还是名望,最终都会给自己带来痛苦 1. 后端服务器最常用的网络IO设计模式其实就是Reactor,也称为反应堆模式,Reactor是单进程,单线程的,但他能够处理多客户端向服务器发起的网络IO请求,正因为他是单执行流,所以他的成本就

    2024年02月09日
    浏览(23)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包