【Linux后端服务器开发】封装线程池实现TCP多线程通信

这篇具有很好参考价值的文章主要介绍了【Linux后端服务器开发】封装线程池实现TCP多线程通信。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

目录

一、线程池模块

Thread.h

LockGuard.h

ThreadPool.h

二、任务模块模块

Task.h

三、日志模块

Log.h

四、守护进程模块

Deamon.h

 五、TCP通信模块

Server.h

Client.h

server.cpp

client.cpp


关于TCP通信协议的封装,此篇博客有详述:

【Linux后端服务器开发】TCP通信设计_命运on-9的博客-CSDN博客

线程池的设计,包含线程的封装、互斥锁的封装、线程池的封装

TCP通信的设计包含服务器的封装、客户端的封装

我们将任务代码和服务器解耦,需要再单独设计Task任务模块

为了模拟服务器设计的完整性,我们需要再设计一个日志模块

在很多情况下,服务器都是一个后台进程(守护进程),我们需要再设计一个守护进程模块

一、线程池模块

线程池设计为单例模式,线程池容量根据系统CPU的核数决定。

互斥锁设计为只能指针模式,增加安全性,避免出现死锁情况。

线程在调用函数的时候,函数不能是类内函数,所以即使需要调用的函数声明在类内,也需要将函数设置为静态。

Thread.h

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cassert>
#include <functional>
#include <pthread.h>

class Thread
{
    using func_t = std::function<void*(void*)>;
public:
    Thread()
    {
        char buf[1024];
        snprintf(buf, sizeof(buf), "thread-%d", s_thread_num++);
        _name = buf;
    }

    void Start(func_t func, void* args = nullptr)
    {
        _func = func;
        _args = args;
        pthread_create(&_tid, nullptr, Start_Routine, this);
    }

    void Join()
    {
        int n = pthread_join(_tid, nullptr);
        assert(n == 0);
    }

    std::string Thread_Name()
    {
        return _name;
    }

private:
    static void* Start_Routine(void* args)
    {
        Thread* tmp = static_cast<Thread*>(args);
        return tmp->Call_Back();
    }

    void* Call_Back()
    {
        _func(_args);
    }

private:
    std::string _name;
    func_t _func;
    void* _args;
    pthread_t _tid;
    static int s_thread_num;
};

int Thread::s_thread_num = 1;

LockGuard.h

# pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t* plock)
        : _plock(plock)
    {}

    void lock()
    {
        if (_plock)
            pthread_mutex_lock(_plock);
    }

    void unlock()
    {
        if (_plock)
            pthread_mutex_unlock(_plock);
    }

private:
    pthread_mutex_t* _plock;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* plock)
        : _mutex(plock)
    {
        _mutex.lock();
    }

    ~LockGuard()
    {
        _mutex.unlock();
    }

private:
    Mutex _mutex;
};

ThreadPool.h

#pragma once

#include "Thread.h"
#include "LockGuard.h"
#include <vector>
#include <queue>
#include <mutex>
#include <unistd.h>

using namespace std;

const int g_num = 8;

template<class T>
class ThreadPool;

template<class T>
class ThreadData
{
public:
    ThreadPool<T>* _tp;
    std::string _name;

    ThreadData(ThreadPool<T>* tp, const std::string& name)
        : _tp(tp), _name(name)
    {}
};

template<class T>
class ThreadPool
{
public:
    ThreadPool(const int& num = g_num)
        : _num(num)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        for (int i = 0; i < num; ++i)
            _threads.push_back(new Thread());
    }

    // 单例模式
    static ThreadPool<T>* Get_Instance()
    {
        if (tp == nullptr)
        {
            sing_lock.lock();
            tp = new ThreadPool<T>();
            sing_lock.unlock();
        }
        return tp;
    }

    // 禁用拷贝与赋值
    ThreadPool(const ThreadPool<T>&) = delete;
    ThreadPool<T>& operator=(const ThreadPool<T>&) = delete;

    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        for (auto& t : _threads)
            delete t;
    }

public:
    void Push(const T& in)
    {
        LockGuard lock(&_mutex);
        _tasks.push(in);
        pthread_cond_signal(&_cond);
    }

    T Pop()
    {
        T t = _tasks.front();
        _tasks.pop();
        return t;
    }

    void Run()
    {
        for (const auto& t : _threads)
        {
            ThreadData<T>* td = new ThreadData<T>(this, t->Thread_Name());
            t->Start(Handler_Task, td);
            std::cout << t->Thread_Name() << " start ..." << endl;
        }
    }

