Source code for usbcore.cpu.epfifo_test

#!/usr/bin/env python3

import unittest
from unittest import TestCase

from migen import *

from ..endpoint import EndpointType, EndpointResponse
from ..io_test import FakeIoBuf
from ..pid import PIDTypes
from ..utils.packet import crc16

from ..test.common import BaseUsbTestCase, CommonUsbTestCase
from ..test.clock import CommonTestMultiClockDomain

from .epfifo import PerEndpointFifoInterface


[docs]class TestPerEndpointFifoInterface( BaseUsbTestCase, CommonUsbTestCase, CommonTestMultiClockDomain, unittest.TestCase): maxDiff=None
[docs] def get_endpoint(self, epaddr): epdir = EndpointType.epdir(epaddr) epnum = EndpointType.epnum(epaddr) if epdir == EndpointType.OUT: return getattr(self.dut, "ep_%s_out" % epnum) elif epdir == EndpointType.IN: return getattr(self.dut, "ep_%s_in" % epnum) else: raise SystemError("Unknown endpoint type: %r" % epdir)
[docs] def on_usb_48_edge(self): if False: yield
[docs] def on_usb_12_edge(self): if False: yield
[docs] def setUp(self): CommonTestMultiClockDomain.setUp(self, ("usb_12", "usb_48")) # Only enable "debug" mode for tests with "debug" in their name if "debug" in self._testMethodName: debug = True else: debug = False self.iobuf = FakeIoBuf() self.endpoints = [EndpointType.BIDIR, EndpointType.IN, EndpointType.BIDIR] self.dut = PerEndpointFifoInterface(self.iobuf, self.endpoints, debug=debug) self.packet_h2d = Signal(1) self.packet_d2h = Signal(1) self.packet_idle = Signal(1)
[docs] def run_sim(self, stim): def padfront(): yield yield yield yield yield yield # Make sure that the endpoints are currently blocked opending = yield self.dut.ep_0_out.ev.packet.pending self.assertTrue(opending) ipending = yield self.dut.ep_0_in.ev.packet.pending self.assertTrue(ipending) yield yield from self.dut.pullup._out.write(1) yield # Make sure that the endpoints are currently blocked but not being # triggered. opending = yield self.dut.ep_0_out.ev.packet.pending self.assertTrue(opending) ipending = yield self.dut.ep_0_in.ev.packet.pending self.assertTrue(ipending) otrigger = yield self.dut.ep_0_out.ev.packet.trigger self.assertFalse(otrigger) itrigger = yield self.dut.ep_0_in.ev.packet.trigger self.assertFalse(itrigger) yield yield from self.idle() yield from stim() CommonUsbTestCase.patch_csrs(self) run_simulation( self.dut, padfront(), vcd_name=self.make_vcd_name(), #clocks={ # "sys": 12, # "usb_48": 48, # "usb_12": 192, #}, clocks={ "sys": 2, "usb_48": 8, "usb_12": 32, }, ) print("-"*10)
[docs] def tick_sys(self): yield from self.update_internal_signals() yield
[docs] def tick_usb48(self): yield from self.wait_for_edge("usb_48")
[docs] def tick_usb12(self): for i in range(0, 4): yield from self.tick_usb48()
[docs] def update_internal_signals(self): yield from self.update_clocks()
# IRQ / packet pending -----------------
[docs] def trigger(self, epaddr): endpoint = self.get_endpoint(epaddr) status = yield endpoint.ev.packet.trigger return bool(status)
[docs] def pending(self, epaddr): endpoint = self.get_endpoint(epaddr) status = yield from endpoint.ev.pending.read() return bool(status & 0x2)
[docs] def clear_pending(self, epaddr): # Can't clear pending while trigger is active. for i in range(0, 100): trigger = (yield from self.trigger(epaddr)) if not trigger: break yield from self.tick_sys() self.assertFalse(trigger) # Check the pending flag is raised self.assertTrue((yield from self.pending(epaddr))) # Clear pending flag endpoint = self.get_endpoint(epaddr) yield from endpoint.ev.pending.write(0xf) yield from self.tick_sys() # Check the pending flag has been cleared self.assertFalse((yield from self.trigger(epaddr))) self.assertFalse((yield from self.pending(epaddr)))
# Endpoint state -----------------------
[docs] def response(self, epaddr): endpoint = self.get_endpoint(epaddr) response = yield endpoint.response return response
[docs] def set_response(self, epaddr, v): endpoint = self.get_endpoint(epaddr) assert isinstance(v, EndpointResponse), v yield from endpoint.respond.write(v)
[docs] def expect_last_tok(self, epaddr, value): endpoint = self.get_endpoint(epaddr) last_tok = yield from endpoint.last_tok.read() self.assertEqual(last_tok, value)
# Get/set endpoint data ----------------
[docs] def set_data(self, epaddr, data): """Set an endpoints buffer to given data to be sent.""" assert isinstance(data, (list, tuple)) self.ep_print(epaddr, "Set: %r", data) endpoint = self.get_endpoint(epaddr) # Make sure the endpoint is empty empty = yield from endpoint.ibuf_empty.read() self.assertTrue( empty, "Device->Host buffer not empty when setting data!") # If we are writing multiple bytes of data, need to make sure we are # not going to ACK the packet until the data is ready. if len(data) > 1: response = yield endpoint.response self.assertNotEqual(response, EndpointResponse.ACK) for v in data: yield from endpoint.ibuf_head.write(v) yield from self.tick_sys() for i in range(0, 10): yield from self.tick_usb12() empty = yield from endpoint.ibuf_empty.read() if len(data) > 0: self.assertFalse( bool(empty), "Buffer not empty after setting zero data!") else: self.assertTrue( bool(empty), "Buffer empty after setting data!")
[docs] def expect_data(self, epaddr, data): """Expect that an endpoints buffer has given contents.""" endpoint = self.get_endpoint(epaddr) # Make sure there is something pending self.assertTrue((yield from self.pending(epaddr))) actual_data = [] while range(0, 1024): yield from endpoint.obuf_head.write(0) empty = yield from endpoint.obuf_empty.read() if empty: break v = yield from endpoint.obuf_head.read() actual_data.append(v) yield assert len(actual_data) >= 2, actual_data actual_data, actual_crc16 = actual_data[:-2], actual_data[-2:] self.ep_print(epaddr, "Got: %r (expected: %r)", actual_data, data) self.assertSequenceEqual(data, actual_data) self.assertSequenceEqual(crc16(data), actual_crc16)
[docs] def dtb(self, epaddr): endpoint = self.get_endpoint(epaddr) status = yield from endpoint.dtb.read() return bool(status)
if __name__ == '__main__': unittest.main()