-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathConnection.cpp
More file actions
101 lines (86 loc) · 2.55 KB
/
Connection.cpp
File metadata and controls
101 lines (86 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
* Connection.cpp
*
* Created on: Oct 21, 2015
* Author: root
*/
#include "Server.h"
#include "EventLoopThread.h"
#include "Connection.h"
using namespace fm;
void SocketDataArrived(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
Connection* conn = (Connection*)watcher->data;
conn->ReadAndDisptchMessage();
}
Connection::Connection(int fd, struct sockaddr_in addr,Server* server,EventLoopThread* eventLoopThread):
m_clientFd(fd),m_clientAddr(addr),m_eventLoop(NULL),m_server(server),m_eventLoopThread(eventLoopThread)
{
}
Connection::~Connection()
{
if (m_eventLoop)
{
ev_io_stop(m_eventLoop, &m_ev_read);
}
close(m_clientFd);
std::cout<<"Connection::~Connection"<<std::endl;
}
void Connection::InitConnectin(struct ev_loop* loop)
{
m_eventLoop = loop;
m_ev_read.data = this;
ev_io_init(&m_ev_read, SocketDataArrived, m_clientFd, EV_READ);
ev_io_start(m_eventLoop, &m_ev_read);
}
bool Connection::ReadAndDisptchMessage()
{
try{
Message::Header header;
int read_size = read(m_clientFd,&header,sizeof(Message::Header));
if (read_size < 0)
{
std::cout<<"Read Message Header Error"<<std::endl;
return false;
}
else if (read_size == 0)
{
std::cout<<"Client Close"<<std::endl;
close(m_clientFd);
m_eventLoopThread->DecreaseFdCount();
m_server->RemoveConnection(shared_from_this());
return false;
}
if (read_size < sizeof(Message::Header))
{
std::cout<<"Message header is too short"<<std::endl;
return false;
}
if (!Message::Verify((Message*)&header))
{
std::cout<<"Invalid tcp message header"<<std::endl;
return false;
}
std::cout<<"Receive Message Body Length Is: "<<header.length<<std::endl;
std::cout<<"Receive Message ID Length Is: "<<header.msgid<<std::endl;
Message* curMessage = Message::Allocate(header.msgid, header.length);
int remainLength = header.length + sizeof(Message) - sizeof(Message::Header);
read_size = read(m_clientFd,((unsigned char*)curMessage)+sizeof(Message::Header),remainLength);
if (read_size < 0) {
std::cout<<"Read Message Body Error"<<std::endl;
close(m_clientFd);
return false;
}
std::string messageStr((char*)&(curMessage->body[0]));
std::cout<<"accept data from client: "<<messageStr<<std::endl;
std::cout<<fm::Time::Now().FormatString()<<":accept data from client("<<inet_ntoa(m_clientAddr.sin_addr)<<":"<<
m_clientAddr.sin_port<<") :"<<messageStr<<std::endl;
if (!m_server->AddMessage(curMessage))
{
Message::Free(curMessage);
}
}catch(std::exception& ex){
std::cout<<"error:"<<ex.what()<<std::endl;
}
return true;
}