419 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
			
		
		
	
	
			419 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			JavaScript
		
	
	
	
	
	
| //var clog = clog;
 | |
| var clog = function() {};
 | |
| 
 | |
| var EventEmitter = require('events').EventEmitter;
 | |
| var util = require('util');
 | |
| 
 | |
// Width in bytes of every fixed-size format character understood by the
// reader and by pack():
//   C      one byte
//   S / s  two bytes
//   L / l  four bytes
//   x      one pad byte (discarded when reading, written as 0 when packing)
var argument_length = {
    C: 1,
    S: 2,
    s: 2,
    L: 4,
    l: 4,
    x: 1
};
 | |
| 
 | |
/**
 * Queued request to decode a run of fixed-width values from the stream.
 *
 * @param {string}   format    one format character per value (indexed
 *                             character by character while executing)
 * @param {function} callback  called with the array of decoded values once
 *                             every format character has been processed
 */
function ReadFormatRequest(format, callback) {
    this.format = format;
    // index of the next format character still to be decoded
    this.current_arg = 0;
    // values decoded so far
    this.data = [];
    this.callback = callback;
}
 | |
| 
 | |
| /*
 | |
| function ReadBuflist(length, callback)
 | |
| {
 | |
|     this.length = length
 | |
|     this.callback = callback;
 | |
|     
 | |
| }
 | |
| */
 | |
| 
 | |
/**
 * Queued request for exactly `length` raw bytes.
 *
 * @param {number}   length    number of bytes to collect
 * @param {function} callback  called with the filled Buffer once all bytes
 *                             have been received
 */
function ReadFixedRequest(length, callback)
{
    this.length = length;
    this.callback = callback;
    // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor
    // and guarantees zero-filled memory.
    this.data = Buffer.alloc(length);
    this.received_bytes = 0;
}

/**
 * Moves as many bytes as are currently available from the stream's chunk
 * list into this request's buffer, one Buffer.copy per chunk.
 *
 * Reads and updates three fields of `bufferlist`:
 *   readlist  array of Buffers still holding unread data
 *   offset    read position inside readlist[0]
 *   length    total number of unread bytes
 *
 * Bytes consumed by an incomplete call stay consumed; `received_bytes`
 * records the progress so a later call continues where this one stopped.
 * A zero-length request completes immediately, even when no data is
 * buffered.
 *
 * @param {object} bufferlist  the stream state described above
 * @returns {boolean} true when the request is complete (the callback has
 *                    been invoked), false when more data is needed
 */
ReadFixedRequest.prototype.execute = function(bufferlist)
{
    var chunks = bufferlist.readlist;

    while (this.received_bytes < this.length && chunks.length > 0)
    {
        var chunk = chunks[0];
        var available = chunk.length - bufferlist.offset;
        var wanted = this.length - this.received_bytes;
        var count = Math.min(available, wanted);

        // copy(targetBuffer, targetStart, sourceStart, sourceEnd)
        chunk.copy(this.data, this.received_bytes, bufferlist.offset, bufferlist.offset + count);
        this.received_bytes += count;
        bufferlist.offset += count;
        bufferlist.length -= count;

        // chunk fully consumed (or was empty to begin with): drop it
        if (bufferlist.offset >= chunk.length)
        {
            chunks.shift();
            bufferlist.offset = 0;
        }
    }

    if (this.received_bytes < this.length)
        return false; // need to wait for more data

    this.callback(this.data);
    return true;
};
 | |
| 
 | |
/**
 * Decodes as many remaining format characters as the buffered data allows.
 *
 * A value is only read once all of its bytes are buffered, so a multi-byte
 * value split across chunks is handled by waiting; bytes are then pulled
 * one at a time through bufferlist.getbyte().
 *
 * Format characters:
 *   C  unsigned 8-bit        x  skip one byte (produces no value)
 *   S  unsigned 16-bit       s  signed 16-bit
 *   L  unsigned 32-bit       l  signed 32-bit
 * Any other character consumes nothing and produces nothing.
 *
 * NOTE(review): when bufferlist.serverBigEndian is truthy the FIRST byte
 * read is treated as the least significant one, which looks inverted
 * relative to the flag's name. Left unchanged because callers may depend
 * on it - confirm.
 *
 * @param {object} bufferlist  stream exposing `length`, `getbyte()` and
 *                             `serverBigEndian`
 * @returns {boolean} true when the whole format was decoded (the callback
 *                    has been invoked with the values), false when more
 *                    data is needed
 */
