/*
 * group.c - group (cluster) feature task
 *
 * Resolves the configured group server, opens a TCP connection to it,
 * sends one login request and records the server's answer in login_status.
 */
#include "includes.h"

#define GROUP_SER_PORT 25060

/* States of the group login sequence driven by ptGroupTask(). */
typedef enum{
    GROUP_IDLE,
    GROUP_TCP,
    GROUP_TCP_WAIT,
    GROUP_SEND_WAIT_ACK,
    GROUP_END,
}GROUP_ENUM;

static GROUP_ENUM groupStatus=GROUP_IDLE;
/* 0 while no answer has been recorded; set to 3 by LoginAckHandle() on a
 * length mismatch. ptGroupTask() logs the values 1..5. */
T_INT32 login_status=0;
static TUPSEND_DEF tupsendPara;

static void group_login_req(void);
static void groupRecv(T_INT32 fd);
static void LoginAckHandle(T_UINT8 *data, T_UINT16 len);

/* Retry / wait counter shared by ptGroupTask() and groupRecv_cb(). */
static T_INT16 timeout;

/*
 * groupRecv_cb - socket event callback for the group server connection.
 *
 * param: pointer to an LSAPI_OSI_Event_t. Its param3 field is used as a
 *        TUPSEND_DEF pointer. The event is released with LSAPI_OSI_Free()
 *        before this function returns.
 *
 * When needExitThread ends up TRUE, USER_EVENT_EXIT is posted to the
 * current thread.
 */
void groupRecv_cb(void *param){
    int len_ret;
    T_UINT8 buffer[256];
    LSAPI_OSI_Event_t *pEvent = (LSAPI_OSI_Event_t *)param;
    TUPSEND_DEF *para=(TUPSEND_DEF *)pEvent->param3;
    T_BOOL needExitThread=FALSE;

    showTupEventInfo("group",pEvent->id,para,1);

    /*
     * When the server closes the connection on its own, the TCP socket is
     * not closed right away, otherwise no more data could be received.
     * It is not closed after a delay here either, because a timeout check
     * already exists.
     */
    switch(pEvent->id){
    case LSAPI_SOCK_TCPIP_SOCKET_CONNECT_RSP:
        tupsendPara.fd=para->fd;
        if(GROUP_TCP_WAIT==groupStatus)
            groupStatus=GROUP_SEND_WAIT_ACK;
        break;

    case LSAPI_SOCK_TCPIP_SOCKET_SEND_RSP:
        break;

    case LSAPI_SOCK_TCPIP_SOCKET_CLOSE_RSP:
        if(GROUP_TCP_WAIT==groupStatus){
            /* Connection attempt was closed while still pending:
             * retry until the shared counter runs out. */
            if(--timeout){
                groupStatus=GROUP_TCP;
                wlog_warn("group connect retry");
            }else{
                wlog_warn("group connect tout");
                groupStatus=GROUP_END;
            }
            needExitThread=TRUE;
        }
        break;

    case LSAPI_SOCK_TCPIP_REV_DATA_IND:
        /* A single read is performed per event (the former while(1)
         * always left the loop on its first pass). */
        len_ret=LSAPI_SOCK_Recv(para->fd, buffer, sizeof(buffer),0);
        if(len_ret>0){
            LoginAckHandle(buffer, len_ret);
            /* The source text had a garbled character here; the
             * argument is the address of the descriptor. */
            userCloseSocket(&para->fd);
            needExitThread=TRUE;
        }else if(len_ret<0){
            wlog_warn("group recv failed");
        }
        /* len_ret==0: nothing to do */
        break;

    case LSAPI_SOCK_TCPIP_CLOSE_IND:
        break;

    case LSAPI_SOCK_TCPIP_ERR_IND:
        wlog_error("group server err");
        groupStatus=GROUP_END;
        needExitThread=TRUE;
        break;

    default:
        break;
    }

    LSAPI_OSI_Free(pEvent);
    if(TRUE==needExitThread)
        threadPostEvent(LSAPI_OSI_ThreadCurrent(),USER_EVENT_EXIT);
}

/*
 * isGroupIdle - update the TICKET_PT_GROUP ticket.
 *
 * Calls ticketDeVote() while talking.groupStart is TRUE and ticketVote()
 * otherwise.
 */
void isGroupIdle(void){
    if(TRUE==talking.groupStart)
        ticketDeVote(TICKET_PT_GROUP);
    else
        ticketVote(TICKET_PT_GROUP);
}

