Source code for usbcore.cpu.unififo_test

#!/usr/bin/env python3

import unittest

from migen import *

from ..endpoint import *
from ..io import FakeIoBuf
from ..test.common import BaseUsbTestCase, CommonUsbTestCase
from ..utils.packet import *

from .unififo import UsbUniFifo
from ..test.clock import CommonTestMultiClockDomain


class TestUsbUniFifo(
        BaseUsbTestCase,
        CommonUsbTestCase,
        CommonTestMultiClockDomain,
        unittest.TestCase):
    """Run the shared USB test cases against ``UsbUniFifo``.

    The test case keeps a small software model of the device side: a table
    of endpoints (``self.endpoints``) and a transaction state
    (``self.state``, one of ``self.states``).  ``update_internal_signals``
    advances that model by draining bytes from the DUT's ``obuf_*``
    registers and answering through its ``ibuf_*`` / ``arm`` registers.

    Every public helper is a generator meant to be driven with
    ``yield from`` inside a migen simulation.
    """

    maxDiff = None

    # Number of bare ``yield`` statements inserted after register accesses.
    # NOTE(review): presumably gives the slower clock domains time to react
    # to the access -- confirm against the DUT.
    _PAUSE_TICKS = 192

    # Upper bound on the number of bytes popped from the output FIFO for a
    # single packet before the test is failed.
    _MAX_FIFO_READS = 256

    def setUp(self):
        """Build the DUT, the state bookkeeping and the endpoint table."""
        CommonTestMultiClockDomain.setUp(self, ("usb_12", "usb_48"))

        self.iobuf = FakeIoBuf()
        self.dut = UsbUniFifo(self.iobuf)

        self.states = [
            "WAIT",
            "RECV_DATA",
            "SEND_HAND",
            "SEND_DATA",
            "RECV_HAND",
        ]
        self.decoding = dict(enumerate(self.states))
        # Extra signal attached to the DUT that mirrors the model state; it
        # is only ever written by next_state().
        self.dut.state = Signal(max=len(self.states))
        self.dut.state._enumeration = self.decoding

        self.packet_h2d = Signal(1)
        self.packet_d2h = Signal(1)
        self.packet_idle = Signal(1)

        class Endpoint:
            """Software model of one endpoint's flags, data and toggle."""

            def __init__(self):
                self._response = EndpointResponse.NAK
                self.trigger = False
                self.pending = True
                self.data = None
                self.dtb = True

            def update(self):
                """Raise ``pending`` when ``trigger`` is set."""
                if self.trigger:
                    self.pending = True

            @property
            def response(self):
                """Effective response: ACK degrades to NAK while busy."""
                busy = self.pending or self.trigger
                if self._response == EndpointResponse.ACK and busy:
                    return EndpointResponse.NAK
                return self._response

            @response.setter
            def response(self, v):
                assert isinstance(v, EndpointResponse), repr(v)
                self._response = v

            def __str__(self):
                data = [] if self.data is None else self.data
                return "<Endpoint p:(%s,%s) %s d:%s>" % (
                    int(self.trigger), int(self.pending), int(self.dtb),
                    len(data))

        # Endpoints 0..2, each with an OUT and an IN half, keyed by the
        # value EndpointType.epaddr() returns for them.
        self.endpoints = {}
        for number in range(3):
            for direction in (EndpointType.OUT, EndpointType.IN):
                epaddr = EndpointType.epaddr(number, direction)
                endpoint = Endpoint()
                endpoint.addr = epaddr
                self.endpoints[epaddr] = endpoint

    def run_sim(self, stim):
        """Simulate the DUT with ``stim()`` run after a fixed preamble.

        The preamble puts the model in ``WAIT``, writes 1 to the pull-up
        register and runs ``self.idle()`` before handing over to ``stim``.
        """
        def padfront():
            yield from self.next_state("WAIT")
            for _ in range(4):
                yield
            yield from self.dut.pullup._out.write(1)
            for _ in range(4):
                yield
            yield from self.idle()
            yield from stim()

        print()
        print("-"*10)
        CommonUsbTestCase.patch_csrs(self)
        # Per-domain clock settings handed straight to run_simulation().
        clocks = {
            "sys": 12,
            "usb_48": 48,
            "usb_12": 192,
        }
        run_simulation(
            self.dut,
            padfront(),
            vcd_name=self.make_vcd_name(),
            clocks=clocks,
        )
        print("-"*10)

    def _spin(self, ticks=_PAUSE_TICKS):
        """Yield ``ticks`` times without touching any signal."""
        for _ in range(ticks):
            yield

    def recv_packet(self):
        """Pop one packet from the DUT's output FIFO.

        Returns ``None`` when bit 0 of ``ev.pending`` is clear, otherwise
        the list of byte values read from ``obuf_head`` until
        ``obuf_empty`` reads true.  Bit 0 of ``ev.pending`` is written back
        afterwards.

        Fails the test if the FIFO has not drained after
        ``_MAX_FIFO_READS`` bytes.
        """
        pending = yield from self.dut.ev.pending.read()
        if not pending & 0b1:
            return None

        yield

        actual_data = []
        # The original loop condition was ``while range(0, 256)``, which is
        # always true; bound the loop for real so a FIFO that never reports
        # empty fails the test instead of hanging the simulation.
        for _ in range(self._MAX_FIFO_READS):
            empty = yield from self.dut.obuf_empty.read()
            if empty:
                break

            v = yield from self.dut.obuf_head.read()
            # Writing to the head register advances the FIFO.
            # NOTE(review): inferred from the read/write pairing -- confirm.
            yield from self.dut.obuf_head.write(0)
            actual_data.append(v)
            yield from self._spin()
        else:
            self.fail(
                "output FIFO still not empty after %d bytes: %r"
                % (self._MAX_FIFO_READS, actual_data))

        yield from self.dut.ev.pending.write(0b1)
        yield from self._spin()

        return actual_data

    def send_packet(self, pid, data=None):
        """Queue a packet in the DUT's input FIFO and arm it.

        ``data is None`` sends a handshake (``pid`` must be ACK, NAK or
        STALL); otherwise ``pid`` must be DATA0 or DATA1 and ``data`` plus
        its CRC16 is appended after the PID byte.
        """
        yield from self.dut.arm.write(0)
        armed = yield from self.dut.arm.read()
        self.assertFalse(armed)

        empty = yield from self.dut.obuf_empty.read()
        self.assertTrue(empty)

        # sync byte, then PID in the low nibble with its complement above.
        pkt_data = [0b10000000, pid | ((0b1111 ^ pid) << 4)]
        if data is None:
            self.assertIn(
                pid, (PID.ACK, PID.NAK, PID.STALL), repr((pid, data)))
        else:
            self.assertIn(pid, (PID.DATA0, PID.DATA1), repr(pid))
            pkt_data += data
            pkt_data += crc16(data)

        print("send_packet", pid, data)
        print("send_packet", pkt_data)
        for d in pkt_data:
            yield from self.dut.ibuf_head.write(d)
            yield from self._spin()
        yield from self._spin()

        empty = yield from self.dut.ibuf_empty.read()
        self.assertFalse(empty)
        yield from self.dut.arm.write(1)

    def tick_sys(self):
        """Advance the model once, then yield a single time."""
        yield from self.update_internal_signals()
        yield

    def tick_usb48(self):
        """Wait for the next ``usb_48`` edge."""
        yield from self.wait_for_edge("usb_48")

    def tick_usb12(self):
        """Wait for four ``usb_48`` edges."""
        for _ in range(0, 4):
            yield from self.tick_usb48()

    def next_state(self, state):
        """Switch the model to ``state`` and mirror it on ``dut.state``."""
        self.assertIn(state, self.states)
        yield self.dut.state.eq(self.states.index(state))
        self.state = state

    def on_usb_48_edge(self):
        """Do nothing; empty generator."""
        yield from ()

    def on_usb_12_edge(self):
        """Do nothing; empty generator."""
        yield from ()

    def update_internal_signals(self):
        """Advance the clocks, the endpoint flags and the state machine.

        Any endpoint whose ``trigger`` is set gets ``pending`` raised and
        ``trigger`` cleared; then the handler for the current
        ``self.state`` runs once.
        """
        yield from self.update_clocks()

        for endpoint in self.endpoints.values():
            if endpoint.trigger:
                endpoint.pending = True
                endpoint.trigger = False

        handlers = {
            "WAIT": self._step_wait,
            "RECV_DATA": self._step_recv_data,
            "SEND_HAND": self._step_send_hand,
            "SEND_DATA": self._step_send_data,
            "RECV_HAND": self._step_recv_hand,
        }
        handler = handlers.get(self.state)
        if handler is not None:
            yield from handler()

    def _decode_pid(self, pkt_data):
        """Return the PID of the first byte, checking its complement nibble."""
        bits = encode_data(pkt_data[:1])
        pidt = int(bits[0:4][::-1], 2)
        pidb = int(bits[4:8][::-1], 2)
        self.assertEqual(pidt ^ 0b1111, pidb)
        return PID(pidt)

    def _step_wait(self):
        """WAIT: receive a token packet and pick the next state from it."""
        self.ep = None
        self.handshake = None

        pkt_data = yield from self.recv_packet()
        if not pkt_data:
            return

        self.assertEqual(len(pkt_data), 3, pkt_data)
        pid = self._decode_pid(pkt_data)
        # Bit string: 8 PID bits, 7 address bits, 4 endpoint bits, then CRC5.
        bits = encode_data(pkt_data)
        addr = int(bits[8:8+7][::-1], 2)
        endp = int(bits[8+7:8+7+4][::-1], 2)
        crc5 = int(bits[8+7+4:][::-1], 2)
        print("WAIT pid:", pid, "addr:", addr, "ep:", endp, "crc5:", crc5)
        self.assertEqual(crc5, crc5_token(addr, endp))

        if pid in (PID.SETUP, PID.OUT):
            self.ep = self.endpoints[
                EndpointType.epaddr(endp, EndpointType.OUT)]
            if pid == PID.SETUP:
                # SETUP is always acknowledged and resets both halves of
                # the endpoint to NAK with fixed data toggles.
                self.handshake = EndpointResponse.ACK
                self.ep.response = EndpointResponse.NAK
                self.ep.dtb = False

                iep = self.endpoints[
                    EndpointType.epaddr(endp, EndpointType.IN)]
                self.assertIsNot(self.ep, iep)
                iep.response = EndpointResponse.NAK
                iep.dtb = True
                print(self.ep, iep)
            else:
                self.handshake = self.ep.response
            yield from self.next_state("RECV_DATA")

        elif pid == PID.IN:
            self.ep = self.endpoints[
                EndpointType.epaddr(endp, EndpointType.IN)]
            self.handshake = self.ep.response

            if self.ep.response == EndpointResponse.ACK:
                if self.ep.data is None:
                    self.ep.data = []
                yield from self.next_state("SEND_DATA")
            else:
                yield from self.next_state("SEND_HAND")

        elif pid == PID.SOF:
            # SOF packets are ignored.
            # NOTE(review): clear_pending() is a generator function; called
            # without ``yield from`` it is never executed, so this statement
            # has no effect.  Left unchanged to preserve behaviour.
            self.clear_pending(addr)

        else:
            self.fail("unexpected token PID: %r" % (pid,))

    def _step_recv_data(self):
        """RECV_DATA: receive the data stage of a SETUP/OUT transaction."""
        self.assertIsNotNone(self.ep)
        pkt_data = yield from self.recv_packet()
        if not pkt_data:
            return

        self.ep_print(
            self.ep.addr, "RECV_DATA: %r", [hex(b) for b in pkt_data])
        pid = self._decode_pid(pkt_data)
        self.ep_print(
            self.ep.addr, "RECV_DATA pid:%s data:%r", pid, pkt_data)

        if pid == PID.SOF:
            # SOF packets are ignored; the model stays in RECV_DATA.
            # NOTE(review): as in WAIT, this call creates a generator that
            # is never run.  Left unchanged to preserve behaviour.
            self.clear_pending(self.ep.addr)
            return

        if self.handshake == EndpointResponse.ACK:
            self.assertIsNone(self.ep.data)
            self.assertIn(
                encode_pid(pid),
                (encode_pid(PID.DATA0), encode_pid(PID.DATA1)))
            # Layout is PID byte, payload, two CRC16 bytes.
            self.assertSequenceEqual(pkt_data[-2:], crc16(pkt_data[1:-2]))
            self.ep.data = pkt_data[1:-2]

        yield from self.next_state("SEND_HAND")

    def _step_send_hand(self):
        """SEND_HAND: send the handshake chosen earlier, return to WAIT."""
        self.assertIsNotNone(self.ep)
        self.assertIsNotNone(self.handshake)
        pid = {
            EndpointResponse.STALL: PID.STALL,
            EndpointResponse.NAK: PID.NAK,
            EndpointResponse.ACK: PID.ACK,
        }[self.handshake]
        self.ep_print(self.ep.addr, "SEND_HAND pid:%s", pid)
        yield from self.send_packet(pid)
        if self.handshake == EndpointResponse.ACK:
            self.ep.trigger = True
            self.ep.dtb = not self.ep.dtb
        yield from self.next_state("WAIT")

    def _step_send_data(self):
        """SEND_DATA: send the endpoint's buffer as DATA0/DATA1."""
        self.assertIsNotNone(self.ep)
        self.assertIsNotNone(self.ep.data)
        pid = [PID.DATA0, PID.DATA1][self.ep.dtb]
        self.ep_print(
            self.ep.addr, "SEND_DATA pid:%s data:%r", pid, self.ep.data)
        yield from self.send_packet(pid, self.ep.data)
        self.ep.data = None
        yield from self.next_state("RECV_HAND")

    def _step_recv_hand(self):
        """RECV_HAND: require an ACK for the data that was just sent."""
        self.assertIsNotNone(self.ep)
        pkt_data = yield from self.recv_packet()
        if not pkt_data:
            return

        pid = self._decode_pid(pkt_data)
        self.ep_print(self.ep.addr, "RECV_HAND pid:%s", pid)
        if pid != PID.ACK:
            raise SystemError(pkt_data)

        self.ep.trigger = True
        self.ep.dtb = not self.ep.dtb
        yield from self.next_state("WAIT")

    ######################################################################
    ## Helpers
    ######################################################################

    # IRQ / packet pending -----------------

    def trigger(self, epaddr):
        """Advance the model, then return the endpoint's trigger flag."""
        yield from self.update_internal_signals()
        return self.endpoints[epaddr].trigger

    def pending(self, epaddr):
        """Advance the model, then return whether pending or trigger is set."""
        yield from self.update_internal_signals()
        endpoint = self.endpoints[epaddr]
        return endpoint.pending or endpoint.trigger

    def clear_pending(self, epaddr):
        """Clear the endpoint's pending flag, checking it before and after."""
        # Can't clear pending while trigger is active.
        for _ in range(0, 192):
            trigger = (yield from self.trigger(epaddr))
            if not trigger:
                break
            yield
        self.assertFalse(trigger)

        # Check the pending flag is raised
        self.assertTrue((yield from self.pending(epaddr)))

        # Clear pending flag
        self.endpoints[epaddr].pending = False
        self.ep_print(epaddr, "clear_pending")

        # Check the pending flag has been cleared
        self.assertFalse((yield from self.trigger(epaddr)))
        self.assertFalse((yield from self.pending(epaddr)))

    # Endpoint state -----------------------

    def response(self, epaddr):
        """Return the endpoint's effective response (generator, no yields)."""
        yield from ()
        return self.endpoints[epaddr].response

    def set_response(self, epaddr, v):
        """Set the endpoint's configured response (generator, no yields)."""
        self.assertIsInstance(v, EndpointResponse, repr(v))
        yield from ()
        self.ep_print(epaddr, "set_response: %s", v)
        self.endpoints[epaddr].response = v

    # Get/set endpoint data ----------------

    def set_data(self, epaddr, data):
        """Set an endpoint's buffer to the given data to be sent."""
        self.assertIsInstance(data, (list, tuple))
        yield from ()
        self.ep_print(epaddr, "Set: %r", data)
        self.endpoints[epaddr].data = data

    def expect_data(self, epaddr, data):
        """Expect that an endpoint's buffer has the given contents.

        The buffer is consumed (reset to ``None``) before the comparison.
        """
        # Make sure there is something pending
        self.assertTrue((yield from self.pending(epaddr)))

        self.ep_print(epaddr, "expect_data: %s", data)
        actual_data = self.endpoints[epaddr].data
        self.assertIsNotNone(actual_data)
        self.endpoints[epaddr].data = None

        self.ep_print(
            epaddr, "Got: %r (expected: %r)", actual_data, data)
        self.assertSequenceEqual(data, actual_data)

    def dtb(self, epaddr):
        """Return the endpoint's data toggle bit (generator, no yields)."""
        yield from ()
        print("dtb", epaddr, self.endpoints[epaddr])
        return self.endpoints[epaddr].dtb
if __name__ == "__main__":
    # ``unittest`` is already imported at module scope, so no local
    # re-import is needed before handing control to the test runner.
    unittest.main()