ReadFormatRequest.prototype.execute = function(bufferlist)
{
    var b1, b2, b3, b4, res;

    while (this.current_arg < this.format.length)
    {
        var arg = this.format[this.current_arg];
        if (bufferlist.length < argument_length[arg])
            return false; // need to wait for more data to process this argument

        switch (arg) {
        case 'C':
            this.data.push(bufferlist.getbyte());
            break;
        case 'S':
        case 's':
            b1 = bufferlist.getbyte();
            b2 = bufferlist.getbyte();
            if (bufferlist.serverBigEndian)
                res = b2*256 + b1;
            else
                res = b1*256 + b2;

            // lowercase means signed: fold the upper half into negatives,
            // mirroring what 'l' does for 32-bit values
            if (arg == 's' && res >= 0x8000)
                res -= 0x10000;
            this.data.push(res);
            break;
        case 'l':
        case 'L':
            b1 = bufferlist.getbyte();
            b2 = bufferlist.getbyte();
            b3 = bufferlist.getbyte();
            b4 = bufferlist.getbyte();
            if (bufferlist.serverBigEndian)
                res = (((b4*256 + b3)*256 + b2)*256 + b1);
            else
                res = (((b1*256 + b2)*256 + b3)*256 + b4);

            if (arg == 'l' && res >= 0x80000000)
                res -= 0x100000000;
            this.data.push(res);
            break;
        case 'x':
            bufferlist.getbyte();
            break;
        }
        this.current_arg++;
    }
    this.callback(this.data);
    return true;
};
 | |
| 
 | |
/**
 * Event emitter that buffers incoming chunks for the queued read requests
 * and collects packed outgoing buffers until they are flushed.
 */
function UnpackStream() {
    EventEmitter.call(this);

    // --- incoming side ---
    this.readlist = [];      // chunks that still hold unread bytes
    this.length = 0;         // total number of unread bytes
    this.offset = 0;         // read position inside readlist[0]
    this.read_queue = [];    // pending read requests, oldest first

    // --- outgoing side ---
    this.write_queue = [];   // packed buffers waiting for flush()
    this.write_length = 0;   // combined size of write_queue in bytes
}
util.inherits(UnpackStream, EventEmitter);
 | |
| 
 | |
/**
 * Accepts an incoming chunk: appends it to the read list, adds its size
 * to the unread byte count and then tries to serve queued read requests.
 *
 * @param {Buffer} buf  chunk of received data
 */
UnpackStream.prototype.write = function(buf) {
    var chunks = this.readlist;

    chunks.push(buf);
    this.length = this.length + buf.length;
    this.resume();
};
 | |
| 
 | |
/**
 * Forwards every 'data' event emitted by this object to stream.write().
 *
 * @param {object} stream  destination exposing write(data)
 * @returns {object} the destination, so calls can be chained the way
 *                   Node streams allow (`a.pipe(b).pipe(c)`)
 */
UnpackStream.prototype.pipe = function(stream)
{
    // TODO: ondrain & pause
    this.on('data', function(data)
    {
        stream.write(data);
    });
    return stream;
};
 | |
| 
 | |
/**
 * Queues a request to decode values according to `format` and tries to
 * serve the queue right away.
 *
 * @param {string}   format    format characters, one per value
 * @param {function} callback  receives the array of decoded values
 */
UnpackStream.prototype.unpack = function(format, callback) {
    var request = new ReadFormatRequest(format, callback);

    this.read_queue.push(request);
    this.resume();
};
 | |
| 
 | |
/**
 * Decodes values and stores them as named properties of `destination`.
 *
 * Each entry of `names_formats` is parsed as:
 *   - zero or more leading 'x' characters, each adding one skipped byte;
 *   - one format character;
 *   - one ignored character (a separator);
 *   - the property name (the rest of the string).
 * An entry made only of 'x' characters contributes padding and no name.
 *
 * @param {object}   destination    object that receives the decoded fields
 * @param {string[]} names_formats  field descriptions, e.g. 'xxC type'
 * @param {function} callback       called with `destination` once all
 *                                  fields have been assigned
 * @throws {Error} from inside the unpack callback when the number of
 *                 decoded values differs from the number of names
 */
