# for micropython 1.14 2021-02-05
from machine import Pin, Timer, UART, mem32
# the co-routine (tasking) package
# see
# https://docs.micropython.org/en/latest/library/uasyncio.html
import uasyncio as asyncio
import _thread
import time

# datasheet page 40
sio_base = 0xd0000000
# returns 0/1 depending on CORE id
cpu_id_reg = sio_base

# signal from core0 to core1: the CORE1 loop keeps running while this is True
run_Thread = True


# ==================================
# set up CORE 1 thread and ISR
# the thread
def CORE1_task(f):
    """Blink the LED on pin 14 and print a counter until run_Thread is cleared.

    f -- delay between iterations, in milliseconds (passed to time.sleep_ms).
    """
    led14 = Pin(14, Pin.OUT)
    # Not toggled below; the call is kept because it configures pin 13 as an output.
    led13 = Pin(13, Pin.OUT)
    # which core ?
    # NOTE: `mem32` and `Pin` are used through the names imported at the top of
    # the file; the module name `machine` itself is never bound by this script.
    print('CORE1? ', mem32[cpu_id_reg], 'start')
    n = 0
    while run_Thread:
        n += 1
        print('CORE', mem32[cpu_id_reg], n)
        led14.toggle()
        time.sleep_ms(f)


# start CORE 1
def start_CORE1():
    """Start CORE1_task on a new thread with a 200 ms period."""
    global run_Thread
    # Re-arm the stop flag so a later run (after test() cleared it on
    # Ctrl-C) does not start a thread that exits immediately.
    run_Thread = True
    _thread.start_new_thread(CORE1_task, (200,))


# ==================================
# set up CORE 0 multitasking (CORO)
# on-board LED
led25 = Pin(25, Pin.OUT)
# one set of possible UART0 pins
uart_tx = Pin(16)
uart_rx = Pin(17)
# map the pins to the internal signals
uart = UART(0, tx=uart_tx, rx=uart_rx)


# Task to read/write to uart
# I could not get StreamReader to work
async def receiver(uart):
    """Echo characters typed on `uart` and, on carriage return, send the line back.

    Characters are polled with uart.any()/uart.read(1) and accumulated until
    a '\\r' is seen; the whole command (including the '\\r') is then written
    back as '\\r\\n>' + cmd + '\\r\\n'. Runs forever.
    """
    # The writer does not depend on loop state, so build it once.
    swriter = asyncio.StreamWriter(uart, {})
    while True:
        last_char = ''
        cmd = ''
        # throw away one pending junk character, if any
        if uart.any():
            uart.read(1)
        while last_char != '\r':
            # yield to the other tasks between polls
            await asyncio.sleep(0)
            if uart.any():
                res = uart.read(1)
                last_char = res.decode('UTF-8')
                cmd = cmd + last_char
                # echo typing
                swriter.write(last_char)
                await swriter.drain()
        # print to console for testing
        #print (cmd)
        # print to uart
        swriter.write('\r\n>' + cmd + '\r\n')
        await swriter.drain()
        await asyncio.sleep(0.0)


# heart beat to show tasks still executing
async def toggle(led25, time_ms):
    """Toggle `led25` every `time_ms` milliseconds, forever."""
    while True:
        await asyncio.sleep_ms(time_ms)
        led25.toggle()


# define the tasks
async def main():
    """Create the CORE 0 tasks, start the CORE 1 thread, then idle forever."""
    t = 100
    # start CORE 0 tasks
    asyncio.create_task(toggle(led25, t))
    asyncio.create_task(receiver(uart))
    # get the CORE ID
    print('CORE0? ', mem32[cpu_id_reg], ' start')
    # === thread start ===
    start_CORE1()
    while True:
        await asyncio.sleep_ms(1000)


def test():
    """Run main() until Ctrl-C, then signal the CORE 1 thread to stop."""
    # Without this declaration the assignment below would only create a
    # local variable and CORE1_task would never see the stop request.
    global run_Thread
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # ctrl-c stops the program
        run_Thread = False
        print('Interrupted')
        #time.sleep(1)
    finally:
        # reset event loop
        asyncio.new_event_loop()


# Start!
test()