149 lines
3.1 KiB
Python
149 lines
3.1 KiB
Python
|
|
import queue
|
||
|
|
import threading
|
||
|
|
from enum import Enum
|
||
|
|
from typing import override
|
||
|
|
|
||
|
|
import simpy
|
||
|
|
import simpy.rt
|
||
|
|
from prompt_toolkit import PromptSession
|
||
|
|
from prompt_toolkit.patch_stdout import patch_stdout
|
||
|
|
|
||
|
|
nodes_n = {
|
||
|
|
("A", "1", "A'"),
|
||
|
|
("B", "1", "B'"),
|
||
|
|
("AB", "A", "AB'"),
|
||
|
|
("A'B", "A'", "A'B'"),
|
||
|
|
("A'B", "B", "AB"),
|
||
|
|
("A'B'", "B'", "AB'"),
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
f = {
|
||
|
|
"000": "000",
|
||
|
|
"001": "0NN",
|
||
|
|
"00N": "000",
|
||
|
|
"010": "NNN",
|
||
|
|
"011": "011",
|
||
|
|
"01N": "011",
|
||
|
|
"0N0": "000",
|
||
|
|
"0N1": "011",
|
||
|
|
"0NN": "0NN",
|
||
|
|
"100": "NN0",
|
||
|
|
"101": "NNN",
|
||
|
|
"10N": "NNN",
|
||
|
|
"110": "110",
|
||
|
|
"111": "111",
|
||
|
|
"11N": "11N",
|
||
|
|
"1N0": "110",
|
||
|
|
"1N1": "111",
|
||
|
|
"1NN": "11N",
|
||
|
|
"N00": "000",
|
||
|
|
"N01": "NNN",
|
||
|
|
"N0N": "000",
|
||
|
|
"N10": "110",
|
||
|
|
"N11": "N11",
|
||
|
|
"N1N": "N1N",
|
||
|
|
"NN0": "NN0",
|
||
|
|
"NN1": "NN1",
|
||
|
|
"NNN": "NNN",
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
class Tern(Enum):
|
||
|
|
N = -1
|
||
|
|
U = 0
|
||
|
|
Y = 1
|
||
|
|
|
||
|
|
@override
|
||
|
|
def __str__(self):
|
||
|
|
if self.value == -1:
|
||
|
|
return "0"
|
||
|
|
if self.value == 1:
|
||
|
|
return "1"
|
||
|
|
return "N"
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def from_string(cls, s: str) -> "Tern":
|
||
|
|
if s == "N":
|
||
|
|
return Tern.U
|
||
|
|
if s == "0":
|
||
|
|
return Tern.N
|
||
|
|
if s == "1":
|
||
|
|
return Tern.Y
|
||
|
|
return cls[s]
|
||
|
|
|
||
|
|
|
||
|
|
def node(env: simpy.Environment, node: tuple[str, str, str], values: dict[str, Tern]):
|
||
|
|
while True:
|
||
|
|
nn, uu, yy = node
|
||
|
|
old = "".join(str(values[x]) for x in node)
|
||
|
|
new = f[old]
|
||
|
|
|
||
|
|
n, u, y = new
|
||
|
|
|
||
|
|
values[nn], values[uu], values[yy] = (
|
||
|
|
Tern.from_string(n),
|
||
|
|
Tern.from_string(u),
|
||
|
|
Tern.from_string(y),
|
||
|
|
)
|
||
|
|
print(values)
|
||
|
|
yield env.timeout(1)
|
||
|
|
|
||
|
|
|
||
|
|
def input_thread(commands: queue.Queue[tuple[str, Tern]]):
|
||
|
|
session: PromptSession[str] = PromptSession("> ")
|
||
|
|
|
||
|
|
with patch_stdout():
|
||
|
|
while True:
|
||
|
|
line = session.prompt().strip()
|
||
|
|
|
||
|
|
if line in {"q", "quit", "exit"}:
|
||
|
|
commands.put(("__quit__", Tern.U))
|
||
|
|
break
|
||
|
|
|
||
|
|
try:
|
||
|
|
name, value = line.split()
|
||
|
|
commands.put((name, Tern.from_string(value)))
|
||
|
|
except Exception:
|
||
|
|
print("format: A 1 | A 0 | A N | quit")
|
||
|
|
|
||
|
|
|
||
|
|
def user_input_process(
|
||
|
|
env: simpy.Environment,
|
||
|
|
values: dict[str, Tern],
|
||
|
|
commands: queue.Queue[tuple[str, Tern]],
|
||
|
|
):
|
||
|
|
while True:
|
||
|
|
while not commands.empty():
|
||
|
|
name, value = commands.get_nowait()
|
||
|
|
|
||
|
|
if name == "__quit__":
|
||
|
|
return
|
||
|
|
|
||
|
|
if name not in values:
|
||
|
|
print(f"unknown node: {name}")
|
||
|
|
continue
|
||
|
|
|
||
|
|
values[name] = value
|
||
|
|
print(f"[t={env.now}] user set {name} = {value}")
|
||
|
|
|
||
|
|
yield env.timeout(0.1)
|
||
|
|
|
||
|
|
|
||
|
|
ls = {x: Tern.U for triple in nodes_n for x in triple}
|
||
|
|
commands: queue.Queue[tuple[str, Tern]] = queue.Queue()
|
||
|
|
|
||
|
|
env = simpy.rt.RealtimeEnvironment(factor=1.0, strict=False)
|
||
|
|
|
||
|
|
threading.Thread(
|
||
|
|
target=input_thread,
|
||
|
|
args=(commands,),
|
||
|
|
daemon=True,
|
||
|
|
).start()
|
||
|
|
|
||
|
|
for n in nodes_n:
|
||
|
|
_ = env.process(node(env, n, ls))
|
||
|
|
|
||
|
|
_ = env.process(user_input_process(env, ls, commands))
|
||
|
|
|
||
|
|
_ = env.run(until=100)
|