private:
    static void* Handler_Task(void* args)
    {
        ThreadData<T>* td = static_cast<ThreadData<T>*>(args);
        while (true)
        {
            T t;
            {
                LockGuard lock(td->_tp->Mutex());
                while (td->_tp->Tasks_Empty())
                {
                    td->_tp->Thread_Wait();
                }
                t = td->_tp->Pop();
            }
            t();    // 执行任务
        }
        delete td;
        return nullptr;
    }

private:
    pthread_mutex_t* Mutex()
    {
        return &_mutex;
    }

    void Lock_Queue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void Unlock_Queue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Thread_Wait()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool Tasks_Empty()
    {
        return _tasks.empty();
    }

private:
    int _num;
    vector<Thread*> _threads;
    queue<T> _tasks;
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    static ThreadPool<T>* tp;
    static mutex sing_lock;
};

template<class T>
ThreadPool<T>* ThreadPool<T>::tp = nullptr;

template<class T>
mutex ThreadPool<T>::sing_lock;

二、任务模块模块

将任务封装成类对象仿函数,执行任务就是对类做()的重定义,当然当我们需要运行多个任务时,可以再次Task类对象和实际要运行的任务进行解耦,有类对象调用其他的任务函数。

Task.h

#pragma once

#include <iostream>
#include <unistd.h>

using namespace std;

class Task
{
public:
    Task()
    {}

    Task(int client_sock, string client_ip, uint16_t client_port)
        : _client_sock(client_sock), _client_ip(client_ip), _client_port(client_port)
    {}

    void operator()()
    {
        while (true)
        {
            // 5.1 接收信息
            char recv_buf[1024];
            int n = read(_client_sock, recv_buf, sizeof(recv_buf));
            if (n == 0)
                return;
            recv_buf[n] = 0;
            cout << "[" << _client_ip << ":" << _client_port << "]# " << recv_buf << endl;

            // 5.2 应答信息
            char sent_buf[1024];
            snprintf(sent_buf, sizeof(sent_buf), "服务器已收到信息: %s\n", recv_buf);
            write(_client_sock, sent_buf, sizeof(sent_buf));
        }
    }

private:
    int _client_sock;
    string _client_ip;
    uint16_t _client_port;
};

三、日志模块

日志模块需要我们记录服务器的正常连接与异常连接,分别用两个文件记录。

日志内容需要显示连接的状态、时间、进程号,当服务器与客户端连接成功的时候,日志还要记录客户端的IP和端口号。

Log.h

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cstdarg>
#include <ctime>
#include <unistd.h>
#include <memory>

#define LOG_NORMAL "log.txt"
#define LOG_ERROR  "err.txt"

#define DEBUG   0
#define NORMAL  1
#define WARNING 2
#define ERROR   3
#define FATAL   4

#define NUM 1024

const char* To_Lever_Str(int level)
{
    switch (level)
    {
    case DEBUG:
        return "DEBUG";
    case NORMAL:
        return "NORMAL";
    case WARNING:
        return "warning";
    case ERROR:
        return "ERROR";
    case FATAL:
        return "FATAL";
    default:
        return nullptr;
    }
}

void To_Time_Str(long int t, std::string& cur_time)
{
    // 将时间戳转换成tm结构体
    struct tm* cur;
    cur = gmtime(&t);
    cur->tm_hour = (cur->tm_hour + 8) % 24; //东八区

    char tmp[NUM];
    std::string my_format = "%Y-%m-%d %H:%M:%S";
    strftime(tmp, sizeof(tmp), my_format.c_str(), cur);
    cur_time = tmp;
}

// ... 可变参数列表
void Log_Message(int level, const char* format, const char* ip = "", const char* port = "")
{
    // [日志等级][时间][pid][message]
    char log_prefix[NUM];
    std::string cur_time;
    To_Time_Str(time(nullptr), cur_time);
    snprintf(log_prefix, sizeof(log_prefix), "[%s][%s][pid:%d]", 
             To_Lever_Str(level), cur_time.c_str(), getpid());

    std::string log_content = "";
    if (strcmp(ip, "") != 0 || strcmp(port, "") != 0)
    {
        log_content += "[";
        log_content += ip;
        log_content += ": ";
        log_content += port;
        log_content += "]";
    }
    log_content += format;

    FILE* log = fopen(LOG_NORMAL, "a");     // 连接记录日志
    FILE* err = fopen(LOG_ERROR, "a");      // 报错日志
    if (log != nullptr && err != nullptr)
    {
        FILE* curr = nullptr;
        if (level == DEBUG || level == NORMAL || level == WARNING)
            curr = log;
        else
            curr = err;

        if (curr)
            fprintf(curr, "%s%s\n", log_prefix, log_content.c_str());
        fclose(log);
        fclose(err);
    }
}

