Rpc异步日志模块

这篇具有很好参考价值的文章主要介绍了Rpc异步日志模块。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

Rpc异步日志模块作用

在一个大型分布式系统中,任何部署的分布式节点都可能发生崩溃,试想如果用普通的办法,即先排查哪个节点down掉了,找到down掉的节点后采取调试工具gdb调试该节点,进而排查宕机的原因。这中排查方法对于人力物力都是无法接受的。
那么由此记录日志就变得至关重要,分布式RPC框架必定存在一个异步日志模块,用于记录所有分布式站点的调试信息。通过日志分析就能很容易排查出哪个节点出了问题

Rpc异步日志模块实现思路

一个日志模块必须是异步的,不能影响主程序的运行,例如在RPC框架中显然不能阻塞了RPCProvider(服务提供者)和RPCConsumer(服务调用者)的运行
RPCProvider是一个能接受高并发rpc请求的高性能服务器(epoll+多线程),那么存在多个线程同时写日志的情况,这里的“写日志”并不是真正意义上的写磁盘文件的操作,因为磁盘IO会严重拖累该线程原本执行的其他任务。所以这里的写日志 只是多个线程将日志写入一个异步缓冲队列(这个操作是在内存进行的非常快),并且这个队列必须是线程安全的。
此外,应该另起一个线程来读取队列里面的日志数据进行真正的磁盘写文件操作,这样写日志线程是单独工作的,它只是依赖于异步缓冲队列里面的数据,不会影响RPC服务线程和其他IO线程。
Rpc异步日志模块,分布式,rpc,分布式,c++

Rpc异步日志模块实现

下面提供一个简单版本的实现
异步缓冲队列类:

#pragma once

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>

//异步写日志的日志队列
template<typename T>
class LockQueue
{

public:
    //多个work线程都会写日志queue
    void Push(const T& data)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(data);
        m_condvariable.notify_one();
    }
    //一个线程读日志queue,写日志文件
    T Pop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while(m_queue .empty())
        {
            //日志队列为空, 线程进入wait状态
            m_condvariable.wait(lock);
        }

        T data = m_queue.front();
        m_queue.pop();
        return data;
    }
private:
    std::queue<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_condvariable;

};

Logger.h类:

#pragma once

#include "lockqueue.h"
#include <utility>

//日志级别
enum LogLevel{
    INFO = 1, //普通信息
    ERROR  //错误信息
};

//Mprpc框架提供的日志系统
class Logger
{
public:
    //获取日志的单例
    static Logger& GetInstance();
    //写日志
    void Log(std::pair<LogLevel,std::string> msg);

private:
    LockQueue<std::pair<LogLevel,std::string>> m_lckQue; //日志缓冲队列

    Logger();
    Logger(const Logger&) = delete;
    Logger(Logger&& ) = delete;
    Logger& operator=(const Logger&) = delete;
};

//定义宏  LOG_XXX("xxx %d %s", 20, "sdasd");
#define LOG_INFO(logmsgformat, ...) \
    do \
    { \
        Logger &logger = Logger::GetInstance(); \
        char c[1024] = {0};           \
        snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
        std::pair<LogLevel,std::string> pr = std::make_pair(INFO, std::string(c)); \
        logger.Log(pr); \
    } while (0);

