网络通信第五课多线程异步服务器

场景
        本例子支持多线程异步处理消息,针对每一个链接请求,创建线程处理稍后的指令,CSimpleSession::SessionThreadFunc是线程函数,async_read_some函数设置接收数据的回调函数ContinueRead,一般情况下,read_some函数未必能够完整的读取客户端发送的数据包,当然必须要指定明确的结束标志,双方必须规定好等接收完毕的时候,必须等待线程返回,因此在析构函数调用m_thread->join函数,等线程函数正常返回之后,关闭连接,如果没有等待线程返回,就直接关闭连接,会导致async_read_some函数抛出异常,目前暂时没有什么头绪

成都创新互联是一家专业的成都网站建设公司,我们专注成都网站设计、做网站、网络营销、企业网站建设,外链广告投放平台为企业客户提供一站式建站解决方案,能带给客户新的互联网理念。从网站结构的规划UI设计到用户体验提高,创新互联力求做到尽善尽美。

service.h

#ifndef QPIDPUSHMESSAGESERVICE_H
#define QPIDPUSHMESSAGESERVICE_H

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace qpid
{
class CSimpleSession : public boost::enable_shared_from_this
{
public:
CSimpleSession(boost::asio::io_service &io_service) : m_socket(io_service)
{
m_bRunning = true;
PrepareForNextRecv();
}
~CSimpleSession()
{
m_bRunning = false;
m_thread->join();
m_socket.close();
}

void StartThread()
{
static boost::asio::ip::tcp::no_delay option(true);
m_socket.set_option(option);
m_thread.reset(new boost::thread(boost::bind(&CSimpleSession::SessionThreadFunc, this)));
}

void SessionThreadFunc()
{
while (m_bRunning)
{
if (m_bStartSetCallBackRead)
{
m_socket.async_read_some(boost::asio::buffer(m_szRecvBuffer),
boost::bind(&CSimpleSession::ContinueRead, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
m_bStartSetCallBackRead = false;
}
boost::this_thread::sleep_for(boost::chrono::milliseconds(300));
}
m_bRunning = false;
}

boost::asio::ip::tcp::socket &GetSocket()
{
return m_socket;
}

bool GetCurThreadRunningStatus()
{
return m_bRunning;
}

void PrepareForNextRecv()
{
memset(m_szRecvBuffer, 0x00, 10240);
m_strMatch = "";
m_bStartSetCallBackRead = true;
}

private:

void ContinueRead(const boost::system::error_code &error, std::size_t bytes_transferred)
{
if (error)
{
m_bRunning = false;
return;
}

m_strMatch =  m_szRecvBuffer;
int nIndexOfContentLength = m_strMatch.find("Content-Length:", 0);
int indexOfEnd = m_strMatch.find("\r\n\r\n", 0);
if (nIndexOfContentLength  == -1)
{
m_bRunning = false;
return;
}
std::cout << m_strMatch << std::endl;
std::string strContextLen = m_strMatch.substr(nIndexOfContentLength + 15, indexOfEnd - nIndexOfContentLength - 15);
int nContextLen = atoi(strContextLen.c_str());
if (nContextLen < m_strMatch.length())
{
//handle
m_bRunning = false;
return;
}

m_socket.async_read_some(boost::asio::buffer((m_szRecvBuffer)),
boost::bind(&CSimpleSession::ContinueRead, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

private:
boost::asio::ip::tcp::socket m_socket;
char m_szRecvBuffer[10240];
std::string m_strMatch;
bool m_bStartSetCallBackRead;
bool m_bRunning;
boost::shared_ptr m_thread;
};

typedef boost::shared_ptr CPtrSession;

class CSimpleServer
{
public:

CSimpleServer(boost::asio::io_service &io_service, boost::asio::ip::tcp::endpoint &endpoint)
:m_ioService(io_service), m_acceptor(io_service, endpoint)
{
CPtrSession newSession(new CSimpleSession(io_service));
m_vecSession.push_back(newSession);
m_acceptor.async_accept(newSession->GetSocket(),
boost::bind(&CSimpleServer::HandleAccept,
this,
newSession,
boost::asio::placeholders::error));
}

void HandleAccept(CPtrSession newSession, const boost::system::error_code &error)
{
if (error)return;

//如果Start函数进行了阻塞,只有处理完当前的连接,才会进行下一步处理连接
newSession->StartThread();
ClearHasEndConnection();
CPtrSession createNewSession(new CSimpleSession(m_ioService));

//当前保存了会话连接,直到连接被释放,而不是由于createNewSession跳出循环,导致套接字异常

m_vecSession.push_back(createNewSession);
m_acceptor.async_accept(createNewSession->GetSocket(),
boost::bind(&CSimpleServer::HandleAccept,
this,
createNewSession,
boost::asio::placeholders::error));
}

//定时清除结束的连接

void ClearHasEndConnection()
{
std::vector::iterator iter;
iter = m_vecSession.begin();
std::size_t count = m_vecSession.size();
std::cout << "session count:" << count << std::endl;
while (iter != m_vecSession.end())
{
if (!(*iter)->GetCurThreadRunningStatus())
{
iter->reset();
m_vecSession.erase(iter);
break;
}
iter++;
}
}

void run()
{
m_ioService.run();
}

private:
boost::asio::io_service &m_ioService;
std::vector m_vecSession;
boost::asio::ip::tcp::acceptor m_acceptor;
};

void StartListenThread();

int StartListenService();
}

#endif

service.cpp

#include
#include "service.h"

void qpid::StartListenThread()
{
boost::asio::io_service ioService;
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string("192.168.0.34"), 7003);

qpid::CSimpleServer s(ioService, endpoint);
s.run();
}

int qpid::StartListenService()
{
boost::thread serviceThread(&StartListenThread);
serviceThread.detach();
return 0;
}

说明
这里跟之前的asio 异步服务器是有很大的区别
1)套接字可以不用关闭,其次也不需要担心线程的返回问题
2)不再需要保存请求处理的实例,自然也就没有管理所有实例的必要性,至于什么时候退出,服务器的接收线程不需要考虑
错误提醒:
在实际的应用环境中,在读数据m_socket.read_some(boost::asio::buffer(szRecvBuf), ec)的时候,会产生套接字错误,返回10035,代表含义是在一个非套接字上尝试了一个操作。
出现原因分析:
当线程分离的时候,accept函数开始等待下一个请求,createNewSession由于是智能指针,跳出了函数,开始调用析构函数进行对象的清理,这个时候m_socket已经被清理掉了,很多类的成员变量已经无法被使用了,m_vecThreadInstance.push_back(createNewSession);却能够保存对象的实例,不至于马上调用析构函数,如果调用该函数的话,就必须自己定时清理已经服务完毕的对象


网页标题:网络通信第五课多线程异步服务器
网站路径:http://hbruida.cn/article/pipooo.html