四、守护进程模块

守护进程模块我们需要将服务器后台化,让进程忽略掉异常的信号(服务器不能轻易挂掉,故需要屏蔽信号)。

守护进程的本质就是孤儿进程,我们需要在服务器执行的时候,分离出子进程,杀死主进程。

守护进程脱离终端,关闭或重定向以前进程默认打开的文件描述符。

Deamon.h

#pragma once

#include <unistd.h>
#include <signal.h>
#include <cstdlib>
#include <cassert>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define DEV "/dev/null"

void Deamon_Self(const char* curr_path = nullptr)
{
    // 1. 让进程忽略掉异常的信号
    signal(SIGPIPE, SIG_IGN);

    // 2. 守护进程,本质就是孤儿进程
    if (fork() > 0)
        exit(1);

    pid_t n = setsid();
    assert(n != -1);

    // 3. 守护进程脱离终端,关闭或重定向以前进程默认打开的文件
    int fd = open(DEV, O_RDWR);
    if (fd > 0)
    {
        dup2(fd, 0);
        dup2(fd, 1);
        dup2(fd, 2);
        close(fd);
    }
    else
    {
        close(0);
        close(1);
        close(2);
    }
}

 五、TCP通信模块

在上一篇博客的TCP通信协议的封装中,没有考虑TIME_WAIT引起bind失败的问题(这个问题其实很少见,可以不管),此处为了通信协议更加完善,我们将其加上。

在socket套接字创建之后,使用socketopt()设置socket描述符的选项SO_REUSEADDR为1,表示允许创建端口号相同但是IP地址不同的多个socket描述符。

int opt = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSERADDR, &opt, sizeof(opt));

这样设置之后,当服务器关闭之后,可以立刻重启。文章来源地址https://www.toymoban.com/news/detail-602818.html

Server.h

#pragma once

#include <iostream>
#include <unistd.h>
#include <string>
#include <cstring>
#include <functional>
#include <cerrno>
#include <thread>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "Task.h"
#include "Log.h"
#include "ThreadPool.h"

using namespace std;

const string g_default_ip = "0.0.0.0";

enum
{
    USAGE_ERR = 1,
    SOCK_ERR,
    BIND_ERR,
    LISTEN_ERR,
    ACCEPT_ERR
};

class TcpServer
{
public:
    TcpServer(const uint16_t port, const string& ip = g_default_ip)
        : _port(port), _ip(ip), _listenfd(0)
    {}

    void Init()
    {
        // 1. 创建socket套接字
        _listenfd = socket(AF_INET, SOCK_STREAM, 0);
        if (_listenfd < 0)
        {
            Log_Message(FATAL, "socket error");
            exit(SOCK_ERR);
        }
        Log_Message(NORMAL, "socekt create success");

        // 1.1 解决TIME_WAIT状态bind失败问题
        int opt = 1;
        setsockopt(_listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

        // 2. bind绑定服务器网络信息
        struct sockaddr_in local;
        local.sin_family = AF_INET;
        local.sin_addr.s_addr = htonl(INADDR_ANY);
        local.sin_port = htons(_port);

        if (bind(_listenfd, (struct sockaddr*)&local, sizeof(local)) < 0)
        {
            Log_Message(FATAL, "bind error");
            exit(BIND_ERR);
        }
        Log_Message(NORMAL, "bind success");

        // 3. listen设置监听
        if (listen(_listenfd, 8) < 0)
        {
            // 监听的连接队列长度与项目的线程数相关
            Log_Message(FATAL, "listen error");
            exit(LISTEN_ERR);
        }
        Log_Message(NORMAL, "listen success");
    }

    void Start()
    {
        ThreadPool<Task>::Get_Instance()->Run();

        while (true)
        {
            // 4. accept连接客户端
            struct sockaddr_in client;
            socklen_t client_len = sizeof(client);
            int client_sock = accept(_listenfd, (struct sockaddr*)&client, &client_len);
            if (client_sock < 0)
            {
                Log_Message(FATAL, "accept error");
                exit(ACCEPT_ERR);
            }
            string client_ip = inet_ntoa(client.sin_addr);
            uint16_t client_port = ntohs(client.sin_port);
            Log_Message(NORMAL, "accept success", client_ip.c_str(), to_string(client_port).c_str());

            // 5. 连接成功,进行通信, 多线程
            ThreadPool<Task>::Get_Instance()->Push(Task(client_sock, client_ip, client_port));                                         // 线程分离
        }
    }

private:
    string _ip;
    uint16_t _port;
    int _listenfd;
};

Client.h

#pragma once

#include <iostream>
#include <unistd.h>
#include <string>
#include <cstring>
#include <functional>
#include <cerrno>

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "Log.h"

using namespace std;

class TcpClient
{
public:
    TcpClient(const uint16_t server_port, const string server_ip)
        : _server_port(server_port), _server_ip(server_ip), _sock(-1)
    {}

