Mon 16 Mar 11:09:06 CET 2026

This commit is contained in:
sbosse 2026-03-16 11:11:52 +01:00
parent 1281531a3f
commit 88d5637401

446
src/net.c Normal file
View File

@ -0,0 +1,446 @@
#include "config.h"
#include "types.h"
#include "net.h"
#include "printf.h"
/*
The most complex (but optional) logic for Vm communication and remote interconnect.
NEW concept:
1. RX buffer [rxt|inp] for each port
+ rxt for all temporary messages (code, event, net, linear, growing up)
+ inp for all persistent messages (user, cyclic @ size/2, growing down)
2. TX buffer (cyclice) for each port
3. Message Log Buffer, cyclice, shared by all ports, ordered list of message descriptors
A link connects two ports of two nodes.
A port is commonly a bidirectional serial channel.
Only one message per time can be received ot/and send via a port.
Therfore, we need message buffering, not really conforming to a low-resource environment.
There are up and down ports and ports (or generic ports).
Program code is commonly propagated through the down port, result data propagated to a collector
node via the up port.
RX TX
+----+ +----+ ----+
| |<- T ^ | |<- T |
| M | | | M | V
| F | V | M |
| M |<- B | M |<- B
| | | |
+----+ 0 +----+ 0
M: Message Struct+Data
F: Free
RX buffer is always linear for code lexing directly from the RX buffer,
TX buffer is a ring with ordered messages (FIFO).
The RX buffer must hold code messages until compiled. Other messages
can be routed (ASAP and in order) or hold for reading by programs. These
messages destroy the ordering in the buffer and can create holes.
New messages must be stored in free top or bottom or in-between areas of the buffer.
*/
#if HAS_NET>0 && HAS_NET_ROUTING > 0
port_t **Ports;
int PortsDown[NUMPORTS],PortsUp[NUMPORTS];
message_t *CurrentInputMessage[NUMPORTS]; // for reading by VM
message_t *CurrentOutputMessage[NUMPORTS];
static char message_start[]={
' ', // NOP
']', // MSGCOMMAND
'[', // MSGPROGRAM
'[', // MSGPROGRAMPART
'!', // MSGDATA
'%', // MSGEVENT
'$', // MSGUSER
'@', // MSGNET
'#', // MSGCONSOLE
};
static char message_end[]={
' ', // NOP
'[', // MSGCOMMAND
']', // MSGPROGRAM
']', // MSGPROGRAMPART
'!', // MSGDATA
'%', // MSGEVENT
'$', // MSGUSER
'@', // MSGNET
'#', // MSGCONSOLE
};
void NetInit(int (**handlers)(),port_t **ports, char *buffer) {
int boff=0;
message_t *m;
port_t *P;
if (!buffer) buffer = (char * )MEMALLOC(NUMPORTS*PORTBUFFERSIZE);
if (!ports) ports = (port_t**)MEMALLOC(NUMPORTS*sizeof(port_t));
Ports=ports;
for(int i=0;i<NUMPORTS;i++) {
CurrentInputMessage[i]=NULL;
CurrentOutputMessage[i]=NULL;
PortsDown[i]=PortsUp[i]=-1; // initially unknown
P=Ports[i];
memset(P,0,sizeof(port_t));
// allocate first message (free)
P->state=LSTART;
P->index=i;
if (handlers) {
P->handlers=handlers;
}
P->rxBuffer=&buffer[boff];
P->txBuffer=&buffer[boff+PORTBUFFERSIZE/2];
boff+=PORTBUFFERSIZE;
}
}
/*
Alloceta a message header in the RX or TX buffer
This is the new current message. The end pointer is
updated adter the message is finalized (frozen).
*/
message_t *AllocateMessage(port_t *P,message_type_t t) {
message_t *m=NULL;
if ((t&0x7f)>20 && (t&0x7f)<128) {
char dir = MSGDIR(t);
t=t&0x7f;
// a starting character, try to resolve message type
switch ((char)t) {
case ']': t=MSGCOMMAND|dir; break;
case '[': t=MSGPROGRAM|dir; break;
case '!': t=MSGDATA|dir; break;
case '$': t=MSGUSER|dir; break;
case '@': t=MSGNET|dir; break;
case '%': t=MSGEVENT|dir; break;
case '#': t=MSGCONSOLE|dir; break;
default : t=MSGCONSOLE|dir; break;
}
}
switch (MSGDIR(t)) {
case MSGRX:
#if DEBUG>0
print_format("Net.AllocateMessage.Rx[%d] (%d) [%d-%d] \n",P->index,t,P->rxBottom,P->rxTop);
#endif
// Linear buffer
// Two free areas: [0,P->rxBottom-1] and [P->rxTop,Size-1]
// And mabye free message blocks in [P->rxBottom,P->rxTop]
if ((PORTBUFFERSIZE-P->rxTop)>P->rxBottom) {
m=(message_t*)&P->rxBuffer[P->rxTop];
P->rxTop+=sizeof(message_t);
} else if (P->rxBottom>sizeof(message_t)) {
// how to exploit free bottom space?
// 1. fill top area with dummy free message
m=(message_t*)&P->rxBuffer[P->rxTop];
P->rxTop+=sizeof(message_t);
m->start=P->rxTop;
m->end=PORTBUFFERSIZE-1;
m->type=MSGRX|MSGFREE;
P->rxBottom=0;
// 2. allocate new message at start of buffer
m=(message_t*)&P->rxBuffer[0];
P->rxBottom=0;
// 3. kick rxTop to end of this bottom message!
P->rxTop=sizeof(message_t);
} else {
// TODO error
return 0;
}
m->type = t;
m->start = P->rxTop;
m->end = P->rxTop;
m->port = P->index;
// P->rxCurrent=m;
break;
case MSGTX:
// Pure ring buffer
if ((P->txTop+sizeof(message_t))<PORTBUFFERSIZE) {
m=(message_t*)&P->txBuffer[P->txTop];
P->txTop+=sizeof(message_t); // start of message data
} else if (sizeof(message_t)<P->txBottom) {
// Turn around to bottom
m=(message_t*)&P->txBuffer[0];
P->txTop=sizeof(message_t); // start of message data
} else {
return 0; // allocation failed
}
m->type = t;
m->start = P->txTop;
m->end = P->txTop;
m->port = P->index;
// P->txCurrent=m;
break;
}
#if DEBUG>0
if (m) print_format("Net.AllocateMessage[%d] (%d):%d-%d \n",P->index,t,m->start,m->end);
#endif
return m;
}
message_t *FinalizeMessage(message_t *m) {
port_t *P=Ports[m->port];
message_t *m2;
// create new free message block
switch (MSGDIR(m->type)) {
case MSGRX:
P->rxBuffer[m->end]=0;
P->rxTop=m->end+1;
break;
case MSGTX:
P->txBuffer[m->end]=0;
P->txTop=(m->end+1)%PORTBUFFERSIZE;
break;
}
}
void FreeMessage(message_t *m) {
port_t *P=Ports[m->port];
index_t start; // start of message structure
switch (MSGDIR(m->type)) {
case MSGRX:
start=(index_t)((char *)m-P->rxBuffer);
// rx is an out-of-order linear buffer
// Check if this message is top or bottom message
if (start==P->rxBottom) {
P->rxBottom=(m->end+1)%PORTBUFFERSIZE;
if (P->rxBottom==P->rxTop) {
// reset buffer, it is empty
P->rxBottom=P->rxTop=0;
}
return; // we are done
}
if (m->end==(P->rxTop-1)) {
// top message, rewind rxTop
P->rxTop=start;
if (P->rxBottom==P->rxTop) {
// reset buffer, it is empty
P->rxBottom=P->rxTop=0;
}
return; // we are done
}
// mark message as free, must be later merged and freed
m->type=MSGRX|MSGFREE;
// TODO merge free blocks
break;
case MSGTX:
// tx is a strictly ordered ring buffer, pure FIFO, just update txBottom
P->txBottom=m->end+1;
break;
}
}
/*
Message parser; async IO!
*/
/*
Must be defined by host application
*/
extern int availbyte(int fd);
extern char inbyte(int fd);
extern int outbyte(int fd,char x);
int PortReceiver(port_t *P) {
message_type_t t;
while (availbyte(P->rd)>0) {
char ch=inbyte(P->rd);
#if DEBUG > 0
print_format("PortReceiver[%c]\n",ch);
#endif
if (ch==0) return -1;
switch (P->state) {
case LSTART:
t=-1;
switch (ch) {
case ']': t=MSGCOMMAND; break;
case '[': t=MSGPROGRAM; break;
case '!': t=MSGDATA; break;
case '$': t=MSGUSER; break;
case '@': t=MSGNET; break;
case '%': t=MSGEVENT; break;
case '#': t=MSGCONSOLE; break;
}
if (t>=0) {
t|=MSGRX;
P->rxCurrent=AllocateMessage(P,t);
P->state=LPARSING;
}
break;
case LPARSING:
switch (ch) {
case '[':
switch (MSGTYPE(P->rxCurrent->type)) {
case MSGPROGRAM:
if (P->rxCurrent->start==P->rxTop) {
// [[
P->rxCurrent->type=MSGPROGRAMPART|MSGRX;
continue;
break;
}
break;
case MSGCOMMAND:
P->state=LEND;
continue;
}
case ']':
switch (MSGTYPE(P->rxCurrent->type)) {
case MSGPROGRAM:
P->state=LEND;
continue;
break;
case MSGPROGRAMPART:
P->state=LPREEND; // wait for second ]
continue;
break;
}
default:
// store message body
P->rxBuffer[P->rxCurrent->end]=ch;
P->rxCurrent->end++; // never wrap around
}
break;
case LPREEND:
// check for second delimiter char
switch (MSGTYPE(P->rxCurrent->type)) {
case MSGPROGRAMPART:
if (ch==']') {
P->state=LEND;
} else {
// TODO Error
}
break;
}
break;
case LEND:
FinalizeMessage(P->rxCurrent);
if (ch!='\n') {
// TODO message error
} else {
// process now, call handler if existing!
#if DEBUG > 0
print_format("LinkReceiver LEND msg(%d)[%d:%d]\n",
MSGTYPE(P->rxCurrent->type),
P->rxCurrent->start,P->rxCurrent->end);
#endif
if (P->handlers && P->handlers[MSGTYPE(P->rxCurrent->type)])
P->handlers[MSGTYPE(P->rxCurrent->type)](P,P->rxCurrent);
else {
// release message
FreeMessage(P->rxCurrent);
}
}
P->state=LSTART;
P->rxCurrent=NULL;
break;
}
}
return 0;
}
/*
Read one byte from message via port rx buffer (VM API)
*/
int PortInByte(message_t *m) {
char ch;
port_t *l=Ports[m->port];
if (m->start != m->end) {
ch=l->rxBuffer[m->start];
RINGINCR(m->start,PORTBUFFERSIZE);
if (m->start==m->end) FreeMessage(m);
}
return ch;
}
/*
Add one byte to message to port tx buffer (VM API)
We need to buffer outgoing messages because the outgoing port can be busy (routing of messages)
Only one outgoing message can be modified at the same time!
After the message is complete the next message can be allocated and processed. Lock is required.
*/
int PortOutByte(message_t *m, char ch) {
port_t *l=Ports[m->port];
message_type_t t= MSGTYPE(m->type);
char rts=0;
switch (MSGTYPE(m->type)) {
case MSGPROGRAMPART:
if (message_end[t]==ch && message_end[t]==l->txBuffer[m->end])
rts=1;
break;
case MSGPROGRAM:
if (m->start==m->end && message_start[MSGPROGRAMPART]==ch)
m->type=MSGPROGRAMPART; // [[...]]
default:
if (message_end[t]==ch)
rts=1;
}
if (l->txCurrent==m) {
// write through
outbyte(l->wr,ch);
if (rts) {
FreeMessage(m);
l->txCurrent=NULL;
}
} else {
// buffer
l->txBuffer[m->end]=ch;
RINGINCR(m->end,PORTBUFFERSIZE);
if (rts) {
// prepare for sending
FinalizeMessage(m);
l->txCurrent=NULL;
}
}
return 1;
}
void PrintPortMessages(port_t *P) {
index_t p=0;
message_t *m;
print_format(">> Port (rxBottom=%d rxTop=%d txBottom=%d txTop=%d) <<\n",P->rxBottom,P->rxTop,P->txBottom,P->txTop);
print_format("======= RX =======\n");
p=P->rxBottom;
while (p!=P->rxTop) {
m=(message_t*)&P->rxBuffer[p];
print_format("MSG[%d] T%d Start=%d End=%d\n",p,MSGTYPE(m->type),m->start,m->end);
p=m->end+1;
};
print_format("======= TX =======\n");
p=P->txBottom;
while (p!=P->txTop) {
m=(message_t*)&P->txBuffer[p];
print_format(" MSG[%d] T%d Start=%d End=%d\n",p,MSGTYPE(m->type),m->start,m->end);
p=(m->end+1)%PORTBUFFERSIZE;
}
}
/*
Parse textual NCM and store binary packed structure in place.
Note: Transmission of NCM is performed in packed binary format (inside @@ delimiters)! This enables
fixed size of NCM messages.
*/
static int ParseNetworkControlMessage(char *buf) {
char *p=buf;
message_control_t ncm;
// although the tx buffer is a ring, this message struct will always be linear in buffer memory (no wrap around)
while (*p=='@' || *p==' ') p++;
// First char is the type
ncm.type=*p; p++;
while (*p==',' || *p==' ') p++;
ncm.narg=(int8_t)*p; p++; // can only be one digit 0-9
while (*p==',' || *p==' ') p++;
for(index_t i=0;i<ncm.narg;i++) {
int8_t s=0;
ncm.args[i]=0;
if (*p=='-') s=-1;
while (*p>='0' && *p<='9') { ncm.args[i]=(ncm.args[i]*10)+((int8_t)*p-'0'); p++; }
while (*p==',' || *p==' ') p++;
ncm.args[i]*=s;
}
memcpy(buf,&ncm,sizeof(message_control_t));
return sizeof(message_control_t);
}
#endif /* HAS_NET>0 */