From 88d563740196e42394d5f7512295d1f639369014 Mon Sep 17 00:00:00 2001 From: sbosse Date: Mon, 16 Mar 2026 11:11:52 +0100 Subject: [PATCH] Mon 16 Mar 11:09:06 CET 2026 --- src/net.c | 446 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 446 insertions(+) create mode 100644 src/net.c diff --git a/src/net.c b/src/net.c new file mode 100644 index 0000000..da7e073 --- /dev/null +++ b/src/net.c @@ -0,0 +1,446 @@ + +#include "config.h" +#include "types.h" +#include "net.h" +#include "printf.h" +/* + The most complex (but optional) logic for Vm communication and remote interconnect. + + NEW concept: + + 1. RX buffer [rxt|inp] for each port + + rxt for all temporary messages (code, event, net, linear, growing up) + + inp for all persistent messages (user, cyclic @ size/2, growing down) + 2. TX buffer (cyclice) for each port + 3. Message Log Buffer, cyclice, shared by all ports, ordered list of message descriptors + + A link connects two ports of two nodes. + A port is commonly a bidirectional serial channel. + Only one message per time can be received ot/and send via a port. + Therfore, we need message buffering, not really conforming to a low-resource environment. + There are up and down ports and ports (or generic ports). + Program code is commonly propagated through the down port, result data propagated to a collector + node via the up port. + + RX TX + +----+ +----+ ----+ + | |<- T ^ | |<- T | + | M | | | M | V + | F | V | M | + | M |<- B | M |<- B + | | | | + +----+ 0 +----+ 0 + + M: Message Struct+Data + F: Free + + RX buffer is always linear for code lexing directly from the RX buffer, + TX buffer is a ring with ordered messages (FIFO). + The RX buffer must hold code messages until compiled. Other messages + can be routed (ASAP and in order) or hold for reading by programs. These + messages destroy the ordering in the buffer and can create holes. + New messages must be stored in free top or bottom or in-between areas of the buffer. +*/ +#if HAS_NET>0 && HAS_NET_ROUTING > 0 +port_t **Ports; +int PortsDown[NUMPORTS],PortsUp[NUMPORTS]; +message_t *CurrentInputMessage[NUMPORTS]; // for reading by VM +message_t *CurrentOutputMessage[NUMPORTS]; + +static char message_start[]={ + ' ', // NOP + ']', // MSGCOMMAND + '[', // MSGPROGRAM + '[', // MSGPROGRAMPART + '!', // MSGDATA + '%', // MSGEVENT + '$', // MSGUSER + '@', // MSGNET + '#', // MSGCONSOLE +}; +static char message_end[]={ + ' ', // NOP + '[', // MSGCOMMAND + ']', // MSGPROGRAM + ']', // MSGPROGRAMPART + '!', // MSGDATA + '%', // MSGEVENT + '$', // MSGUSER + '@', // MSGNET + '#', // MSGCONSOLE +}; + +void NetInit(int (**handlers)(),port_t **ports, char *buffer) { + int boff=0; + message_t *m; + port_t *P; + if (!buffer) buffer = (char * )MEMALLOC(NUMPORTS*PORTBUFFERSIZE); + if (!ports) ports = (port_t**)MEMALLOC(NUMPORTS*sizeof(port_t)); + Ports=ports; + for(int i=0;istate=LSTART; + P->index=i; + if (handlers) { + P->handlers=handlers; + } + P->rxBuffer=&buffer[boff]; + P->txBuffer=&buffer[boff+PORTBUFFERSIZE/2]; + boff+=PORTBUFFERSIZE; + } +} +/* + Alloceta a message header in the RX or TX buffer + This is the new current message. The end pointer is + updated adter the message is finalized (frozen). +*/ +message_t *AllocateMessage(port_t *P,message_type_t t) { + message_t *m=NULL; + if ((t&0x7f)>20 && (t&0x7f)<128) { + char dir = MSGDIR(t); + t=t&0x7f; + // a starting character, try to resolve message type + switch ((char)t) { + case ']': t=MSGCOMMAND|dir; break; + case '[': t=MSGPROGRAM|dir; break; + case '!': t=MSGDATA|dir; break; + case '$': t=MSGUSER|dir; break; + case '@': t=MSGNET|dir; break; + case '%': t=MSGEVENT|dir; break; + case '#': t=MSGCONSOLE|dir; break; + default : t=MSGCONSOLE|dir; break; + } + } + switch (MSGDIR(t)) { + case MSGRX: +#if DEBUG>0 + print_format("Net.AllocateMessage.Rx[%d] (%d) [%d-%d] \n",P->index,t,P->rxBottom,P->rxTop); +#endif + // Linear buffer + // Two free areas: [0,P->rxBottom-1] and [P->rxTop,Size-1] + // And mabye free message blocks in [P->rxBottom,P->rxTop] + if ((PORTBUFFERSIZE-P->rxTop)>P->rxBottom) { + m=(message_t*)&P->rxBuffer[P->rxTop]; + P->rxTop+=sizeof(message_t); + } else if (P->rxBottom>sizeof(message_t)) { + // how to exploit free bottom space? + // 1. fill top area with dummy free message + m=(message_t*)&P->rxBuffer[P->rxTop]; + P->rxTop+=sizeof(message_t); + m->start=P->rxTop; + m->end=PORTBUFFERSIZE-1; + m->type=MSGRX|MSGFREE; + P->rxBottom=0; + // 2. allocate new message at start of buffer + m=(message_t*)&P->rxBuffer[0]; + P->rxBottom=0; + // 3. kick rxTop to end of this bottom message! + P->rxTop=sizeof(message_t); + } else { + // TODO error + return 0; + } + m->type = t; + m->start = P->rxTop; + m->end = P->rxTop; + m->port = P->index; + // P->rxCurrent=m; + break; + case MSGTX: + // Pure ring buffer + if ((P->txTop+sizeof(message_t))txBuffer[P->txTop]; + P->txTop+=sizeof(message_t); // start of message data + } else if (sizeof(message_t)txBottom) { + // Turn around to bottom + m=(message_t*)&P->txBuffer[0]; + P->txTop=sizeof(message_t); // start of message data + } else { + return 0; // allocation failed + } + + m->type = t; + m->start = P->txTop; + m->end = P->txTop; + m->port = P->index; + // P->txCurrent=m; + break; + } +#if DEBUG>0 + if (m) print_format("Net.AllocateMessage[%d] (%d):%d-%d \n",P->index,t,m->start,m->end); +#endif + return m; +} + +message_t *FinalizeMessage(message_t *m) { + port_t *P=Ports[m->port]; + message_t *m2; + // create new free message block + switch (MSGDIR(m->type)) { + case MSGRX: + P->rxBuffer[m->end]=0; + P->rxTop=m->end+1; + break; + case MSGTX: + P->txBuffer[m->end]=0; + P->txTop=(m->end+1)%PORTBUFFERSIZE; + break; + } +} + +void FreeMessage(message_t *m) { + port_t *P=Ports[m->port]; + index_t start; // start of message structure + switch (MSGDIR(m->type)) { + case MSGRX: + start=(index_t)((char *)m-P->rxBuffer); + // rx is an out-of-order linear buffer + // Check if this message is top or bottom message + if (start==P->rxBottom) { + P->rxBottom=(m->end+1)%PORTBUFFERSIZE; + if (P->rxBottom==P->rxTop) { + // reset buffer, it is empty + P->rxBottom=P->rxTop=0; + } + return; // we are done + } + if (m->end==(P->rxTop-1)) { + // top message, rewind rxTop + P->rxTop=start; + if (P->rxBottom==P->rxTop) { + // reset buffer, it is empty + P->rxBottom=P->rxTop=0; + } + return; // we are done + } + // mark message as free, must be later merged and freed + m->type=MSGRX|MSGFREE; + // TODO merge free blocks + break; + case MSGTX: + // tx is a strictly ordered ring buffer, pure FIFO, just update txBottom + P->txBottom=m->end+1; + break; + } +} + +/* + Message parser; async IO! +*/ + +/* + Must be defined by host application +*/ +extern int availbyte(int fd); +extern char inbyte(int fd); +extern int outbyte(int fd,char x); + +int PortReceiver(port_t *P) { + message_type_t t; + + while (availbyte(P->rd)>0) { + char ch=inbyte(P->rd); +#if DEBUG > 0 + print_format("PortReceiver[%c]\n",ch); +#endif + if (ch==0) return -1; + switch (P->state) { + case LSTART: + t=-1; + switch (ch) { + case ']': t=MSGCOMMAND; break; + case '[': t=MSGPROGRAM; break; + case '!': t=MSGDATA; break; + case '$': t=MSGUSER; break; + case '@': t=MSGNET; break; + case '%': t=MSGEVENT; break; + case '#': t=MSGCONSOLE; break; + } + if (t>=0) { + t|=MSGRX; + P->rxCurrent=AllocateMessage(P,t); + P->state=LPARSING; + } + break; + case LPARSING: + switch (ch) { + case '[': + switch (MSGTYPE(P->rxCurrent->type)) { + case MSGPROGRAM: + if (P->rxCurrent->start==P->rxTop) { + // [[ + P->rxCurrent->type=MSGPROGRAMPART|MSGRX; + continue; + break; + } + break; + case MSGCOMMAND: + P->state=LEND; + continue; + } + case ']': + switch (MSGTYPE(P->rxCurrent->type)) { + case MSGPROGRAM: + P->state=LEND; + continue; + break; + case MSGPROGRAMPART: + P->state=LPREEND; // wait for second ] + continue; + break; + } + default: + // store message body + P->rxBuffer[P->rxCurrent->end]=ch; + P->rxCurrent->end++; // never wrap around + } + break; + case LPREEND: + // check for second delimiter char + switch (MSGTYPE(P->rxCurrent->type)) { + case MSGPROGRAMPART: + if (ch==']') { + P->state=LEND; + } else { + // TODO Error + } + break; + } + break; + case LEND: + FinalizeMessage(P->rxCurrent); + if (ch!='\n') { + // TODO message error + } else { + // process now, call handler if existing! +#if DEBUG > 0 + print_format("LinkReceiver LEND msg(%d)[%d:%d]\n", + MSGTYPE(P->rxCurrent->type), + P->rxCurrent->start,P->rxCurrent->end); +#endif + if (P->handlers && P->handlers[MSGTYPE(P->rxCurrent->type)]) + P->handlers[MSGTYPE(P->rxCurrent->type)](P,P->rxCurrent); + else { + // release message + FreeMessage(P->rxCurrent); + } + } + P->state=LSTART; + P->rxCurrent=NULL; + break; + } + } + return 0; +} + + +/* + Read one byte from message via port rx buffer (VM API) +*/ +int PortInByte(message_t *m) { + char ch; + port_t *l=Ports[m->port]; + if (m->start != m->end) { + ch=l->rxBuffer[m->start]; + RINGINCR(m->start,PORTBUFFERSIZE); + if (m->start==m->end) FreeMessage(m); + } + return ch; +} + +/* + Add one byte to message to port tx buffer (VM API) + We need to buffer outgoing messages because the outgoing port can be busy (routing of messages) + Only one outgoing message can be modified at the same time! + After the message is complete the next message can be allocated and processed. Lock is required. +*/ +int PortOutByte(message_t *m, char ch) { + port_t *l=Ports[m->port]; + message_type_t t= MSGTYPE(m->type); + char rts=0; + switch (MSGTYPE(m->type)) { + case MSGPROGRAMPART: + if (message_end[t]==ch && message_end[t]==l->txBuffer[m->end]) + rts=1; + break; + case MSGPROGRAM: + if (m->start==m->end && message_start[MSGPROGRAMPART]==ch) + m->type=MSGPROGRAMPART; // [[...]] + default: + if (message_end[t]==ch) + rts=1; + } + if (l->txCurrent==m) { + // write through + outbyte(l->wr,ch); + if (rts) { + FreeMessage(m); + l->txCurrent=NULL; + } + } else { + // buffer + l->txBuffer[m->end]=ch; + RINGINCR(m->end,PORTBUFFERSIZE); + if (rts) { + // prepare for sending + FinalizeMessage(m); + l->txCurrent=NULL; + } + } + return 1; +} + +void PrintPortMessages(port_t *P) { + index_t p=0; + message_t *m; + print_format(">> Port (rxBottom=%d rxTop=%d txBottom=%d txTop=%d) <<\n",P->rxBottom,P->rxTop,P->txBottom,P->txTop); + print_format("======= RX =======\n"); + p=P->rxBottom; + while (p!=P->rxTop) { + m=(message_t*)&P->rxBuffer[p]; + print_format("MSG[%d] T%d Start=%d End=%d\n",p,MSGTYPE(m->type),m->start,m->end); + p=m->end+1; + }; + print_format("======= TX =======\n"); + p=P->txBottom; + while (p!=P->txTop) { + m=(message_t*)&P->txBuffer[p]; + print_format(" MSG[%d] T%d Start=%d End=%d\n",p,MSGTYPE(m->type),m->start,m->end); + p=(m->end+1)%PORTBUFFERSIZE; + } +} + +/* + Parse textual NCM and store binary packed structure in place. + Note: Transmission of NCM is performed in packed binary format (inside @@ delimiters)! This enables + fixed size of NCM messages. +*/ +static int ParseNetworkControlMessage(char *buf) { + char *p=buf; + message_control_t ncm; + // although the tx buffer is a ring, this message struct will always be linear in buffer memory (no wrap around) + while (*p=='@' || *p==' ') p++; + // First char is the type + ncm.type=*p; p++; + while (*p==',' || *p==' ') p++; + ncm.narg=(int8_t)*p; p++; // can only be one digit 0-9 + while (*p==',' || *p==' ') p++; + for(index_t i=0;i='0' && *p<='9') { ncm.args[i]=(ncm.args[i]*10)+((int8_t)*p-'0'); p++; } + while (*p==',' || *p==' ') p++; + ncm.args[i]*=s; + } + memcpy(buf,&ncm,sizeof(message_control_t)); + return sizeof(message_control_t); +} +#endif /* HAS_NET>0 */ + +