    void Init()
    {
        // 1. 创建套接字
        _sock = socket(AF_INET, SOCK_STREAM, 0);
        if (_sock < 0)
        {
            cerr << "socket error " << errno << ": " << strerror(errno) << endl;
            exit(1);
        }

        // 2. bind绑定,由OS绑定
    }

    void Run()
    {
        // 3. 向服务器发起连接请求
        struct sockaddr_in server;
        server.sin_family = AF_INET;
        server.sin_addr.s_addr = inet_addr(_server_ip.c_str());
        server.sin_port = htons(_server_port);

        if (connect(_sock, (struct sockaddr*)&server, sizeof(server)) != 0)
        {
            cerr << "connect error " << errno << ": " << strerror(errno) << endl;
            exit(1);
        }

        // 4. 连接成功,进行通信
        while (true)
        {
            // 4.1 发送信息
            char sent_buf[1024];
            cout << "请输入信息:";
            gets(sent_buf);
            write(_sock, sent_buf, sizeof(sent_buf));

            // 4.2 接收应答信息
            char recv_buf[1024];
            int n = read(_sock, recv_buf, sizeof(recv_buf));
            if (n > 0)
                recv_buf[n] = 0;
            cout << recv_buf << endl;
        }
    }

private:
    string _server_ip;
    uint16_t _server_port;
    int _sock;
};

server.cpp

#include "Server.h"
#include "Deamon.h"
#include <memory>

void Usage()
{
    cout << "Usage:\n\tserver port" << endl;
    exit(USAGE_ERR);
}

int main(int args, char* argv[])
{
    if (args != 2)
    {
        Log_Message(FATAL, "usage error");
        Usage();
    }

    uint16_t port = atoi(argv[1]);

    unique_ptr<TcpServer> tcp_server(new TcpServer(port));
    Deamon_Self();

    tcp_server->Init();
    tcp_server->Start();

    return 0;
}

client.cpp

#include "Client.h"
#include <memory>

void Usage()
{
    cout << "Usage:\n\tclient ip port" << endl;
    exit(1);
}

int main(int args, char* argv[])
{
    if (args != 3)
    {
        Log_Message(FATAL, "usage error");
        Usage();
    }

    string server_ip = argv[1];
    uint16_t server_port = atoi(argv[2]);

    unique_ptr<TcpClient> tcp_client(new TcpClient(server_port, server_ip));

    tcp_client->Init();
    tcp_client->Run();

    return 0;
}

到了这里,关于【Linux后端服务器开发】封装线程池实现TCP多线程通信的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【Linux后端服务器开发】socket套接字

    【Linux后端服务器开发】socket套接字

    目录 一、socket 套接字概述 二、socket 函数接口 三、IP地址与端口号的网络格式 四、TCP协议的本地通信C语言示例 socket 是什么? socket 本质上是一个抽象的概念,它是一组用于 网络通信的 API , 提供了一种统一的接口 ,使得应用程序可以通过网络进行通信。在不同的操作系统

    2024年02月16日
    浏览(10)
  • 【Linux后端服务器开发】基础IO与文件系统

    【Linux后端服务器开发】基础IO与文件系统

    目录 一、基础IO 1. C语言文件读写 2. 标志位传参 3. C语言与系统调用关系 二、文件系统 1. 文件描述符 2. 输入输出重定向 文件调用 库函数接口: fopen、fclose、fwrite、fread、fseek 系统调用接口:open、close、write、read、lseek r/w/a :读/写/追加 若打开的文件不存在,“r”报错,“

