Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from socket import socket, AF_INET, AF_INET6, SOCK_STREAM
2from asyncio import get_event_loop, new_event_loop, set_event_loop, start_server as asyncserve
3from multiprocessing import Process, Queue
4from functools import partial, wraps
5from os import getpid
6from contextlib import suppress
7from time import sleep
9from mango.errors import error, _clientError
10from mango.progress import prog, _clientProg
11from mango.constants import c
12from mango.debug import DBG
15def _client(address, stype, message, send=True):
16 """Client message sender, receive unused."""
17 with suppress(ConnectionRefusedError, ConnectionResetError):
18 sock = socket(AF_INET6 if ":" in address[0] else AF_INET, SOCK_STREAM)
19 sock.settimeout(10)
20 sock.connect((address[0], address[1]))
21 sock.settimeout(None)
22 uu_message = message
23 sock.send(uu_message.encode())
24 sock.recv(1024)
25 sock.close()
26 stype(message)
29class Server():
31 def __init__(self, q_in, q_out, op=open):
32 obj = q_in.get()
33 address = obj[0]
34 s_type = obj[1]
35 types = {"Error": ("error", error, op), "Prog": ("prog", prog)}
36 hold = types[s_type][1](types[s_type][2]) if s_type in types and len(types[s_type]) == 3 else types[s_type][1]()
38 setattr(self, types[s_type][0], hold)
39 setattr(self, 'handle_it', getattr(self, f"{types[s_type][0]}handler"))
41 set_event_loop(new_event_loop())
42 loop = get_event_loop()
43 coro = asyncserve(self.handler, *address, loop=loop)
44 server = loop.run_until_complete(coro)
45 q_out.put(server.sockets[0].getsockname())
47 # Serve requests until Ctrl+C is pressed
48 # print('Serving on {}'.format(server.sockets[0].getsockname()))
49 try:
50 loop.run_forever()
51 except KeyboardInterrupt:
52 exit(1)
54 # Close the server
55 server.close()
56 loop.run_until_complete(server.wait_closed())
57 loop.close()
59 async def handler(self, reader, writer):
60 data = await reader.read(1000)
61 await self.handle_it(data.decode().split(' ', 1))
62 # addr = writer.get_extra_info('peername')
63 # print("Received %r from %r" % (message, addr))
64 # print("Send: %r" % message)
65 # writer.write(data)
66 # await writer.drain()
67 # print("Close the client socket")
68 writer.close()
70 async def errorhandler(self, data):
71 """Send errors to error class."""
72 if data[0] == "EV": 72 ↛ 73line 72 didn't jump to line 73, because the condition on line 72 was never true
73 data = data[1].split()
74 self.error.setup((False if data[0] == "False" else True), data[1], int(data[2]), int(data[3]))
75 elif data[0] == "PID": 75 ↛ 76line 75 didn't jump to line 76, because the condition on line 75 was never true
76 self.error.addpid(int(data[1]))
77 elif data[0] == "END": 77 ↛ 78line 77 didn't jump to line 78, because the condition on line 77 was never true
78 self.error.endproc()
79 elif data[0] == "Test":
80 pass
81 else:
82 self.error.count(data[0], data[1])
84 async def proghandler(self, data):
85 """Send progress to progress bar."""
86 if len(data) > 1:
87 getattr(self.prog, data[0])(*data[1].split())
88 else:
89 getattr(self.prog, data[0])()
92def start_server(s_type):
93 """Start server for multiprocessing."""
94 q_in = Queue()
95 q_out = Queue()
97 addr = 'localhost'
98 for attempt in range(5): 98 ↛ 116line 98 didn't jump to line 116, because the loop on line 98 didn't complete
99 try:
100 p = Process(target=Server, args=(q_in, q_out), daemon=True)
101 p.start()
102 q_in.put([(addr, 0), s_type])
103 address = q_out.get()
104 s_type_f = globals()["_client" + s_type]
105 client = partial(_client, address, s_type_f)
106 client(f'M {s_type} server on port {address[1]}\n' if DBG else 'Test',)
107 break
108 except OSError:
109 if ':' in address[0]:
110 # fe80::1 not negotiable
111 addr = '::1'
112 p.terminate()
113 p.kill()
114 sleep(1)
115 else:
116 print("FAIL can't negotiate socket")
117 exit(1)
118 return client
121class serverwrapper():
122 """Decorator to start servers."""
124 def __init__(self, wraptype):
125 """
126 Initialise.
128 Parameters
129 ----------
130 wraptype: str
131 Server type to start eg "Error"
133 """
134 self.wraptype = wraptype
136 def __call__(self, f):
137 """Decorator."""
138 @wraps(f)
139 def svrwrap(*args, **kw):
140 if not isinstance(getattr(c, self.wraptype), partial): 140 ↛ 141line 140 didn't jump to line 141, because the condition on line 140 was never true
141 setattr(c, '_' + self.wraptype, getattr(c, self.wraptype))
142 setattr(c, self.wraptype, start_server(self.wraptype))
143 if self.wraptype == "Error":
144 getattr(c, self.wraptype)("PID {}".format(getpid()))
145 ret = f(*args, **kw)
146 getattr(c, self.wraptype)("END")
147 setattr(c, self.wraptype, getattr(c, '_' + self.wraptype))
148 else:
149 ret = f(*args, **kw)
150 return ret
151 svrwrap.__name__ = f.__name__
152 svrwrap.__doc__ = f.__doc__
153 return svrwrap
156class addpid():
157 """Decorator to add PID to server list."""
159 def __init__(self, wraptype):
160 """
161 Initialise.
163 Parameters
164 ----------
165 wraptype: str
166 Server type to start eg "Error"
168 """
169 self.wraptype = wraptype
171 def __call__(self, f):
172 """Add PID to list."""
173 @wraps(f)
174 def pidwrap(*args, **kw):
175 try:
176 getattr(c, self.wraptype)("PID {}".format(getpid()))
177 except AttributeError:
178 print(f"{self.wraptype} server not started")
179 f(*args, **kw)
180 pidwrap.__name__ = f.__name__
181 pidwrap.__doc__ = f.__doc__
182 return pidwrap