#define LOG_ERR(logmsgformat, ...) \
do \
{ \
    Logger &logger = Logger::GetInstance(); \
    char c[1024] = {0};           \
    snprintf(c, 1024, logmsgformat, ##__VA_ARGS__); \
    std::pair<LogLevel,std::string> pr = std::make_pair(ERROR, std::string(c)); \
    logger.Log(pr); \
} while (0);
    

Logger.cc

#include "logger.h"
#include <time.h>
#include <iostream>

//获取日志的单例
Logger& Logger::GetInstance()
{
    static Logger logger;
    return logger;
}

Logger::Logger()
{
    //启动专门的写日志线程
    std::thread writeLogTask([&](){
        for(;;)
        {
            //获取当前的日期,然后取日志信息,写入相应的日志文件当中 a+
            time_t now = time(nullptr);
            tm *nowtm = localtime(&now);

            char file_name[128];
            sprintf(file_name, "%d-%d-%d-log.txt", nowtm->tm_year + 1900, nowtm->tm_mon + 1, nowtm->tm_mday);

            FILE* pf = fopen(file_name, "a+");
            if(pf == nullptr)
            {
                std::cout<<"logger file:" << file_name <<"open error" << std::endl;
                exit(EXIT_FAILURE);
            }

            std::pair<LogLevel,std::string> msg = m_lckQue.Pop();

            char time_buf[128] = {0};
            sprintf(time_buf, "%d-%d-%d => [%s]", 
                                    nowtm->tm_hour, 
                                    nowtm->tm_min, 
                                    nowtm->tm_sec,
                                    (msg.first == INFO ? "info" : "error"));
            msg.second.insert(0, time_buf);
            msg.second.append("\n");
            fputs(msg.second.c_str(), pf);
            fclose(pf); 
        }
    });

    //设置分离线程, 守护线程
    writeLogTask.detach();

}

//写日志,把日志信息写入到lockqueue缓冲区当中
void Logger::Log(std::pair<LogLevel,std::string> msg)
{
    m_lckQue.Push(msg);
}

使用:文章来源地址https://www.toymoban.com/news/detail-628686.html

...
LOG_INFO("NotifyService UserService success");
LOG_ERR("eeeeeerrror%d", 9999999);
LOG_INFO("NotifyService GetFriendListService success");
...

查看日志文件:cat 2023-8-2-log.txt 
21-35-26 => [info]NotifyService UserService success
21-35-26 => [error]eeeeeerrror9999999
21-35-26 => [info]NotifyService GetFriendListService success

到了这里,关于Rpc异步日志模块的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 分布式RPC框架Dubbo详解

    分布式RPC框架Dubbo详解

    目录   1.架构演进 1.1 单体架构 1.2  垂直架构 1.3 分布式架构 1.4 SOA架构 1.5 微服务架构 2.RPC框架 2.1 RPC基本概念介绍 2.1.1 RPC协议 2.1.2 RPC框架 2.1.3 RPC与HTTP、TCP/ UDP、Socket的区别 2.1.4 RPC的运行流程  2.1.5 为什么需要RPC 2.2 Dubbo  2.2.1 Dubbo 概述 2.2.2 Dubbo实战   架构演进如下图: 这

    2024年02月07日
    浏览(23)
  • 聊聊分布式架构04——RPC通信原理

    聊聊分布式架构04——RPC通信原理

    目录 RPC通信的基本原理 RPC结构 手撸简陋版RPC 知识点梳理 1.Socket套接字通信机制 2.通信过程的序列化与反序列化 3.动态代理 4.反射 思维流程梳理 码起来 服务端时序图 服务端—Api与Provider模块 客户端时序图 RPC通信的基本原理 RPC(Remote Procedure Call)是一种远程过程调用协议,

    2024年02月07日
    浏览(12)
  • 分布式理论CAP、BASE和RPC

    CAP原则是指当分布式系统遇到网络分区时,只能满足其中两个需求,一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)。在实际系统中,我们常常会选择在CA、CP或AP三者中做出取舍。 CA模型 CA模型要求分布式系统保持强一致性,即所有节点上的数据都

    2023年04月10日
    浏览(18)
  • 分布式系统消息通信技术:MOM与RPC

    分布式系统消息通信技术:MOM与RPC

    中间件(Middleware)是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件+平台+通信,这

    2024年02月11日
    浏览(16)
  • RPC分布式网络通信框架(二)—— moduo网络解析

    RPC分布式网络通信框架(二)—— moduo网络解析

    网络部分,包括寻找rpc服务主机,发起rpc调用请求和响应rpc调用结果,使用muduo网络和zookeeper 服务配置中心 (专门做服务发现) 其中MprpcApplication类负责框架的一些初始化操作,注意去除类拷贝构造和移动构造函数(实现单例模式)。其中项目还构建了MprpcConfig类负责读取服

    2024年02月17日
    浏览(16)
  • RPC分布式网络通信框架(一)—— protobuf的使用

    RPC分布式网络通信框架(一)—— protobuf的使用

    常见序列化和反序列化协议有XML、JSON、protobuf,相比于其他protobuf更有优势: 1、protobuf是二进制存储的,xml和json都是文本存储的。故protobuf占用带宽较低 2、protobuf不需要存储额外的信息。 json如何存储数据?键值对。例:Name:”zhang san”, pwd: “12345”。 protobuf存储数据的方式

    2024年02月16日
    浏览(19)
  • 【DDD分布式系统学习笔记】RPC调用以及系统初步搭建

    modelVersion: 模型版本,指定POM模型的版本,目前使用的是Maven 4.0.0版本。 groupId: 项目的组织标识符,通常是组织的域名倒序。在这里是 cn.itedus.lottery。 artifactId: 项目的唯一标识符,通常是项目的名称。在这里是 Lottery。 packaging: 项目的打包方式,这里是 pom,表示这是一个聚合

    2024年01月18日
    浏览(18)
  • 【昕宝爸爸小模块】日志系列之什么是分布式日志系统

    【昕宝爸爸小模块】日志系列之什么是分布式日志系统

    ➡️博客首页       https://blog.csdn.net/Java_Yangxiaoyuan        欢迎优秀的你👍点赞、🗂️收藏、加❤️关注哦。        本文章CSDN首发,欢迎转载,要注明出处哦!        先感谢优秀的你能认真的看完本文,有问题欢迎评论区交流,都会认真回复! 现在,很多应

    2024年02月20日
    浏览(13)
  • Celery分布式异步框架

    Celery分布式异步框架

    \\\"\\\"\\\" 1)可以不依赖任何服务器,通过自身命令,启动服务(内部支持socket) 2)celery服务为为其他项目服务提供异步解决任务需求的 注:会有两个服务同时运行,一个是项目服务,一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的

    2024年02月11日
    浏览(13)
  • 分布式异步任务处理组件(二)

    一些关键点的设计脑暴记录----very important!!! 首先,任务存储交给kafka,由节点负责写入kafka,acks=1;失败重试;透传kafka的提交可靠性,保证任务提交成功;后续可以考虑自己实现kafka相关机制---做局部优化,因为强依赖kafka 如何保证消息唯一被消费一次---集群状态维护全

    2024年02月15日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包