Source code for usbcore.cpu.eptri_test

#!/usr/bin/env python3

import unittest
from unittest import TestCase

from migen import *

from ..endpoint import EndpointType, EndpointResponse
from ..io_test import FakeIoBuf
from ..pid import PIDTypes
from ..utils.packet import crc16

from ..test.common import BaseUsbTestCase, CommonUsbTestCase
from ..test.clock import CommonTestMultiClockDomain

from .eptri import TriEndpointInterface


[docs]class TestTriEndpointInterface( BaseUsbTestCase, CommonUsbTestCase, CommonTestMultiClockDomain, unittest.TestCase): maxDiff=None # def get_endpoint(self, epaddr): # epdir = EndpointType.epdir(epaddr) # epnum = EndpointType.epnum(epaddr) # if epdir == EndpointType.OUT: # return getattr(self.dut, "ep_%s_out" % epnum) # elif epdir == EndpointType.IN: # return getattr(self.dut, "ep_%s_in" % epnum) # else: # raise SystemError("Unknown endpoint type: %r" % epdir)
[docs] def on_usb_48_edge(self): if False: yield
[docs] def on_usb_12_edge(self): if False: yield
[docs] def setUp(self): CommonTestMultiClockDomain.setUp(self, ("usb_12", "usb_48")) # Only enable "debug" mode for tests with "debug" in their name if "debug" in self._testMethodName: debug = True else: debug = False self.iobuf = FakeIoBuf() self.dut = TriEndpointInterface(self.iobuf, debug=debug) self.packet_h2d = Signal(1) self.packet_d2h = Signal(1) self.packet_idle = Signal(1)
[docs] def run_sim(self, stim): def padfront(): yield yield yield yield yield yield # # Make sure that the endpoints are currently blocked # opending = yield self.dut.epout.ev.packet.pending # self.assertTrue(opending) # ipending = yield self.dut.epin.ev.packet.pending # self.assertTrue(ipending) # yield # yield from self.dut.pullup._out.write(1) # yield # # Make sure that the endpoints are currently blocked but not being # # triggered. # opending = yield self.dut.epout.ev.packet.pending # self.assertTrue(opending) # ipending = yield self.dut.epin.ev.packet.pending # self.assertTrue(ipending) # otrigger = yield self.dut.epout.ev.packet.trigger # self.assertFalse(otrigger) # itrigger = yield self.dut.epin.ev.packet.trigger # self.assertFalse(itrigger) yield yield from self.idle() yield from stim() run_simulation( self.dut, padfront(), vcd_name=self.make_vcd_name(), #clocks={ # "sys": 12, # "usb_48": 48, # "usb_12": 192, #}, clocks={ "sys": 2, "usb_48": 8, "usb_12": 32, }, ) print("-"*10)
[docs] def tick_sys(self): yield from self.update_internal_signals() yield
[docs] def tick_usb48(self): yield from self.wait_for_edge("usb_48")
[docs] def tick_usb12(self): for i in range(0, 4): yield from self.tick_usb48()
[docs] def update_internal_signals(self): yield from self.update_clocks()
# IRQ / packet pending -----------------
[docs] def trigger(self, epaddr): endpoint = self.get_endpoint(epaddr) status = yield endpoint.ev.packet.trigger return bool(status)
[docs] def pending(self, epaddr): # epdir = EndpointType.epdir(epaddr) # epnum = EndpointType.epnum(epaddr) # if epdir == EndpointType.OUT: # status = yield from self.dut.epout.status.read() # else: # status = yield from self.dut.epin.status.read() # return (status & 1) return 1
[docs] def clear_pending(self, epaddr): # Can't clear pending while trigger is active. for i in range(0, 100): trigger = (yield from self.trigger(epaddr)) if not trigger: break yield from self.tick_sys() self.assertFalse(trigger) # Check the pending flag is raised self.assertTrue((yield from self.pending(epaddr))) # Clear pending flag endpoint = self.get_endpoint(epaddr) yield from endpoint.ev.pending.write(0xf) yield from self.tick_sys() # Check the pending flag has been cleared self.assertFalse((yield from self.trigger(epaddr))) self.assertFalse((yield from self.pending(epaddr)))
# Endpoint state -----------------------
[docs] def response(self, epaddr): endpoint = self.get_endpoint(epaddr) response = yield endpoint.response return response
[docs] def set_response(self, epaddr, v): endpoint = self.get_endpoint(epaddr) assert isinstance(v, EndpointResponse), v yield from endpoint.respond.write(v)
[docs] def expect_last_tok(self, epaddr, value): endpoint = self.get_endpoint(epaddr) last_tok = yield from endpoint.last_tok.read() self.assertEqual(last_tok, value)
# Get/set endpoint data ----------------
[docs] def set_data(self, epaddr, data): """Set an endpoints buffer to given data to be sent.""" assert isinstance(data, (list, tuple)) self.ep_print(epaddr, "Set: %r", data) endpoint = self.get_endpoint(epaddr) # Make sure the endpoint is empty empty = yield from endpoint.ibuf_empty.read() self.assertTrue( empty, "Device->Host buffer not empty when setting data!") # If we are writing multiple bytes of data, need to make sure we are # not going to ACK the packet until the data is ready. if len(data) > 1: response = yield endpoint.response self.assertNotEqual(response, EndpointResponse.ACK) for v in data: yield from endpoint.ibuf_head.write(v) yield from self.tick_sys() for i in range(0, 10): yield from self.tick_usb12() empty = yield from endpoint.ibuf_empty.read() if len(data) > 0: self.assertFalse( bool(empty), "Buffer not empty after setting zero data!") else: self.assertTrue( bool(empty), "Buffer empty after setting data!")
[docs] def expect_setup(self, epaddr, data): epnum = EndpointType.epnum(epaddr) status = yield from self.dut.setup.status.read() self.assertTrue(status & 1) self.assertEqual(epnum, (status >> 1) & 15) actual_data = [] for i in range(0, 16): print("Loop {}".format(i)) yield from self.dut.setup.ctrl.write(1) have = (yield from self.dut.setup.status.read()) & 1 if not have: break v = yield from self.dut.setup.data.read() actual_data.append(v) yield assert len(actual_data) >= 2, actual_data actual_data, actual_crc16 = actual_data[:-2], actual_data[-2:] self.ep_print(epaddr, "Got: %r (expected: %r)", actual_data, data) self.assertSequenceEqual(data, actual_data) self.assertSequenceEqual(crc16(data), actual_crc16)
[docs] def expect_data(self, epaddr, data): """Expect that an endpoints buffer has given contents.""" epdir = EndpointType.epdir(epaddr) epnum = EndpointType.epnum(epaddr) self.assertTrue(epdir == EndpointType.OUT) # Make sure there is something pending self.assertTrue((yield from self.dut.epout.status.read()) & 1) actual_data = [] while range(0, 1024): yield from self.dut.epout.ctrl.write(1) empty = yield from self.dut.epout.status.read() & 1 if empty: break v = yield from self.dut.epout.data.read() actual_data.append(v) yield assert len(actual_data) >= 2, actual_data actual_data, actual_crc16 = actual_data[:-2], actual_data[-2:] self.ep_print(epaddr, "Got: %r (expected: %r)", actual_data, data) self.assertSequenceEqual(data, actual_data) self.assertSequenceEqual(crc16(data), actual_crc16)
[docs] def dtb(self, epaddr): endpoint = self.get_endpoint(epaddr) status = yield from endpoint.dtb.read() return bool(status)
if __name__ == '__main__': unittest.main()