#!/usr/bin/env python3
from .packet import *
from ..pid import *
[docs]def pp_packet(p, cycles=4):
"""
>>> print(pp_packet(wrap_packet(handshake_packet(PID.ACK), cycles=1), cycles=1))
-
K 1 Sync
J 2 Sync
K 3 Sync
J 4 Sync
K 5 Sync
J 6 Sync
K 7 Sync
K 8 Sync
-
J 1 PID (PID.ACK)
J 2 PID
K 3 PID
J 4 PID
J 5 PID
K 6 PID
K 7 PID
K 8 PID
-
_ SE0
_ SE0
J END
>>> print(pp_packet(wrap_packet(handshake_packet(PID.ACK))))
----
KKKK 1 Sync
JJJJ 2 Sync
KKKK 3 Sync
JJJJ 4 Sync
KKKK 5 Sync
JJJJ 6 Sync
KKKK 7 Sync
KKKK 8 Sync
----
JJJJ 1 PID (PID.ACK)
JJJJ 2 PID
KKKK 3 PID
JJJJ 4 PID
JJJJ 5 PID
KKKK 6 PID
KKKK 7 PID
KKKK 8 PID
----
____ SE0
____ SE0
JJJJ END
>>> print(pp_packet(wrap_packet(handshake_packet(PID.ACK), cycles=10), cycles=10))
----------
KKKKKKKKKK 1 Sync
JJJJJJJJJJ 2 Sync
KKKKKKKKKK 3 Sync
JJJJJJJJJJ 4 Sync
KKKKKKKKKK 5 Sync
JJJJJJJJJJ 6 Sync
KKKKKKKKKK 7 Sync
KKKKKKKKKK 8 Sync
----------
JJJJJJJJJJ 1 PID (PID.ACK)
JJJJJJJJJJ 2 PID
KKKKKKKKKK 3 PID
JJJJJJJJJJ 4 PID
JJJJJJJJJJ 5 PID
KKKKKKKKKK 6 PID
KKKKKKKKKK 7 PID
KKKKKKKKKK 8 PID
----------
__________ SE0
__________ SE0
JJJJJJJJJJ END
>>> print(pp_packet(wrap_packet(token_packet(PID.SETUP, 0, 0))))
----
KKKK 1 Sync
JJJJ 2 Sync
KKKK 3 Sync
JJJJ 4 Sync
KKKK 5 Sync
JJJJ 6 Sync
KKKK 7 Sync
KKKK 8 Sync
----
KKKK 1 PID (PID.SETUP)
JJJJ 2 PID
JJJJ 3 PID
JJJJ 4 PID
KKKK 5 PID
KKKK 6 PID
JJJJ 7 PID
KKKK 8 PID
----
JJJJ 1 Address
KKKK 2 Address
JJJJ 3 Address
KKKK 4 Address
JJJJ 5 Address
KKKK 6 Address
JJJJ 7 Address
KKKK 1 Endpoint
----
JJJJ 2 Endpoint
KKKK 3 Endpoint
JJJJ 4 Endpoint
KKKK 1 CRC5
KKKK 2 CRC5
JJJJ 3 CRC5
KKKK 4 CRC5
JJJJ 5 CRC5
----
____ SE0
____ SE0
JJJJ END
>>> print(pp_packet(wrap_packet(data_packet(PID.DATA0, [5, 6]))))
----
KKKK 1 Sync
JJJJ 2 Sync
KKKK 3 Sync
JJJJ 4 Sync
KKKK 5 Sync
JJJJ 6 Sync
KKKK 7 Sync
KKKK 8 Sync
----
KKKK 1 PID (PID.DATA0)
KKKK 2 PID
JJJJ 3 PID
KKKK 4 PID
JJJJ 5 PID
KKKK 6 PID
KKKK 7 PID
KKKK 8 PID
----
KKKK
JJJJ
JJJJ
KKKK
JJJJ
KKKK
JJJJ
KKKK
----
JJJJ
JJJJ
JJJJ
KKKK
JJJJ
KKKK
JJJJ
KKKK
----
KKKK 1 CRC16
JJJJ 2 CRC16
JJJJ 3 CRC16
JJJJ 4 CRC16
JJJJ 5 CRC16
JJJJ 6 CRC16
JJJJ 7 CRC16
KKKK 8 CRC16
----
KKKK 9 CRC16
JJJJ 10 CRC16
JJJJ 11 CRC16
JJJJ 12 CRC16
JJJJ 13 CRC16
KKKK 14 CRC16
JJJJ 15 CRC16
KKKK 16 CRC16
----
____ SE0
____ SE0
JJJJ END
>>> # Requires bit stuffing!
>>> print(pp_packet(wrap_packet(data_packet(PID.DATA0, [0x1]))))
----
KKKK 1 Sync
JJJJ 2 Sync
KKKK 3 Sync
JJJJ 4 Sync
KKKK 5 Sync
JJJJ 6 Sync
KKKK 7 Sync
KKKK 8 Sync
----
KKKK 1 PID (PID.DATA0)
KKKK 2 PID
JJJJ 3 PID
KKKK 4 PID
JJJJ 5 PID
KKKK 6 PID
KKKK 7 PID
KKKK 8 PID
----
KKKK
JJJJ
KKKK
JJJJ
KKKK
JJJJ
KKKK
JJJJ
----
JJJJ 1 CRC16
KKKK 2 CRC16
JJJJ 3 CRC16
KKKK 4 CRC16
JJJJ 5 CRC16
KKKK 6 CRC16
JJJJ 7 CRC16
JJJJ 8 CRC16
----
JJJJ 9 CRC16
JJJJ 10 CRC16
JJJJ 11 CRC16
JJJJ 12 CRC16
JJJJ 13 CRC16
KKKK Bitstuff
KKKK 14 CRC16
KKKK 15 CRC16
JJJJ 16 CRC16
----
____ SE0
____ SE0
JJJJ END
>>> print(pp_packet(wrap_packet(data_packet(PID.DATA0, [0x1]))[:96]))
----
KKKK 1 Sync
JJJJ 2 Sync
KKKK 3 Sync
JJJJ 4 Sync
KKKK 5 Sync
JJJJ 6 Sync
KKKK 7 Sync
KKKK 8 Sync
----
KKKK 1 PID (PID.DATA0)
KKKK 2 PID
JJJJ 3 PID
KKKK 4 PID
JJJJ 5 PID
KKKK 6 PID
KKKK 7 PID
KKKK 8 PID
----
KKKK
JJJJ
KKKK
JJJJ
KKKK
JJJJ
KKKK
JJJJ END
>>> print(pp_packet(wrap_packet(sof_packet(12))))
----
KKKK 1 Sync
JJJJ 2 Sync
KKKK 3 Sync
JJJJ 4 Sync
KKKK 5 Sync
JJJJ 6 Sync
KKKK 7 Sync
KKKK 8 Sync
----
KKKK 1 PID (PID.SOF)
JJJJ 2 PID
JJJJ 3 PID
KKKK 4 PID
JJJJ 5 PID
JJJJ 6 PID
KKKK 7 PID
KKKK 8 PID
----
KKKK 1 Frame #
JJJJ 2 Frame #
KKKK 3 Frame #
JJJJ 4 Frame #
KKKK 5 Frame #
JJJJ 6 Frame #
KKKK 7 Frame #
JJJJ 8 Frame #
----
JJJJ 9 Frame #
JJJJ 10 Frame #
KKKK 11 Frame #
KKKK 1 CRC5
JJJJ 2 CRC5
KKKK 3 CRC5
JJJJ 4 CRC5
JJJJ 5 CRC5
----
____ SE0
____ SE0
JJJJ END
"""
output = []
chunks = [p[i:i+cycles] for i in range(0, len(p), cycles)]
class BitStuff:
def __init__(self):
self.i = 0
def __call__(self, chunk):
if self.i == 7:
self.i = 0
if chunk[0] == 'K':
output.extend([chunk, ' Bitstuff\n'])
else:
output.extend([chunk, ' Bitstuff ERROR!\n'])
return True
if chunk[0] == 'J':
self.i += 1
else:
self.i = 0
return False
class Seperator:
def __init__(self):
self.i = 0
def __call__(self, chunk):
if self.i % 8 == 0:
output.append('-'*cycles+'\n')
self.i += 1
return False
class Sync:
def __init__(self):
self.i = 0
def __call__(self, chunk):
if self.i > 7:
return False
self.i += 1
output.extend([chunk, ' %i Sync\n' % self.i])
return True
class Pid:
def __init__(self):
self.done = False
self.pid_chunks = []
self.type = None
self.encoded_pids = {}
for p in PID:
self.encoded_pids[p.encode(cycles)] = p
def __call__(self, chunk):
if self.done:
return False
self.pid_chunks.append(chunk)
if len(self.pid_chunks) < 8:
return True
self.done = True
self.type = self.encoded_pids.get("".join(self.pid_chunks), 'ERROR')
for i, chunk in enumerate(self.pid_chunks):
if i == 0:
output.extend([chunk, ' %i PID (%s)\n' % (1, self.type)])
else:
output.extend([chunk, ' %i PID\n' % (i+1,)])
return True
class SOF:
def __init__(self, pid):
self.pid = pid
self.i = 0
self.state = 'FRAME NUMBER'
def __call__(self, chunk):
if self.pid.type != PID.SOF:
return False
self.i += 1
if self.state == 'FRAME NUMBER':
output.extend([chunk, ' %2i Frame #\n' % self.i])
if self.i == 11:
self.state = 'CRC5'
self.i = 0
elif self.state == 'CRC5':
output.extend([chunk, ' %i CRC5\n' % self.i])
if self.i == 5:
self.state = "DATA"
self.i = 0
else:
output.extend([chunk, ' ERROR!\n'])
return True
def finish(self):
pass
class Data:
def __init__(self, pid):
self.done = False
self.pid = pid
self.last16 = []
def __call__(self, chunk):
if self.pid.type not in (PID.DATA0, PID.DATA1):
return False
self.last16.append(chunk)
output.append(None)
if len(self.last16) > 16:
self.patch(self.last16.pop(0)+'\n')
return True
def patch(self, s):
assert isinstance(s, str), s
p = output.index(None)
output[p] = s
def finish(self):
if output.count(None) == 0:
return False
assert output.count(None) == len(self.last16), (output.count(None), len(self.last16))
if len(self.last16) == 16:
for i, chunk in enumerate(self.last16):
self.patch(chunk+' %2i CRC16\n' % (i+1,))
else:
for i, chunk in enumerate(self.last16):
self.patch(chunk+'\n')
assert output.count(None) == 0
class Token:
def __init__(self, pid):
self.pid = pid
self.i = 0
self.state = 'ADDRESS'
def __call__(self, chunk):
if self.pid.type in (PID.DATA0, PID.DATA1):
return False
self.i += 1
if self.state == 'ADDRESS':
output.extend([chunk, ' %i Address\n' % self.i])
if self.i == 7:
self.state = "ENDPOINT"
self.i = 0
elif self.state == 'ENDPOINT':
output.extend([chunk, ' %i Endpoint\n' % self.i])
if self.i == 4:
self.state = "CRC5"
self.i = 0
elif self.state == 'CRC5':
output.extend([chunk, ' %i CRC5\n' % self.i])
if self.i == 5:
self.state = "DATA"
self.i = 0
elif self.state == 'DATA':
output.extend([chunk, ' ERROR\n'])
return True
class End:
def __init__(self):
pass
def __call__(self, chunk):
if chunk == '_' * cycles:
output.extend([chunk, ' SE0\n'])
return True
if len(chunks) == 0:
output.extend([chunk, ' END\n'])
return True
return False
printers = []
printers.append(BitStuff())
printers.append(Seperator())
printers.append(Sync())
pid_printer = Pid()
printers.append(pid_printer)
printers.append(End())
printers.append(SOF(pid_printer))
printers.append(Data(pid_printer))
printers.append(Token(pid_printer))
while len(chunks) > 0:
chunk = chunks.pop(0)
for printer in printers:
if printer(chunk):
break
else:
output.extend([chunk, ' ERROR!\n'])
for p in printers:
if not hasattr(p, "finish"):
continue
p.finish()
assert output.count(None) == 0, output
output[-1] = output[-1][:-1]
return "".join(output)
if __name__ == "__main__":
import doctest
doctest.testmod()