UnpackStream.prototype.unpackTo = function(destination, names_formats, callback)
{
    var names = [];
    var format = '';

    for (var i = 0; i < names_formats.length; ++i)
    {
        var entry = names_formats[i];
        var off = 0;

        while (off < entry.length && entry[off] == 'x')
        {
            format += 'x';
            off++;
        }

        if (off < entry.length)
        {
            format += entry[off];
            // skip the format character and the separator that follows it
            names.push(entry.slice(off + 2));
        }
    }

    this.unpack(format, function(data) {
        if (data.length != names.length)
            throw new Error('Number of arguments mismatch, ' + names.length + ' fields and ' + data.length + ' arguments');

        for (var fld = 0; fld < data.length; ++fld)
            destination[names[fld]] = data[fld];

        callback(destination);
    });
};
 | |
| 
 | |
/**
 * Queues a request for `length` raw bytes and tries to serve the queue
 * right away.
 *
 * @param {number}   length    number of bytes wanted
 * @param {function} callback  receives the Buffer holding those bytes
 */
UnpackStream.prototype.get = function(length, callback) {
    var request = new ReadFixedRequest(length, callback);

    this.read_queue.push(request);
    this.resume();
};
 | |
| 
 | |
/**
 * Serves queued read requests, oldest first, until one of them reports
 * that it needs more data or the queue is empty.
 *
 * `this.resumed` guards against re-entry: a request callback that queues
 * further reads (and thereby calls resume() again) returns immediately and
 * lets the outer loop pick the new request up.
 *
 * The flag is cleared in a `finally` block. Without that, an exception
 * thrown by a request (or by its callback) would leave the flag set and
 * every later resume() would return without doing anything. The request
 * that threw is left at the head of the queue and the exception is
 * propagated to the caller.
 */
UnpackStream.prototype.resume = function()
{
    if (this.resumed)
        return;
    if (this.read_queue.length == 0)
        return;

    this.resumed = true;
    try {
        // process all read requests until there is not enough data in the buffer
        while (this.read_queue.length > 0 && this.read_queue[0].execute(this))
            this.read_queue.shift();
    } finally {
        this.resumed = false;
    }
};
 | |
| 
 | |
/**
 * Removes and returns the next unread byte.
 *
 * Zero-length chunks at the head of the read list are discarded first, so
 * writing an empty Buffer can no longer yield `undefined` or make the
 * unread byte count drift. A chunk is dropped as soon as its last byte
 * has been handed out.
 *
 * @returns {number} the byte value
 * @throws {RangeError} when no unread byte is buffered
 */
UnpackStream.prototype.getbyte = function()
{
    var chunks = this.readlist;

    // skip chunks that have nothing left to give
    while (chunks.length > 0 && this.offset >= chunks[0].length)
    {
        chunks.shift();
        this.offset = 0;
    }

    if (chunks.length == 0)
        throw new RangeError('getbyte() called with no buffered data');

    var chunk = chunks[0];
    var res = chunk[this.offset];
    this.offset++;
    this.length--;

    // that was the last byte of the current chunk: shift the read list
    if (this.offset >= chunk.length)
    {
        chunks.shift();
        this.offset = 0;
    }
    return res;
};
 | |
| 
 | |
| // TODO: measure node 0.5+ buffer serialisers performance
 | |
/**
 * Serialises `args` according to `format` into one Buffer and appends it
 * to the write queue (flush() emits the queued buffers).
 *
 * Format characters:
 *   x      one zero byte, consumes no argument
 *   C      one byte
 *   S / s  two bytes
 *   L / l  four bytes
 *   a      string (low 8 bits of each char code) or Buffer, written as is
 *   p      string followed by zero bytes up to the padded length
 * Negative numbers need no special handling for 's' / 'l': the shift and
 * mask operations below emit their two's-complement bytes.
 *
 * NOTE(review): when this.clientBigEndian is truthy the LEAST significant
 * byte is written first, which looks inverted relative to the flag's
 * name. Left unchanged because callers may depend on it - confirm.
 *
 * NOTE(review): 'p' previously called xutil.padded_length(), but `xutil`
 * is never defined in this file and the sizing pass had no case for 'p',
 * so the format could not work. Padding to a multiple of 4 bytes (the
 * X11 convention) is assumed here - confirm against the intended helper.
 *
 * @param {string} format  format characters as listed above
 * @param {Array}  args    one entry per non-'x' format character
 * @returns {UnpackStream} this, to allow chaining
 * @throws {Error} on a format character that is not listed above
 */