    2024年02月15日
    浏览(11)
  • 【Linux后端服务器开发】协议定制(序列化与反序列化)

    【Linux后端服务器开发】协议定制(序列化与反序列化)

    目录 一、应用层协议概述 二、序列化与反序列化 Protocal.h头文件 Server.h头文件 Client.h头文件 server.cpp源文件 client.cpp源文件 什么是应用层 ?我们通过编写程序解决一个个实际问题、满足我们日常需求的网络程序,都是应用层程序。 协议是一种“约定”,socket的api接口,在读

    2024年02月16日
    浏览(10)
  • 基于linux下的高并发服务器开发(第三章)- 3.8 线程同步
  • 简单的TCP网络程序·线程池(后端服务器)

    简单的TCP网络程序·线程池(后端服务器)

    目录 版本四:线程池 注意事项 文件:Task.hpp -- 任务单独为一个文件 组件:日志修改 新函数:vprintf() 可变参数的提取逻辑 vfprintf()的工作原理 初始化一个va_list 日志准备 获取时间小知识 日志初版 日志启动测试 TCP通用服务器(守护进程) * 新指令1:jobs -- 查看进程作业 新指令

    2024年02月09日
    浏览(15)
  • 强推Linux高性能服务器编程, 真的是后端开发技术提升, 沉淀自身不容错过的一本经典书籍

    强推Linux高性能服务器编程, 真的是后端开发技术提升, 沉淀自身不容错过的一本经典书籍

    目录 第1章 TCP/IP协议 1.1 TCP/IP协议族体系结构以及主要协议 1.1.1 数据链路层 1.1.2 网络层 1.1.3 传输层 1.1.4 应用层 1.2 封装 1.3 分用 1.5 ARP协议工作原理 1.5.1 以太网ARP请求/应答报文详解 1.5.2 ARP高速缓存的查看和修改 1.5.3 使用tcpdump观察ARP通信过程所得结果如下 本篇核心关键所在

    2024年02月07日
    浏览(21)
  • 基于Linux C++多线程服务器 + Qt上位机开发 + STM32 + 8266WIFI的智慧无人超市

    基于Linux C++多线程服务器 + Qt上位机开发 + STM32 + 8266WIFI的智慧无人超市

    针对传统超市购物车结账排队时间长、付款效率低的问题,提出了一种更符合现代社会人们购物方式-基于RFID的自助收银系统。习惯了快节奏生活的人们都会选择自助收银机结账,理由显而易见:自助收银机结账很方便,几乎不用排队,也不用近距离和收银员接触,在防疫时

    2024年03月10日
    浏览(12)
  • [Linux] 网络编程 - 初见TCP套接字编程: 实现简单的单进程、多进程、多线程、线程池tcp服务器

    [Linux] 网络编程 - 初见TCP套接字编程: 实现简单的单进程、多进程、多线程、线程池tcp服务器

    网络的上一篇文章, 我们介绍了网络变成的一些重要的概念, 以及 UDP套接字的编程演示. 还实现了一个简单更简陋的UDP公共聊天室. [Linux] 网络编程 - 初见UDP套接字编程: 网络编程部分相关概念、TCP、UDP协议基本特点、网络字节序、socket接口使用、简单的UDP网络及聊天室实现…

    2024年02月16日
    浏览(17)
  • 整合封装服务器模块设计实现

    服务器模块,是对当前所实现的所有模块的⼀个整合,并进⾏服务器搭建的⼀个模块,最终封装实现出⼀个gobang_server的服务器模块类,向外提供搭建五⼦棋对战服务器的接⼝。通过实例化的对象可以简便的完成服务器的搭建。 首先,我将采用websocketpp来搭建服务器,那么需要

    2024年02月13日
    浏览(13)
  • Linux网络编程二(TCP三次握手、四次挥手、TCP滑动窗口、MSS、TCP状态转换、多进程/多线程服务器实现)

    Linux网络编程二(TCP三次握手、四次挥手、TCP滑动窗口、MSS、TCP状态转换、多进程/多线程服务器实现)

    TCP三次握手 TCP 三次握手 (TCP three-way handshake)是TCP协议建立可靠连接的过程,确保客户端和服务器之间可以进行可靠的通信。下面是TCP三次握手的详细过程: 假设客户端为A,服务器为B 1 、第一次握手(SYN=1,seq=500) A向B发送一个带有SYN标志位的数据包,表示A请求建立连接。

    2024年02月06日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包