/*
Copyright (C) 2016-2019 The University of Notre Dame
This software is distributed under the GNU General Public License.
See the file LICENSE for details.
*/

#include "kernel/types.h"
#include "pipe.h"
#include "kmalloc.h"
#include "process.h"
#include "page.h"

#define PIPE_SIZE PAGE_SIZE

struct pipe {
	char *buffer;
	int read_pos;
	int write_pos;
	int flushed;
	int refcount;
	struct list queue;
};

struct pipe *pipe_create()
{
	struct pipe *p = kmalloc(sizeof(*p));
	if(!p) return 0;
	
	p->buffer = page_alloc(1);
	if(!p->buffer) {
		kfree(p);
		return 0;
	}
	p->read_pos = 0;
	p->write_pos = 0;
	p->flushed = 0;
	p->queue.head = 0;
	p->queue.tail = 0;
	p->refcount = 1;
	return p;
}

struct pipe *pipe_addref( struct pipe *p )
{
	p->refcount++;
	return p;
}

void pipe_flush(struct pipe *p)
{
	if(p) {
		p->flushed = 1;
	}
}

void pipe_delete(struct pipe *p)
{
	if(!p) return;

	p->refcount--;
	if(p->refcount==0) {
		if(p->buffer) {
			page_free(p->buffer);
		}
		kfree(p);
	}
}

static int pipe_write_internal(struct pipe *p, char *buffer, int size, int blocking )
{
	if(!p || !buffer) {
		return -1;
	}
	int written = 0;
	if(blocking) {
		for(written = 0; written < size; written++) {
			while((p->write_pos + 1) % PIPE_SIZE == p->read_pos) {
				if(p->flushed) {
					p->flushed = 0;
					return written;
				}
				process_wait(&p->queue);
			}
			p->buffer[p->write_pos] = buffer[written];
			p->write_pos = (p->write_pos + 1) % PIPE_SIZE;
		}
		process_wakeup_all(&p->queue);
	} else {
		while(written < size && p->write_pos != (p->read_pos - 1) % PIPE_SIZE) {
			p->buffer[p->write_pos] = buffer[written];
			p->write_pos = (p->write_pos + 1) % PIPE_SIZE;
			written++;
		}
	}
	p->flushed = 0;
	return written;
}

int pipe_write(struct pipe *p, char *buffer, int size)
{
	return pipe_write_internal(p, buffer, size, 1);
}

int pipe_write_nonblock(struct pipe *p, char *buffer, int size)
{
	return pipe_write_internal(p, buffer, size, 0);
}

static int pipe_read_internal(struct pipe *p, char *buffer, int size, int blocking)
{
	if(!p || !buffer) {
		return -1;
	}
	int read = 0;
	if(blocking) {
		for(read = 0; read < size; read++) {
			while(p->write_pos == p->read_pos) {
				if(p->flushed) {
					p->flushed = 0;
					return read;
				}
				if (blocking == 0) {
					return -1;
				}
				process_wait(&p->queue);
			}
			buffer[read] = p->buffer[p->read_pos];
			p->read_pos = (p->read_pos + 1) % PIPE_SIZE;
		}
		process_wakeup_all(&p->queue);
	} else {
		while(read < size && p->read_pos != p->write_pos) {
			buffer[read] = p->buffer[p->read_pos];
			p->read_pos = (p->read_pos + 1) % PIPE_SIZE;
			read++;
		}
	}
	p->flushed = 0;
	return read;
}

int pipe_read(struct pipe *p, char *buffer, int size)
{
	return pipe_read_internal(p, buffer, size, 1);
}

int pipe_read_nonblock(struct pipe *p, char *buffer, int size)
{
	return pipe_read_internal(p, buffer, size, 0);
}

int pipe_size( struct pipe *p )
{
	return PIPE_SIZE;
}