UnpackStream.prototype.pack = function(format, args)
{
    // rounds a length up to the next multiple of 4
    function paddedLength(len)
    {
        return (len + 3) & ~3;
    }

    var i, c, n, str, len;

    // first pass: compute the size of the packet
    var packetlength = 0;
    var arg = 0;
    for (i = 0; i < format.length; ++i)
    {
        var f = format[i];
        if (f == 'x') {
            packetlength++;
        } else if (f == 'a') {
            packetlength += args[arg++].length;
        } else if (f == 'p') {
            packetlength += paddedLength(args[arg++].length);
        } else if (Object.prototype.hasOwnProperty.call(argument_length, f)) {
            // fixed-length format, size comes from the argument_length table
            packetlength += argument_length[f];
            arg++;
        } else {
            throw new Error('Unknown pack format character: ' + f);
        }
    }

    // second pass: write the values
    var buf = Buffer.alloc(packetlength);
    var offset = 0;
    arg = 0;
    for (i = 0; i < format.length; ++i)
    {
        switch (format[i])
        {
            case 'x':
                buf[offset++] = 0;
                break;
            case 'C':
                buf[offset++] = args[arg++];
                break;
            case 's':
            case 'S':
                n = args[arg++];
                if (this.clientBigEndian)
                {
                    buf[offset++] = n & 0xff;
                    buf[offset++] = (n >> 8) & 0xff;
                } else {
                    buf[offset++] = (n >> 8) & 0xff;
                    buf[offset++] = n & 0xff;
                }
                break;
            case 'l':
            case 'L':
                n = args[arg++];
                if (this.clientBigEndian)
                {
                    buf[offset++] = n & 0xff;
                    buf[offset++] = (n >> 8) & 0xff;
                    buf[offset++] = (n >> 16) & 0xff;
                    buf[offset++] = (n >> 24) & 0xff;
                } else {
                    buf[offset++] = (n >> 24) & 0xff;
                    buf[offset++] = (n >> 16) & 0xff;
                    buf[offset++] = (n >> 8) & 0xff;
                    buf[offset++] = n & 0xff;
                }
                break;
            case 'a':  // string or buffer
                str = args[arg++];
                if (Buffer.isBuffer(str))
                {
                    str.copy(buf, offset);
                    offset += str.length;
                } else {
                    for (c = 0; c < str.length; ++c)
                        buf[offset++] = str.charCodeAt(c);
                }
                break;
            case 'p':  // padded string
                str = args[arg++];
                len = paddedLength(str.length);
                for (c = 0; c < str.length; ++c)
                    buf[offset++] = str.charCodeAt(c);
                for (; c < len; ++c)
                    buf[offset++] = 0;
                break;
        }
    }

    this.write_queue.push(buf);
    this.write_length += buf.length;
    return this;
};
 | |
| 
 | |
/**
 * Emits every queued packet as a 'data' event, in queue order, then
 * resets the write queue and its byte counter.
 *
 * @param {object} stream  not used by this implementation; packets reach
 *                         their destination through 'data' listeners
 */
UnpackStream.prototype.flush = function(stream) {
    // TODO: measure the benefit of writing one big concatenated buffer
    // TODO: check write result, pause/resume streaming
    var index = 0;

    // this.write_queue is re-read on every step so packets queued (or a
    // queue replaced) by a listener are seen by the loop
    while (index < this.write_queue.length) {
        this.emit('data', this.write_queue[index]);
        index += 1;
    }

    this.write_queue = [];
    this.write_length = 0;
};
 | |
| 
 | |
| module.exports = UnpackStream;
 |