/*
 * ptGroupTask - group access task (protothread).
 *
 * ptPool: timer pool handed to PTTimerStart().
 * pt:     protothread control block.
 *
 * While talking.groupStart is TRUE and the network is ready, walks through
 * GROUP_IDLE (domain lookup) -> GROUP_TCP / GROUP_TCP_WAIT (connect) ->
 * GROUP_SEND_WAIT_ACK (login request, wait for login_status) -> GROUP_END.
 *
 * Every PT_WAIT_UNTIL is kept on its own source line and outside of any
 * nested switch statement, so the task also works with switch-based
 * protothread continuations.
 */
//group handler start...////////////////////////////////
PT_THREAD (ptGroupTask(pt_timer_t *ptPool, struct pt *pt)){
    static pt_timer_t ptTimer;
    static T_UINT8 index,i;
    T_UINT8 ret;
    T_BOOL connectRetryWait;   /* only used between two waits, need not be static */
    static T_INT8 ipAddr[20];

    PT_BEGIN(pt);
    while(1){
        if(TRUE==talking.groupStart && talking.netWork.netReady!=0){
            if(paras.groupServer[0]=='\0'){
                wlog_warn("group server null , quit");
                talking.groupStart=FALSE;
            }else if(GROUP_IDLE==groupStatus){
                if(FALSE==setDomainForIp(paras.groupServer, &index)){
                    wlog_warn("group domain fun busy, retry later");
                    PTTimerStart(ptPool, &ptTimer,300);//3 seconds later retry
                    PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
                }else{
                    /* Poll the lookup result, at most 20 times. */
                    i=0;
                    while(1){
                        ret=getDomainForIp(index, ipAddr, sizeof(ipAddr)-1);
                        if(ret==DOMAIN_ERR){
                            wlog_error("group domain fail , quit");
                            talking.groupStart=FALSE;
                            groupStatus=GROUP_END;
                            break;
                        }else if(DOMAIN_OK==ret){
                            wlog_info("group domain done:%s", ipAddr);
                            tupParaSet(&tupsendPara, GROUP_SER_PORT, groupRecv_cb,GROUP_TCP_THREAD_STACK);
                            timeout=5;
                            groupStatus=GROUP_TCP;
                            break;
                        }else{
                            PTTimerStart(ptPool, &ptTimer,50);//wait 0.5 per time
                            PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
                            if(++i>=20){
                                wlog_error("group domain timeout, quit");
                                talking.groupStart=FALSE;
                                groupStatus=GROUP_END;
                                break;
                            }
                        }
                    }
                }
            }else if(GROUP_TCP==groupStatus){
                //do tcp for ip now
                if(1==talking.netWork.netReady){
                    connectRetryWait=FALSE;
                    switch(tryConnectTup(ipAddr,TUP_TCP,&tupsendPara)){
                    case TUP_STATUS_TRUE:
                        wlog_info("group connect server ok");
                        groupStatus=GROUP_SEND_WAIT_ACK;
                        break;
                    case TUP_STATUS_FALSE:
                        if(--timeout){
                            wlog_warn("group connect failed, retry later");
                            /* the wait itself happens after the switch */
                            connectRetryWait=TRUE;
                        }else{
                            wlog_warn("group connect timeout");
                            groupStatus=GROUP_END;
                        }
                        break;
                    case TUP_STATUS_WAIT:
                        wlog_info("group connect wait");
                        groupStatus=GROUP_TCP_WAIT;
                        break;
                    }
                    if(TRUE==connectRetryWait){
                        PTTimerStart(ptPool, &ptTimer,300);//3 seconds later retry
                        PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
                    }
                }
            }else if(GROUP_SEND_WAIT_ACK==groupStatus){
                login_status=0;
                group_login_req();
                wlog_info("group wait ack");
                /* Wait in up to 5 timer periods until login_status becomes positive. */
                timeout=5;
                do{
                    PTTimerStart(ptPool, &ptTimer,100);
                    PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
                }while(--timeout && login_status<=0);

                if(1==login_status) wlog_info("group login ok");
                else if(2==login_status) wlog_info("group login but no right");
                else if(3==login_status) wlog_info("group packet error:len");
                else if(4==login_status) wlog_info("group packet error:sum");
                else if(5==login_status) wlog_info("group packet error:no login");
                else wlog_info("group login but server no responsed");
                /* original note: "close tcp"; this branch itself only moves to GROUP_END */
                groupStatus=GROUP_END;
            }else if(GROUP_END==groupStatus){
                wlog_info("group end");
                talking.groupStart=FALSE;
                groupStatus=GROUP_IDLE;
            }
        }
        isGroupIdle();
        PTTimerStart(ptPool, &ptTimer,1);//should be fast
        PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
    }
    PT_END(pt);
}

#ifdef ACTIVE_LIBEV
/*
 * groupRecv_cb - libev read callback for the group server socket.
 *
 * NOTE(review): this shares its name with the non-libev groupRecv_cb()
 * defined earlier in this file, and passes the descriptor to
 * userCloseSocket() by value while the other call site passes its
 * address - confirm before enabling ACTIVE_LIBEV.
 */
