123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- #include "includes.h"
- void tupParaSet(TUPSEND_DEF *para, T_UINT16 port, MY_SOCK_CB *callback,T_UINT16 stacksize){
- para->port=port;
- para->tupRecvCallBack=callback;
- para->stacksize=stacksize;
- }
- void trySendTup(T_UINT8 *data, T_UINT16 len, TUP_ENUM tup, TUPSEND_DEF *para){
- T_INT32 ret;
- if(para->fd<=0) return;
- if(TUP_TCP==para->tupType) ret=nwy_socket_send(para->fd,data,len,0);
- else ret=nwy_socket_sendto(para->fd,data,len,0, ¶->sockAddr, sizeof(struct sockaddr_in));
- if(ret<0) wlog_info("tup[%d]:%d send failed", para->fd,len);
- else wlog_info("tup[%d]:%d send ok",para->fd,len);
- }
- void userCloseSocket(T_INT32 *fd){
- if(*fd<=0) return;
- wlog_info("SockClose:%d",*fd);
- nwy_socket_close(*fd);
- *fd=-1;
- }
- void showTupEventInfo(const char *name, T_UINT32 id,TUPSEND_DEF *para, char needShutWhenSerShut){
- // if(para->ptimer_t!=NULL){//有回调响应后清除定时器
- // wlog_info("[%s]timerStop",name);
- // LSAPI_OSI_TimerStop(para->ptimer_t);
- // para->ptimer_t=NULL;
- // }
- switch(id){
- case TUP_EVENT_SOCK_LINK_RECV: wlog_info("[%d %s]:recv data ok",id,name);break;
- case TUP_EVENT_SOCK_LINK_SERVER_SHUT:
- para->tupStatus=CNNT_CLOSED;
- if(needShutWhenSerShut==0) userCloseSocket(¶->fd);
- else wlog_info("[%d %s dly shut fd",id,name);
- wlog_info("[%d %s]:server shut",id,name);
- break;
- case TUP_EVENT_SOCK_LINK_ERR: para->tupStatus=CNNT_ERROR;userCloseSocket(¶->fd);wlog_info("[%d %s]:link err",id,name);break;
- case TUP_EVENT_SOCK_LINK_OK:para->tupStatus=CNNT_OK;wlog_info("[%d %s]:link ok",id,name);break;
- case TUP_EVENT_SOCK_LINK_CLIENT_SHUT: para->tupStatus=CNNT_CLOSED;userCloseSocket(¶->fd);wlog_info("[%d %s]:client shut",id,name);break;
- // case LSAPI_SOCK_TCPIP_SOCKET_SEND_RSP: wlog_info("[%d %s]:send data ok",id,name);break;
- case TUP_EVENT_SOCK_LINK_TIMEOUT: para->tupStatus=CNNT_TOUT;userCloseSocket(¶->fd);wlog_info("[%d %s]:link tout",id,name);break;
- default:wlog_info("%d %s]:ignore",id,name);break;
- }
- WakeupNow(name);
- }
/**
 * Timer callback placeholder; currently performs no action.
 *
 * The earlier timer-driven timeout handling is disabled and kept here for
 * reference only:
 *
 *   TUPSEND_DEF *para=(TUPSEND_DEF *)param;
 *   wlog_info("sockTimerOutCallback enter");
 *   LSAPI_OSI_TimerStop(para->ptimer_t);
 *   userCloseSocket(&para->fd);
 *   para->errno=3;   (connection timeout, error 3)
 *   threadPostEvent(nwy_get_current_thread(),USER_EVENT_EXIT);
 *
 * @param param Unused.
 */
