|
- /*
- group.c
- 集群功能任务
- */
- #include "includes.h"
- #define GROUP_SER_PORT 25060 /* port handed to tupParaSet() for the group server connection */
- typedef enum{ /* states of the group-login state machine run by ptGroupTask() */
- GROUP_IDLE, /* nothing in progress; ptGroupTask() starts the server-name lookup from here */
- GROUP_TCP, /* server address known; ptGroupTask() calls tryConnectTup() */
- GROUP_TCP_WAIT, /* tryConnectTup() returned TUP_STATUS_WAIT; groupRecv_cb() moves the state on */
- GROUP_SEND_WAIT_ACK, /* link is up; the login request is sent and login_status is polled */
- GROUP_END, /* attempt finished; ptGroupTask() clears talking.groupStart and returns to GROUP_IDLE */
- }GROUP_ENUM;
- static GROUP_ENUM groupStatus=GROUP_IDLE; /* current state; written by ptGroupTask() and groupRecv_cb() */
- T_INT32 login_status=0; /* 0 = no ack yet; LoginAckHandle() sets 1 (ok), 2 (no right), 3 (length error), 4 (checksum error), 5 (unexpected type) */
- static TUPSEND_DEF tupsendPara; /* connection parameters passed to tupParaSet()/tryConnectTup()/trySendTup() */
- static void group_login_req(void);
- static void groupRecv(T_INT32 fd);
- static void LoginAckHandle(T_UINT8 *data, T_UINT16 len);
- static T_INT16 timeout; /* countdown reused for connect retries and for the ack wait loop */
- /*
- groupRecv_cb
- Socket event callback for the group-login connection (registered through tupParaSet()).
- param: pointer to an nwy_osiEvent_t; id is the event type, param1/param2 carry the
-        received data pointer and length for receive events, param3 points to the
-        TUPSEND_DEF of the link the event belongs to.
- Effects by event:
-   LINK_OK          - remembers the socket fd and, if a connect was pending, moves to GROUP_SEND_WAIT_ACK.
-   LINK_SERVER_SHUT - only while a connect is pending: consumes one retry and goes back to
-                      GROUP_TCP, or to GROUP_END when the retries are used up.
-   LINK_RECV        - parses the login ack and closes the socket.
-   LINK_TIMEOUT     - ignored; ptGroupTask() runs its own ack timeout.
-   LINK_ERR         - aborts the attempt (GROUP_END).
- */
- void groupRecv_cb(void *param){
-     unsigned char *pdata;
-     int len_ret;
-     nwy_osiEvent_t *pEvent = (nwy_osiEvent_t *)param;
-     TUPSEND_DEF *para=(TUPSEND_DEF *)pEvent->param3;
-     /* Only consumed by the disabled thread-exit post at the end of this function. */
-     T_BOOL needExitThread=FALSE;
-     /* When the server closes the link first, the TCP socket is deliberately not closed
-        right away (data could otherwise be lost); it is not closed later either, because
-        the caller has its own timeout handling. */
-     showTupEventInfo("group",pEvent->id,para,1);
-     switch(pEvent->id){
-     case TUP_EVENT_SOCK_LINK_OK:
-         tupsendPara.fd=para->fd;
-         if(GROUP_TCP_WAIT==groupStatus) groupStatus=GROUP_SEND_WAIT_ACK;
-         break;
-     case TUP_EVENT_SOCK_LINK_SERVER_SHUT:
-         if(GROUP_TCP_WAIT==groupStatus){
-             if(--timeout){
-                 groupStatus=GROUP_TCP;
-                 wlog_warn("group connect retry");
-             }else{
-                 wlog_warn("group connect tout");
-                 groupStatus=GROUP_END;
-             }
-             needExitThread=TRUE;
-         }
-         break;
-     case TUP_EVENT_SOCK_LINK_RECV:
-         pdata=(unsigned char *)pEvent->param1;
-         len_ret=pEvent->param2;
-         LoginAckHandle(pdata, len_ret);
-         /* The argument had been corrupted into a pilcrow character ("&para" decoded as an
-            HTML entity); the address of the link's fd is what is passed here. */
-         userCloseSocket(&para->fd);
-         needExitThread=TRUE;
-         break;
-     case TUP_EVENT_SOCK_LINK_TIMEOUT:
-         break;
-     case TUP_EVENT_SOCK_LINK_ERR:
-         wlog_error("group server err");
-         groupStatus=GROUP_END;
-         needExitThread=TRUE;
-         break;
-     default:
-         break;
-     }
-     // LSAPI_OSI_Free(pEvent);
-     // if(TRUE==needExitThread) threadPostEvent(nwy_get_current_thread(),USER_EVENT_EXIT);
- }
- /*
- isGroupIdle
- Reports the group task's activity through the TICKET_PT_GROUP ticket:
- ticketDeVote() while a group login is in progress (talking.groupStart is TRUE),
- ticketVote() otherwise.
- */
- void isGroupIdle(void){
-     if(TRUE!=talking.groupStart){
-         ticketVote(TICKET_PT_GROUP);
-         return;
-     }
-     ticketDeVote(TICKET_PT_GROUP);
- }
- /*
- ptGroupTask
- Protothread that runs the group-server login whenever talking.groupStart is TRUE and
- talking.netWork.pdp is non-zero. It walks groupStatus through:
-   GROUP_IDLE          - resolve paras.groupServer to an IP string (up to 10 polls).
-   GROUP_TCP           - connect with tryConnectTup(); up to `timeout` attempts.
-   GROUP_TCP_WAIT      - nothing to do here; groupRecv_cb() advances the state.
-   GROUP_SEND_WAIT_ACK - send the login request and poll login_status up to 5 times.
-   GROUP_END           - clear talking.groupStart and return to GROUP_IDLE.
- ptPool: timer pool handed to PTTimerStart().
- pt:     protothread control block.
- Every pass ends with isGroupIdle() and a 1-tick wait.
-
- No PT_WAIT_UNTIL is placed inside a C switch statement: if the local-continuation
- implementation is switch-based, a wait inside a nested switch would attach its resume
- label to that inner switch and the thread could not resume there. The connect-retry
- delay is therefore requested through a flag and performed after the switch.
- */
- //group handler start...////////////////////////////////
- PT_THREAD (ptGroupTask(pt_timer_t *ptPool, struct pt *pt)){
-     static pt_timer_t ptTimer;
-     static T_UINT8 index,i;
-     /* ret and retryDelay are plain locals: each is written and read within one pass,
-        never across a PT_WAIT_UNTIL. */
-     T_UINT8 ret;
-     T_BOOL retryDelay;
-     static T_INT8 ipAddr[20];
-     PT_BEGIN(pt);
-     while(1){
-         if(TRUE==talking.groupStart && talking.netWork.pdp!=0){
-             if(paras.groupServer[0]=='\0'){
-                 wlog_warn("group server null , quit");
-                 talking.groupStart=FALSE;
-             }else if(GROUP_IDLE==groupStatus){
-                 if(FALSE==setDomainForIp(paras.groupServer, &index)){
-                     wlog_warn("group domain fun busy, retry later");
-                     PTTimerStart(ptPool, &ptTimer,300);//3 seconds later retry
-                     PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
-                 }else{
-                     i=0;
-                     while(1){
-                         ret=getDomainForIp(index, ipAddr, sizeof(ipAddr)-1);
-                         if(ret==DOMAIN_ERR){
-                             wlog_error("group domain fail , quit");
-                             talking.groupStart=FALSE;
-                             groupStatus=GROUP_END;
-                             break;
-                         }else if(DOMAIN_OK==ret){
-                             wlog_info("group domain done:%s", ipAddr);
-                             tupParaSet(&tupsendPara, GROUP_SER_PORT, groupRecv_cb,GROUP_TCP_THREAD_STACK);
-                             timeout=5;
-                             groupStatus=GROUP_TCP;
-                             break;
-                         }else{
-                             PTTimerStart(ptPool, &ptTimer,50);//wait 0.5 per time
-                             PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
-                             if(++i>=10){
-                                 wlog_error("group domain timeout, quit");
-                                 talking.groupStart=FALSE;
-                                 groupStatus=GROUP_END;
-                                 break;
-                             }
-                         }
-                     }
-                 }
-             }else if(GROUP_TCP==groupStatus){
-                 //do tcp for ip now
-                 if(1==talking.netWork.pdp){
-                     retryDelay=FALSE;
-                     switch(tryConnectTup(ipAddr,TUP_TCP,&tupsendPara)){
-                     case TUP_STATUS_TRUE:
-                         wlog_info("group connect server ok");
-                         groupStatus=GROUP_SEND_WAIT_ACK;
-                         break;
-                     case TUP_STATUS_FALSE:
-                         if(--timeout){
-                             wlog_warn("group connect failed, retry later");
-                             retryDelay=TRUE;
-                         }else{
-                             wlog_warn("group connect timeout");
-                             groupStatus=GROUP_END;
-                         }
-                         break;
-                     case TUP_STATUS_WAIT:
-                         wlog_info("group connect wait");
-                         groupStatus=GROUP_TCP_WAIT;
-                         break;
-                     }
-                     if(TRUE==retryDelay){
-                         PTTimerStart(ptPool, &ptTimer,300);//3 seconds later retry
-                         PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
-                     }
-                 }
-             }else if(GROUP_SEND_WAIT_ACK==groupStatus){
-                 login_status=0;
-                 group_login_req();
-                 wlog_info("group wait ack");
-                 timeout=5;
-                 /* Poll in 100-tick steps until LoginAckHandle() sets login_status or the
-                    five attempts are used up. */
-                 do{
-                     PTTimerStart(ptPool, &ptTimer,100);
-                     PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
-                 }while(--timeout && login_status<=0);
-                 if(1==login_status) wlog_info("group login ok");
-                 else if(2==login_status) wlog_info("group login but no right");
-                 else if(3==login_status) wlog_info("group packet error:len");
-                 else if(4==login_status) wlog_info("group packet error:sum");
-                 else if(5==login_status) wlog_info("group packet error:no login");
-                 else wlog_info("group login but server no responsed");
-                 //close tcp
-                 groupStatus=GROUP_END;
-             }else if(GROUP_END==groupStatus){
-                 wlog_info("group end");
-                 talking.groupStart=FALSE;
-                 groupStatus=GROUP_IDLE;
-             }
-         }
-         isGroupIdle();
-         PTTimerStart(ptPool, &ptTimer,1);//should be fast
-         PT_WAIT_UNTIL(pt, PTTimerIsExpired(&ptTimer));
-     }
-     PT_END(pt);
- }
- #ifdef ACTIVE_LIBEV
- /*
- groupRecv_cb (libev variant, compiled only when ACTIVE_LIBEV is defined)
- Read-ready callback for the group-login socket: receives one reply of up to 256 bytes,
- hands it to LoginAckHandle(), then closes the socket and stops the watcher.
- main_loop: event loop the watcher runs on.
- io_w:      the watcher that fired.
- revents:   event mask; on EV_ERROR the watcher is stopped and nothing is read.
- NOTE(review): this shares its name with the non-static groupRecv_cb(void *) defined
- earlier in this file, so the two cannot be compiled together - resolve before enabling
- ACTIVE_LIBEV.
- */
- static void groupRecv_cb(struct ev_loop *main_loop,ev_io *io_w, int revents){
-     T_INT32 len;
-     T_UINT8 buffer[256];
-     if(EV_ERROR & revents){
-         wlog_warn("groupRecv_cb error");
-         ev_io_stop(main_loop, io_w);
-         return;
-     }
-     len=recv(tupsendPara.fd, buffer, sizeof(buffer),0);
-     wlog_info("get data len:%d", len);
-     if(len>0) LoginAckHandle(buffer, len);
-     /* Pass the address of the descriptor, matching the call in the event-based
-        groupRecv_cb(); the by-value form did not agree with that usage. */
-     userCloseSocket(&tupsendPara.fd);
-     ev_io_stop(main_loop, io_w);
- }
- #endif
- /*
- groupRecv
- Arms the read watcher for the group-login socket.
- fd: socket descriptor to watch for readability.
- With ACTIVE_LIBEV defined, the single static watcher is stopped, re-initialised with
- groupRecv_cb as its callback, bound to fd for EV_READ and started on `loop`.
- Without ACTIVE_LIBEV the function body is empty.
- */
- static void groupRecv(T_INT32 fd){
- #ifdef ACTIVE_LIBEV
-     static ev_io recvWatcher;
-     wlog_info("groupRecv call");
-     ev_io_stop(loop, &recvWatcher);
-     ev_init(&recvWatcher, groupRecv_cb);
-     ev_io_set(&recvWatcher, fd,EV_READ);
-     ev_io_start(loop, &recvWatcher);
- #endif
- }
- /*
- SendLoginFill
- Writes the payload of the group login request into info and returns its size.
- Layout (38 bytes): psn as 4 bytes, most significant first | 4 zero bytes |
- the 8 characters "groupPoc" | 2 zero bytes | key in a 20-byte zero-padded field
- (keys longer than 20 bytes are cut to 20).
- info: destination; must have room for 38 bytes.
- psn:  value written into the first four bytes.
- key:  NUL-terminated string copied into the last field.
- Returns the number of bytes written.
- */
- static T_UINT16 SendLoginFill(T_UINT8 *info,T_UINT32 psn, T_INT8 *key){
-     T_UINT8 *cur=info;
-     T_UINT16 keyLen;
-     int shift;
-     /* psn, high byte first */
-     for(shift=24;shift>=0;shift-=8){
-         *cur++=(psn>>shift);
-     }
-     memset(cur, 0, 4);
-     cur+=4;
-     memcpy(cur, "groupPoc",8);
-     cur+=8;
-     *cur++=0;
-     *cur++=0;
-     keyLen=strlen(key);
-     if(keyLen>20){
-         keyLen=20;
-     }
-     /* clear the whole key field first so a short key is zero padded */
-     memset(cur, 0, 20);
-     memcpy(cur, key,keyLen);
-     cur+=20;
-     return (T_UINT16)(cur-info);
- }
- /*
- SendDataPack
- Frames a payload that is already stored at info+3.
- Writes the type byte 0x12 at info[0], the 16-bit value datalen+1 (high byte first) at
- info[1..2], and an 8-bit additive checksum of the first 3+datalen bytes at
- info[3+datalen].
- info:    buffer holding the payload at offset 3; needs datalen+4 bytes in total.
- datalen: payload size in bytes.
- Returns the total frame size, datalen+4.
- */
- static T_UINT16 SendDataPack(T_UINT8 *info, T_UINT16 datalen){
-     T_UINT16 lenField=datalen+1;
-     T_UINT16 pos=0;
-     T_UINT8 checksum=0;
-     info[0]=0x12;
-     info[1]=lenField>>8;
-     info[2]=lenField;
-     /* checksum covers the header and the payload */
-     while(pos<1+2+datalen){
-         checksum += info[pos];
-         pos++;
-     }
-     info[3+datalen]=checksum;
-     return (3+datalen+1);
- }
- /*
- group_login_req
- Builds the group login request from paras.psn and paras.pass and sends it over the
- link described by tupsendPara. The payload is written at offset 3 of the frame buffer
- so that SendDataPack() can put its 3-byte header in front of it.
- The result of trySendTup() is not checked.
- */
- static void group_login_req(void){
-     T_UINT8 frame[128];
-     T_UINT16 payloadLen;
-     T_UINT16 frameLen;
-     payloadLen=SendLoginFill(frame+3,paras.psn,paras.pass);
-     frameLen=SendDataPack(frame,payloadLen);
-     wlog_info("group send login req info");
-     trySendTup(frame, frameLen, TUP_TCP, &tupsendPara);
- }
- /*
- LoginAckHandle
- Parses the server's reply to the group login request and stores the outcome in
- login_status:
-   1 - accepted; talking.npAddr / talking.ngAddr are filled from the reply
-   2 - the "right" byte (data[3]) is not 0xea
-   3 - length error: missing data, shorter than the fields that are read, or the
-       16-bit length field (data[1..2], high byte first) plus 3 differs from len
-   4 - checksum error: the 8-bit sum of all bytes but the last differs from the last byte
-   5 - the type byte (data[0]) is not 0x12
- data: received bytes (comes from the network, so every access is bounds checked).
- len:  number of bytes in data.
- On success, if the first address differs from paras.pocServer.addr and is not
- "0.0.0.0", it is stored as the new server address, the parameters are saved and the
- current talk is closed.
- */
- #define INVALID_IP_ADDRESS "0.0.0.0"
- static void LoginAckHandle(T_UINT8 *data, T_UINT16 len){
-     enum{
-         ACK_HEAD_LEN=3,   /* type byte + 2-byte length field */
-         ACK_PIP_OFFSET=23,/* first 4-byte address */
-         ACK_GIP_OFFSET=27,/* second 4-byte address */
-         ACK_MIN_OK_LEN=31 /* last address byte read is data[30] */
-     };
-     T_UINT8 type,sum,right;
-     T_UINT8 *p=data;
-     T_UINT16 thisLen,i;
-     T_UINT8 *pip,*gip;
-     /* The header must be present before any of it is read. */
-     if(NULL==data || len<ACK_HEAD_LEN){
-         login_status=3;
-         return;
-     }
-     type=*p++;
-     thisLen=*p++;
-     thisLen <<= 8;
-     thisLen &= 0xff00;
-     thisLen |= *p++;
-     if((1+2+thisLen) != len){
-         login_status=3;
-         return;
-     }
-     sum=0;
-     for(i=0;i<thisLen+2;i++) sum += data[i];
-     if(sum != data[len-1]){
-         login_status=4;
-         return;
-     }
-     if(type != 0x12){
-         login_status=5;
-         return;
-     }
-     /* A frame with an empty body passes the checks above but has no "right" byte. */
-     if(len<=ACK_HEAD_LEN){
-         login_status=3;
-         return;
-     }
-     right=*p++;
-     if(0xea != right){
-         login_status=2;
-         return;
-     }
-     /* Both addresses must lie inside the received frame. */
-     if(len<ACK_MIN_OK_LEN){
-         login_status=3;
-         return;
-     }
-     pip=data+ACK_PIP_OFFSET;
-     gip=data+ACK_GIP_OFFSET;
-     snprintf(talking.npAddr, sizeof(talking.npAddr), "%d.%d.%d.%d", pip[0],pip[1],pip[2],pip[3]);
-     snprintf(talking.ngAddr, sizeof(talking.ngAddr), "%d.%d.%d.%d", gip[0],gip[1],gip[2],gip[3]);
-     wlog_info("group:newpip[%s] newgip[%s]", talking.npAddr,talking.ngAddr);
-     if(0!=strcmp(paras.pocServer.addr, talking.npAddr) && 0!=strcmp(INVALID_IP_ADDRESS,talking.npAddr)){
-         wlog_warn("fine new pip");
-         /* NOTE(review): assumes paras.pocServer.addr can hold a dotted-quad string
-            (16 bytes with the terminator) - confirm against its declaration. */
-         strcpy(paras.pocServer.addr, talking.npAddr);
-         SaveNewPara();
-         closeTalk();
-     }
-
-     login_status=1;
- }
|