# Source code for usbcore.cpu.epmem_test

#!/usr/bin/env python3

import unittest
from unittest import TestCase

from migen import *

from ..test.common import BaseUsbTestCase, CommonUsbTestCase
from ..io_test import FakeIoBuf

from .epmem import MemInterface
from ..endpoint import EndpointType, EndpointResponse
from ..test.clock import CommonTestMultiClockDomain
from ..utils.bits import get_bit, set_bit

class TestMemInterface(
        BaseUsbTestCase,
        CommonUsbTestCase,
        CommonTestMultiClockDomain,
        unittest.TestCase):
    """Simulation harness for ``MemInterface`` driven through a ``FakeIoBuf``.

    This class supplies the device-specific helpers (CSR access, event
    sources, endpoint buffers, clock ticking) on top of the shared USB test
    mixins.  No ``test_*`` methods are defined here; presumably they are
    inherited from ``CommonUsbTestCase`` -- confirm against that class.

    Most helpers are generator functions meant to be driven by the migen
    simulator, so call them with ``yield from`` inside a stimulus.
    """

    # Show full diffs when an assertion on long sequences fails.
    maxDiff = None

    def on_usb_48_edge(self):
        """Hook for the ``usb_48`` clock edge; does nothing.

        The unreachable ``yield`` only exists to make this a generator.
        """
        if False:
            yield

    def on_usb_12_edge(self):
        """Hook for the ``usb_12`` clock edge; does nothing.

        The unreachable ``yield`` only exists to make this a generator.
        """
        if False:
            yield

    def setUp(self):
        """Create the device under test and the per-test helper signals."""
        CommonTestMultiClockDomain.setUp(self, ("usb_12", "usb_48"))

        self.iobuf = FakeIoBuf()
        self.dut = MemInterface(self.iobuf, num_endpoints=3)

        self.packet_h2d = Signal(1)
        self.packet_d2h = Signal(1)
        self.packet_idle = Signal(1)

    def run_sim(self, stim):
        """Simulate the DUT, running ``stim()`` after a fixed preamble.

        The preamble yields four times, writes 1 to the pullup output
        register, yields four more times and runs ``self.idle()`` before
        handing control to the stimulus generator returned by ``stim()``.
        """
        def padfront():
            for _ in range(4):
                yield
            yield from self.dut.pullup._out.write(1)
            for _ in range(4):
                yield
            yield from self.idle()
            yield from stim()

        CommonUsbTestCase.patch_csrs(self)
        run_simulation(
            self.dut,
            padfront(),
            vcd_name=self.make_vcd_name(),
            clocks={
                "sys": 2,
                "usb_48": 8,
                "usb_12": 32,
            },
        )

    def tick_sys(self):
        """Refresh the internal signals, then yield once."""
        yield from self.update_internal_signals()
        yield

    def tick_usb48(self):
        """Wait for the next ``usb_48`` edge."""
        yield from self.wait_for_edge("usb_48")

    def tick_usb12(self):
        """Advance by four ``usb_48`` ticks."""
        for _ in range(4):
            yield from self.tick_usb48()

    def update_internal_signals(self):
        """Delegate to ``update_clocks``."""
        yield from self.update_clocks()

    ######################################################################
    ## Helpers
    ######################################################################

    def get_module(self, epaddr, name, obj=None):
        """Return the direction-specific attribute for ``epaddr``.

        The attribute looked up is ``"o" + name`` for an OUT endpoint and
        ``"i" + name`` for an IN endpoint.

        Args:
            epaddr: Endpoint address; its direction selects the prefix.
            name: Attribute name without the direction prefix.
            obj: Object to look the attribute up on; defaults to the DUT.

        Raises:
            SystemError: If the endpoint direction is neither OUT nor IN.
        """
        if obj is None:
            obj = self.dut

        epdir = EndpointType.epdir(epaddr)
        if epdir == EndpointType.OUT:
            module = getattr(obj, 'o{}'.format(name))
        elif epdir == EndpointType.IN:
            module = getattr(obj, 'i{}'.format(name))
        else:
            raise SystemError("Unknown endpoint type: %r" % epdir)
        return module

    def get_evsrc(self, epaddr):
        """Return the event source for ``epaddr`` from ``self.dut.ev``."""
        epnum = EndpointType.epnum(epaddr)
        # The direction prefix is resolved inside get_module().
        return self.get_module(epaddr, "ep{}".format(epnum), obj=self.dut.ev)

    def get_ptr_csr(self, epaddr):
        """Return the ``ptr_ep<N>`` CSR belonging to ``epaddr``."""
        epnum = EndpointType.epnum(epaddr)
        return self.get_module(epaddr, "ptr_ep{}".format(epnum))

    def get_len_csr(self, epaddr):
        """Return the ``len_ep<N>`` CSR belonging to ``epaddr``."""
        epnum = EndpointType.epnum(epaddr)
        return self.get_module(epaddr, "len_ep{}".format(epnum))

    def set_csr(self, csr, epaddr, v):
        """Read-modify-write ``csr``, setting the bit for ``epaddr`` to ``v``."""
        c = yield from csr.read()
        v = set_bit(c, epaddr, v)
        yield from csr.write(v)

    # Data Toggle Bit
    def dtb(self, epaddr):
        """Return the data toggle bit of ``epaddr``."""
        v = yield from self.dut.dtb.read()
        return get_bit(epaddr, v)

    def set_dtb(self, epaddr):
        """Set the data toggle bit of ``epaddr``."""
        yield from self.set_csr(self.dut.dtb, epaddr, 1)

    def clear_dtb(self, epaddr):
        """Clear the data toggle bit of ``epaddr``."""
        yield from self.set_csr(self.dut.dtb, epaddr, 0)

    # Arm endpoint Bit
    def arm(self, epaddr):
        """Return the arm bit of ``epaddr``."""
        v = yield from self.dut.arm.read()
        return get_bit(epaddr, v)

    def set_arm(self, epaddr):
        """Set the arm bit of ``epaddr``."""
        yield from self.set_csr(self.dut.arm, epaddr, 1)

    def clear_arm(self, epaddr):
        """Clear the arm bit of ``epaddr``."""
        yield from self.set_csr(self.dut.arm, epaddr, 0)

    # Stall endpoint Bit
    def sta(self, epaddr):
        """Return the stall bit of ``epaddr``."""
        v = yield from self.dut.sta.read()
        return get_bit(epaddr, v)

    def set_sta(self, epaddr):
        """Set the stall bit of ``epaddr``."""
        yield from self.set_csr(self.dut.sta, epaddr, 1)

    def clear_sta(self, epaddr):
        """Clear the stall bit of ``epaddr``."""
        yield from self.set_csr(self.dut.sta, epaddr, 0)

    # IRQ / packet pending -----------------
    def trigger(self, epaddr):
        """Return the current ``trigger`` value of the endpoint's event source."""
        evsrc = self.get_evsrc(epaddr)
        v = yield evsrc.trigger
        return v

    def pending(self, epaddr):
        """Return the current ``pending`` value of the endpoint's event source."""
        evsrc = self.get_evsrc(epaddr)
        v = yield evsrc.pending
        return v

    def clear_pending(self, epaddr):
        """Clear the pending flag of ``epaddr`` and check that it cleared.

        Asserts that the flag is raised before clearing, and that both
        trigger and pending read back false afterwards.
        """
        # Can't clear pending while trigger is active.
        while True:
            trigger = (yield from self.trigger(epaddr))
            if not trigger:
                break
            yield

        # Check the pending flag is raised
        self.assertTrue((yield from self.pending(epaddr)))

        # Clear pending flag
        mask = 1 << epaddr
        yield from self.dut.ev.pending.write(mask)
        yield

        # Check the pending flag has been cleared
        self.assertFalse((yield from self.trigger(epaddr)))
        self.assertFalse((yield from self.pending(epaddr)))

    # Endpoint state -----------------------
    def response(self, epaddr):
        """Return the ``EndpointResponse`` implied by the endpoint's flags.

        STALL if the stall bit is set; otherwise ACK when the endpoint is
        armed with nothing pending; otherwise NAK.
        """
        if (yield from self.sta(epaddr)):
            return EndpointResponse.STALL

        pending = yield from self.pending(epaddr)
        armed = yield from self.arm(epaddr)
        if armed and not pending:
            return EndpointResponse.ACK

        return EndpointResponse.NAK

    def format_epaddr(self, ep):
        """Return ``"<ep> - EP<n> (<dir>)"``.

        The low bit of ``ep`` selects the direction (0 is OUT, 1 is IN) and
        the remaining bits are the endpoint number.
        """
        epdir = "IN"
        if ep & 1 == 0:
            epdir = "OUT"
        return "{} - EP{} ({})".format(ep, ep >> 1, epdir)

    def set_response(self, epaddr, v):
        """Program the stall and arm bits of ``epaddr`` to produce ``v``.

        STALL sets stall and clears arm, ACK clears stall and sets arm, and
        NAK clears both.  Any other ``EndpointResponse`` value is only
        reported with ``print``.
        """
        assert isinstance(v, EndpointResponse), v
        if v == EndpointResponse.STALL:
            yield from self.set_sta(epaddr)
            yield from self.clear_arm(epaddr)
        elif v == EndpointResponse.ACK:
            yield from self.clear_sta(epaddr)
            yield from self.set_arm(epaddr)
        elif v == EndpointResponse.NAK:
            yield from self.clear_sta(epaddr)
            yield from self.clear_arm(epaddr)
        else:
            print("Unknown EP response to {}: {}".format(self.format_epaddr(epaddr), v))

    # Get/set endpoint data ----------------
    def set_data(self, epaddr, data):
        """Set an endpoints buffer to given data to be sent.

        The bytes are stored starting at the endpoint's current pointer CSR
        value, and the length CSR is then written with the pointer plus
        ``len(data)``.
        """
        assert isinstance(data, (list, tuple))

        ep_ptr = yield from self.get_ptr_csr(epaddr).read()
        buf = self.get_module(epaddr, "buf")

        for i, v in enumerate(data):
            yield buf[ep_ptr+i].eq(v)

        ep_len = self.get_len_csr(epaddr)
        yield from ep_len.write(ep_ptr + len(data))

        yield

    @staticmethod
    def _hex_list(values):
        """Format ``values`` as ``[0x01, 0x02]`` for assertion messages."""
        return "[" + ", ".join("0x{:02x}".format(v) for v in values) + "]"

    def expect_data(self, epaddr, data):
        """Expect that an endpoints buffer has given contents.

        Asserts that the endpoint has a pending event, then compares
        ``data`` with the ``len(data)`` bytes that end two entries before
        the endpoint's pointer CSR value.
        """
        ep_ptr = yield from self.get_ptr_csr(epaddr).read()
        buf = self.get_module(epaddr, "buf")

        # Make sure there is something pending
        self.assertTrue((yield from self.pending(epaddr)))

        actual_data = []
        for i in range(len(data), 0, -1):
            # Subtract two bytes, since the CRC16 is stripped from the buffer.
            d = yield buf[ep_ptr-i-2]
            actual_data.append(d)

        msg = (
            "\n"
            + "Wanted: " + self._hex_list(data) + "\n"
            + " Got: " + self._hex_list(actual_data)
        )
        self.assertSequenceEqual(data, actual_data, msg)
if __name__ == '__main__':
    # Hand control to unittest's runner when executed as a script.
    unittest.main()