static void groupRecv_cb(struct ev_loop *main_loop,ev_io *io_w, int revents){
    T_INT32 len;
    T_UINT8 buffer[256];

    if(EV_ERROR & revents){
        wlog_warn("groupRecv_cb error");
        ev_io_stop(main_loop, io_w);
        return;
    }
    len=recv(tupsendPara.fd, buffer, sizeof(buffer),0);
    wlog_info("get data len:%d", len);
    if(len>0) LoginAckHandle(buffer, len);
    userCloseSocket(tupsendPara.fd);
    ev_io_stop(main_loop, io_w);
}
#endif

/*
 * groupRecv - create the receive watcher for the group server socket.
 *
 * fd: descriptor to watch for readability.
 *
 * Does nothing unless ACTIVE_LIBEV is defined.
 */
static void groupRecv(T_INT32 fd){
#ifdef ACTIVE_LIBEV
    static ev_io io_recv;

    wlog_info("groupRecv call");
    ev_io_stop(loop, &io_recv);
    ev_init(&io_recv, groupRecv_cb);
    ev_io_set(&io_recv, fd,EV_READ);
    ev_io_start(loop, &io_recv);
#endif
}

/*
 * SendLoginFill - pack the payload of the group access request.
 *
 * info: destination buffer, must hold at least 38 bytes.
 * psn:  written first, most significant byte first (4 bytes).
 * key:  NUL-terminated string; at most 20 bytes are copied into a
 *       zero-filled 20-byte field.
 *
 * Layout: psn(4) | zeros(4) | "groupPoc"(8) | zeros(2) | key(20).
 * Returns the number of bytes written (38).
 */
static T_UINT16 SendLoginFill(T_UINT8 *info,T_UINT32 psn, T_INT8 *key){
    T_UINT8 *p=info;
    T_UINT16 i,len;

    i=0;
    p[i++]=(psn>>24);
    p[i++]=(psn>>16);
    p[i++]=(psn>>8);
    p[i++]=psn;

    p[i++]=0;
    p[i++]=0;
    p[i++]=0;
    p[i++]=0;

    memcpy(p+i, "groupPoc",8);
    i+= 8;

    p[i++]=0;
    p[i++]=0;

    len=strlen(key);
    if(len>20) len=20;
    memset(p+i, 0, 20);
    memcpy(p+i, key,len);
    i += 20;

    return i;
}

/*
 * SendDataPack - frame a payload that already sits at info+3.
 *
 * info:    buffer; bytes 3..3+datalen-1 hold the payload, and one more
 *          byte after the payload must be writable.
 * datalen: payload length in bytes.
 *
 * Writes 0x12 into byte 0, (datalen+1) high byte first into bytes 1..2,
 * and the 8-bit sum of the first 3+datalen bytes after the payload.
 * Returns the total frame length (3+datalen+1).
 */
static T_UINT16 SendDataPack(T_UINT8 *info, T_UINT16 datalen){
    T_UINT16 j,i=0;
    T_UINT8 *pBuf=info,sum;

    pBuf[0]=0x12;
    i=datalen+1;
    pBuf[1]=i>>8;
    pBuf[2]=i;

    sum=0;
    for(j=0;j<1+2+datalen;j++)
        sum += pBuf[j];
    pBuf[3+datalen]=sum;

    return (3+datalen+1);
}

/*
 * group_login_req - build and send the group access request.
 *
 * Packs paras.psn and paras.pass with SendLoginFill(), frames the result
 * with SendDataPack() and hands it to trySendTup() over TUP_TCP using
 * tupsendPara.
 */
static void group_login_req(void){
    T_UINT8 info[128];
    T_UINT16 length;

    length=SendLoginFill(info+3,paras.psn,paras.pass);
    length=SendDataPack(info,length);
    wlog_info("group send login req info");
    trySendTup(info, length, TUP_TCP, &tupsendPara);
}

/* LoginAckHandle - parse the response to the group access request */
#define INVALID_IP_ADDRESS "0.0.0.0"
static void LoginAckHandle(T_UINT8 *data, T_UINT16 len){
    T_UINT8 type,sum,right;
    T_UINT8 *p=data;
    T_UINT16 thisLen,i;
    T_INT8 info[IP_SIZE+1];
    T_UINT8 *pip,*gip,needsave=0;

    type=*p++;
    thisLen=*p++;
    thisLen <<= 8;
    thisLen &= 0xff00;
    thisLen |= *p++;
    if((1+2+thisLen) != len){
        login_status=3;
        return;
    }
    sum=0;
    for(i=0;i