void sockTimerOutCallback(void *param)
{
    (void)param;
}
- void sockEntry(void *param){
- TUPSEND_DEF *para=(TUPSEND_DEF *)param;
- T_INT32 sockfd;
- T_INT32 type,tup;
- char recbuf[1024];
- int g_keepalive = 0;
- wlog_info("[NewThread:sockEntry]%x",nwy_get_current_thread());
- if(TUP_TCP==para->tupType){
- type=SOCK_STREAM;
- tup=IPPROTO_TCP;
- }else{
- type=SOCK_DGRAM;
- tup=IPPROTO_UDP;
- }
- // sockfd = LSAPI_SOCK_Create(LSAPI_SOCK_TCPIP_AF_INET, type, 0, para->tupRecvCallBack,(uint32_t)para);
- sockfd = nwy_socket_open(AF_INET,type,tup);
- if(sockfd<0){
- wlog_info("sockfd create err=%d",nwy_socket_errno());
- para->errno=1;//错误1
- nwy_exit_thread();
- return;
- }
- ip_addr_t addr;
- para->fd=sockfd;
- wlog_info("tup info:%d,%s:%d", sockfd,para->saddr, para->port);
- struct sockaddr_in *serveraddr=¶->sockAddr;
- serveraddr->sin_family = AF_INET;
- serveraddr->sin_port = htons(para->port);
- if(0==ipaddr_aton(para->saddr, &addr)){
- wlog_error("ipaddr_aton failed");
- userCloseSocket(&sockfd);
- para->fd=0;
- para->errno=2;//错误2
- userExitThread("sockEntry");
- return;
- }
- inet_addr_from_ip4addr(&serveraddr->sin_addr, ip_2_ip4(&addr));
- nwy_socket_setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&g_keepalive, sizeof(g_keepalive));
-
- if(0!=nwy_socket_set_nonblock(sockfd)){
- wlog_error("set_nonblock err");
- userCloseSocket(&sockfd);
- para->fd=0;
- para->errno=2;//错误2
- userExitThread("sockEntry");
- return;
- }
- //if(0 != nwy_socket_connect(sockfd, serveraddr,sizeof(para->sockAddr))){
- // wlog_error("sockfd connect err");
- // userCloseSocket(&sockfd);
- // para->fd=0;
- // para->errno=2;//错误2
- // userExitThread("sockEntry");
- // return;
- //}
- //创建一个定时器用于连接超时
- //para->ptimer_t=LSAPI_OSI_TimerCreate(nwy_get_current_thread(), sockTimerOutCallback, param);
- //if(NULL != para->ptimer_t) LSAPI_OSI_TimerStart(para->ptimer_t, 10000);//设置为超时10秒
- wlog_info("open fd:%d",sockfd);
- //做成事件响应型
- nwy_osiEvent_t tevent={0};
- int ret=-1,errno,cntstatus=0;
- int64_t timeout=nwy_get_ms();
- tevent.param3=para;
- fd_set rd_fd,ex_fd;
- struct timeval tv;//不能用临时变量
- tv.tv_sec=10;
- tv.tv_usec=0;
- for(;;){
- //检测是否连接超时
- if(cntstatus==0){//未连接成功时操作
- if(((nwy_get_ms()-timeout))>=10000){
- //超时未连接成功
- cntstatus=-1;
- tevent.id=TUP_EVENT_SOCK_LINK_TIMEOUT;
- para->tupRecvCallBack((void *)&tevent);
- }
- //检测是否connect成功
- if(ret!=0){
- ret=nwy_socket_connect(sockfd, serveraddr,sizeof(para->sockAddr));
- }
-
- if(ret!=0){//connet未成功,需要进一步检测
- errno=nwy_socket_errno();
- if(EALREADY == errno || EISCONN == errno){
- //连接成功
- cntstatus=1;
- }else if(EINPROGRESS != errno){
- //连接失败
- cntstatus=-2;
- tevent.id=TUP_EVENT_SOCK_LINK_ERR;
- para->tupRecvCallBack((void *)&tevent);
- }
- nwy_sleep(100);
- }else cntstatus=1;
- if(cntstatus==1){
- tevent.id=TUP_EVENT_SOCK_LINK_OK;
- para->tupRecvCallBack((void *)&tevent);
- }
- }else if(cntstatus==1){//select操作
- FD_ZERO(&rd_fd);FD_ZERO(&ex_fd);
- FD_SET(sockfd,&rd_fd);FD_SET(sockfd, &ex_fd);
-
- int result=nwy_socket_select(sockfd+1,&rd_fd,NULL,&ex_fd,&tv);
- if(result<0){//select 失败
- cntstatus=-3;
- tevent.id=TUP_EVENT_SOCK_LINK_ERR;
- para->tupRecvCallBack((void *)&tevent);
- }else if(result==0){//select 超时
- //没有数据可读,继续select
- }else{//select 成功
- if(FD_ISSET(sockfd, &rd_fd)){//有数据可读
- FD_CLR(sockfd, &rd_fd);
- int len;
- if(tup==IPPROTO_TCP) len=nwy_socket_recv(sockfd, recbuf, sizeof(recbuf),0);
- else{
- int tlen=sizeof(para->sockAddr);
- len=nwy_socket_recvfrom(sockfd, recbuf, sizeof(recbuf), 0, serveraddr,&tlen);
- }
- if(len==0){
- cntstatus=-4;
- tevent.id=TUP_EVENT_SOCK_LINK_SERVER_SHUT;
- para->tupRecvCallBack((void *)&tevent);
- }else if(len>0){
- tevent.id=TUP_EVENT_SOCK_LINK_RECV;
- tevent.param1=recbuf;
- tevent.param2=len;
- para->tupRecvCallBack((void *)&tevent);
- }else{
- cntstatus=-5;
- tevent.id=TUP_EVENT_SOCK_LINK_CLIENT_SHUT;
- para->tupRecvCallBack((void *)&tevent);
- }
- }
- if(FD_ISSET(sockfd, &ex_fd)){//异常
- tevent.id=TUP_EVENT_SOCK_LINK_ERR;
- cntstatus=-6;
- para->tupRecvCallBack((void *)&tevent);
- FD_CLR(sockfd, &rd_fd);
- }
- }
- }else{//退出线程
- nwy_sleep(50);
- userCloseSocket(¶->fd);
- userExitThread("sockEntry");
- }
- }
- }
- TUP_CONNECT_ENUM tryConnectTup(T_INT8 *seraddr, TUP_ENUM tup,TUPSEND_DEF *para){
- //必须使用线程来创建连接再waitevent
- para->saddr=seraddr;
- para->tupType=tup;
- para->errno=0;
- para->tupStatus=CNNT_BUSY;
- para->fd=0;
- if(NULL==nwy_create_thread("sockCreate", sockEntry,(void *)para,NWY_OSI_PRIORITY_NORMAL,para->stacksize,4)){
- wlog_error("tryConnectTup thread create error for %s",seraddr);
- return TUP_STATUS_FALSE;
- }
- return TUP_STATUS_WAIT;
- }
|