#include "config.h" #include "types.h" #include "net.h" #include "printf.h" /* The most complex (but optional) logic for Vm communication and remote interconnect. NEW concept: 1. RX buffer [rxt|inp] for each port + rxt for all temporary messages (code, event, net, linear, growing up) + inp for all persistent messages (user, cyclic @ size/2, growing down) 2. TX buffer (cyclice) for each port 3. Message Log Buffer, cyclice, shared by all ports, ordered list of message descriptors A link connects two ports of two nodes. A port is commonly a bidirectional serial channel. Only one message per time can be received ot/and send via a port. Therfore, we need message buffering, not really conforming to a low-resource environment. There are up and down ports and ports (or generic ports). Program code is commonly propagated through the down port, result data propagated to a collector node via the up port. RX TX +----+ +----+ ----+ | |<- T ^ | |<- T | | M | | | M | V | F | V | M | | M |<- B | M |<- B | | | | +----+ 0 +----+ 0 M: Message Struct+Data F: Free RX buffer is always linear for code lexing directly from the RX buffer, TX buffer is a ring with ordered messages (FIFO). The RX buffer must hold code messages until compiled. Other messages can be routed (ASAP and in order) or hold for reading by programs. These messages destroy the ordering in the buffer and can create holes. New messages must be stored in free top or bottom or in-between areas of the buffer. */ #if HAS_NET>0 && HAS_NET_ROUTING > 0 port_t **Ports; int PortsDown[NUMPORTS],PortsUp[NUMPORTS]; message_t *CurrentInputMessage[NUMPORTS]; // for reading by VM message_t *CurrentOutputMessage[NUMPORTS]; static char message_start[]={ ' ', // NOP ']', // MSGCOMMAND '[', // MSGPROGRAM '[', // MSGPROGRAMPART '!', // MSGDATA '%', // MSGEVENT '$', // MSGUSER '@', // MSGNET '#', // MSGCONSOLE }; static char message_end[]={ ' ', // NOP '[', // MSGCOMMAND ']', // MSGPROGRAM ']', // MSGPROGRAMPART '!', // MSGDATA '%', // MSGEVENT '$', // MSGUSER '@', // MSGNET '#', // MSGCONSOLE }; void NetInit(int (**handlers)(),port_t **ports, char *buffer) { int boff=0; message_t *m; port_t *P; if (!buffer) buffer = (char * )MEMALLOC(NUMPORTS*PORTBUFFERSIZE); if (!ports) ports = (port_t**)MEMALLOC(NUMPORTS*sizeof(port_t)); Ports=ports; for(int i=0;istate=LSTART; P->index=i; if (handlers) { P->handlers=handlers; } P->rxBuffer=&buffer[boff]; P->txBuffer=&buffer[boff+PORTBUFFERSIZE/2]; boff+=PORTBUFFERSIZE; } } /* Alloceta a message header in the RX or TX buffer This is the new current message. The end pointer is updated adter the message is finalized (frozen). */ message_t *AllocateMessage(port_t *P,message_type_t t) { message_t *m=NULL; if ((t&0x7f)>20 && (t&0x7f)<128) { char dir = MSGDIR(t); t=t&0x7f; // a starting character, try to resolve message type switch ((char)t) { case ']': t=MSGCOMMAND|dir; break; case '[': t=MSGPROGRAM|dir; break; case '!': t=MSGDATA|dir; break; case '$': t=MSGUSER|dir; break; case '@': t=MSGNET|dir; break; case '%': t=MSGEVENT|dir; break; case '#': t=MSGCONSOLE|dir; break; default : t=MSGCONSOLE|dir; break; } } switch (MSGDIR(t)) { case MSGRX: #if DEBUG>0 print_format("Net.AllocateMessage.Rx[%d] (%d) [%d-%d] \n",P->index,t,P->rxBottom,P->rxTop); #endif // Linear buffer // Two free areas: [0,P->rxBottom-1] and [P->rxTop,Size-1] // And mabye free message blocks in [P->rxBottom,P->rxTop] if ((PORTBUFFERSIZE-P->rxTop)>P->rxBottom) { m=(message_t*)&P->rxBuffer[P->rxTop]; P->rxTop+=sizeof(message_t); } else if (P->rxBottom>sizeof(message_t)) { // how to exploit free bottom space? // 1. fill top area with dummy free message m=(message_t*)&P->rxBuffer[P->rxTop]; P->rxTop+=sizeof(message_t); m->start=P->rxTop; m->end=PORTBUFFERSIZE-1; m->type=MSGRX|MSGFREE; P->rxBottom=0; // 2. allocate new message at start of buffer m=(message_t*)&P->rxBuffer[0]; P->rxBottom=0; // 3. kick rxTop to end of this bottom message! P->rxTop=sizeof(message_t); } else { // TODO error return 0; } m->type = t; m->start = P->rxTop; m->end = P->rxTop; m->port = P->index; // P->rxCurrent=m; break; case MSGTX: // Pure ring buffer if ((P->txTop+sizeof(message_t))txBuffer[P->txTop]; P->txTop+=sizeof(message_t); // start of message data } else if (sizeof(message_t)txBottom) { // Turn around to bottom m=(message_t*)&P->txBuffer[0]; P->txTop=sizeof(message_t); // start of message data } else { return 0; // allocation failed } m->type = t; m->start = P->txTop; m->end = P->txTop; m->port = P->index; // P->txCurrent=m; break; } #if DEBUG>0 if (m) print_format("Net.AllocateMessage[%d] (%d):%d-%d \n",P->index,t,m->start,m->end); #endif return m; } message_t *FinalizeMessage(message_t *m) { port_t *P=Ports[m->port]; message_t *m2; // create new free message block switch (MSGDIR(m->type)) { case MSGRX: P->rxBuffer[m->end]=0; P->rxTop=m->end+1; break; case MSGTX: P->txBuffer[m->end]=0; P->txTop=(m->end+1)%PORTBUFFERSIZE; break; } } void FreeMessage(message_t *m) { port_t *P=Ports[m->port]; index_t start; // start of message structure switch (MSGDIR(m->type)) { case MSGRX: start=(index_t)((char *)m-P->rxBuffer); // rx is an out-of-order linear buffer // Check if this message is top or bottom message if (start==P->rxBottom) { P->rxBottom=(m->end+1)%PORTBUFFERSIZE; if (P->rxBottom==P->rxTop) { // reset buffer, it is empty P->rxBottom=P->rxTop=0; } return; // we are done } if (m->end==(P->rxTop-1)) { // top message, rewind rxTop P->rxTop=start; if (P->rxBottom==P->rxTop) { // reset buffer, it is empty P->rxBottom=P->rxTop=0; } return; // we are done } // mark message as free, must be later merged and freed m->type=MSGRX|MSGFREE; // TODO merge free blocks break; case MSGTX: // tx is a strictly ordered ring buffer, pure FIFO, just update txBottom P->txBottom=m->end+1; break; } } /* Message parser; async IO! */ /* Must be defined by host application */ extern int availbyte(int fd); extern char inbyte(int fd); extern int outbyte(int fd,char x); int PortReceiver(port_t *P) { message_type_t t; while (availbyte(P->rd)>0) { char ch=inbyte(P->rd); #if DEBUG > 0 print_format("PortReceiver[%c]\n",ch); #endif if (ch==0) return -1; switch (P->state) { case LSTART: t=-1; switch (ch) { case ']': t=MSGCOMMAND; break; case '[': t=MSGPROGRAM; break; case '!': t=MSGDATA; break; case '$': t=MSGUSER; break; case '@': t=MSGNET; break; case '%': t=MSGEVENT; break; case '#': t=MSGCONSOLE; break; } if (t>=0) { t|=MSGRX; P->rxCurrent=AllocateMessage(P,t); P->state=LPARSING; } break; case LPARSING: switch (ch) { case '[': switch (MSGTYPE(P->rxCurrent->type)) { case MSGPROGRAM: if (P->rxCurrent->start==P->rxTop) { // [[ P->rxCurrent->type=MSGPROGRAMPART|MSGRX; continue; break; } break; case MSGCOMMAND: P->state=LEND; continue; } case ']': switch (MSGTYPE(P->rxCurrent->type)) { case MSGPROGRAM: P->state=LEND; continue; break; case MSGPROGRAMPART: P->state=LPREEND; // wait for second ] continue; break; } default: // store message body P->rxBuffer[P->rxCurrent->end]=ch; P->rxCurrent->end++; // never wrap around } break; case LPREEND: // check for second delimiter char switch (MSGTYPE(P->rxCurrent->type)) { case MSGPROGRAMPART: if (ch==']') { P->state=LEND; } else { // TODO Error } break; } break; case LEND: FinalizeMessage(P->rxCurrent); if (ch!='\n') { // TODO message error } else { // process now, call handler if existing! #if DEBUG > 0 print_format("LinkReceiver LEND msg(%d)[%d:%d]\n", MSGTYPE(P->rxCurrent->type), P->rxCurrent->start,P->rxCurrent->end); #endif if (P->handlers && P->handlers[MSGTYPE(P->rxCurrent->type)]) P->handlers[MSGTYPE(P->rxCurrent->type)](P,P->rxCurrent); else { // release message FreeMessage(P->rxCurrent); } } P->state=LSTART; P->rxCurrent=NULL; break; } } return 0; } /* Read one byte from message via port rx buffer (VM API) */ int PortInByte(message_t *m) { char ch; port_t *l=Ports[m->port]; if (m->start != m->end) { ch=l->rxBuffer[m->start]; RINGINCR(m->start,PORTBUFFERSIZE); if (m->start==m->end) FreeMessage(m); } return ch; } /* Add one byte to message to port tx buffer (VM API) We need to buffer outgoing messages because the outgoing port can be busy (routing of messages) Only one outgoing message can be modified at the same time! After the message is complete the next message can be allocated and processed. Lock is required. */ int PortOutByte(message_t *m, char ch) { port_t *l=Ports[m->port]; message_type_t t= MSGTYPE(m->type); char rts=0; switch (MSGTYPE(m->type)) { case MSGPROGRAMPART: if (message_end[t]==ch && message_end[t]==l->txBuffer[m->end]) rts=1; break; case MSGPROGRAM: if (m->start==m->end && message_start[MSGPROGRAMPART]==ch) m->type=MSGPROGRAMPART; // [[...]] default: if (message_end[t]==ch) rts=1; } if (l->txCurrent==m) { // write through outbyte(l->wr,ch); if (rts) { FreeMessage(m); l->txCurrent=NULL; } } else { // buffer l->txBuffer[m->end]=ch; RINGINCR(m->end,PORTBUFFERSIZE); if (rts) { // prepare for sending FinalizeMessage(m); l->txCurrent=NULL; } } return 1; } void PrintPortMessages(port_t *P) { index_t p=0; message_t *m; print_format(">> Port (rxBottom=%d rxTop=%d txBottom=%d txTop=%d) <<\n",P->rxBottom,P->rxTop,P->txBottom,P->txTop); print_format("======= RX =======\n"); p=P->rxBottom; while (p!=P->rxTop) { m=(message_t*)&P->rxBuffer[p]; print_format("MSG[%d] T%d Start=%d End=%d\n",p,MSGTYPE(m->type),m->start,m->end); p=m->end+1; }; print_format("======= TX =======\n"); p=P->txBottom; while (p!=P->txTop) { m=(message_t*)&P->txBuffer[p]; print_format(" MSG[%d] T%d Start=%d End=%d\n",p,MSGTYPE(m->type),m->start,m->end); p=(m->end+1)%PORTBUFFERSIZE; } } /* Parse textual NCM and store binary packed structure in place. Note: Transmission of NCM is performed in packed binary format (inside @@ delimiters)! This enables fixed size of NCM messages. */ static int ParseNetworkControlMessage(char *buf) { char *p=buf; message_control_t ncm; // although the tx buffer is a ring, this message struct will always be linear in buffer memory (no wrap around) while (*p=='@' || *p==' ') p++; // First char is the type ncm.type=*p; p++; while (*p==',' || *p==' ') p++; ncm.narg=(int8_t)*p; p++; // can only be one digit 0-9 while (*p==',' || *p==' ') p++; for(index_t i=0;i='0' && *p<='9') { ncm.args[i]=(ncm.args[i]*10)+((int8_t)*p-'0'); p++; } while (*p==',' || *p==' ') p++; ncm.args[i]*=s; } memcpy(buf,&ncm,sizeof(message_control_t)); return sizeof(message_control_t); } #endif /* HAS_NET>0 */