Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1from socket import socket, AF_INET, AF_INET6, SOCK_STREAM 

2from asyncio import get_event_loop, new_event_loop, set_event_loop, start_server as asyncserve 

3from multiprocessing import Process, Queue 

4from functools import partial, wraps 

5from os import getpid 

6from contextlib import suppress 

7from time import sleep 

8 

9from mango.errors import error, _clientError 

10from mango.progress import prog, _clientProg 

11from mango.constants import c 

12from mango.debug import DBG 

13 

14 

15def _client(address, stype, message, send=True): 

16 """Client message sender, receive unused.""" 

17 with suppress(ConnectionRefusedError, ConnectionResetError): 

18 sock = socket(AF_INET6 if ":" in address[0] else AF_INET, SOCK_STREAM) 

19 sock.settimeout(10) 

20 sock.connect((address[0], address[1])) 

21 sock.settimeout(None) 

22 uu_message = message 

23 sock.send(uu_message.encode()) 

24 sock.recv(1024) 

25 sock.close() 

26 stype(message) 

27 

28 

29class Server(): 

30 

31 def __init__(self, q_in, q_out, op=open): 

32 obj = q_in.get() 

33 address = obj[0] 

34 s_type = obj[1] 

35 types = {"Error": ("error", error, op), "Prog": ("prog", prog)} 

36 hold = types[s_type][1](types[s_type][2]) if s_type in types and len(types[s_type]) == 3 else types[s_type][1]() 

37 

38 setattr(self, types[s_type][0], hold) 

39 setattr(self, 'handle_it', getattr(self, f"{types[s_type][0]}handler")) 

40 

41 set_event_loop(new_event_loop()) 

42 loop = get_event_loop() 

43 coro = asyncserve(self.handler, *address, loop=loop) 

44 server = loop.run_until_complete(coro) 

45 q_out.put(server.sockets[0].getsockname()) 

46 

47 # Serve requests until Ctrl+C is pressed 

48 # print('Serving on {}'.format(server.sockets[0].getsockname())) 

49 try: 

50 loop.run_forever() 

51 except KeyboardInterrupt: 

52 exit(1) 

53 

54 # Close the server 

55 server.close() 

56 loop.run_until_complete(server.wait_closed()) 

57 loop.close() 

58 

59 async def handler(self, reader, writer): 

60 data = await reader.read(1000) 

61 await self.handle_it(data.decode().split(' ', 1)) 

62 # addr = writer.get_extra_info('peername') 

63 # print("Received %r from %r" % (message, addr)) 

64 # print("Send: %r" % message) 

65 # writer.write(data) 

66 # await writer.drain() 

67 # print("Close the client socket") 

68 writer.close() 

69 

70 async def errorhandler(self, data): 

71 """Send errors to error class.""" 

72 if data[0] == "EV": 72 ↛ 73line 72 didn't jump to line 73, because the condition on line 72 was never true

73 data = data[1].split() 

74 self.error.setup((False if data[0] == "False" else True), data[1], int(data[2]), int(data[3])) 

75 elif data[0] == "PID": 75 ↛ 76line 75 didn't jump to line 76, because the condition on line 75 was never true

76 self.error.addpid(int(data[1])) 

77 elif data[0] == "END": 77 ↛ 78line 77 didn't jump to line 78, because the condition on line 77 was never true

78 self.error.endproc() 

79 elif data[0] == "Test": 

80 pass 

81 else: 

82 self.error.count(data[0], data[1]) 

83 

84 async def proghandler(self, data): 

85 """Send progress to progress bar.""" 

86 if len(data) > 1: 

87 getattr(self.prog, data[0])(*data[1].split()) 

88 else: 

89 getattr(self.prog, data[0])() 

90 

91 

92def start_server(s_type): 

93 """Start server for multiprocessing.""" 

94 q_in = Queue() 

95 q_out = Queue() 

96 

97 addr = 'localhost' 

98 for attempt in range(5): 98 ↛ 116line 98 didn't jump to line 116, because the loop on line 98 didn't complete

99 try: 

100 p = Process(target=Server, args=(q_in, q_out), daemon=True) 

101 p.start() 

102 q_in.put([(addr, 0), s_type]) 

103 address = q_out.get() 

104 s_type_f = globals()["_client" + s_type] 

105 client = partial(_client, address, s_type_f) 

106 client(f'M {s_type} server on port {address[1]}\n' if DBG else 'Test',) 

107 break 

108 except OSError: 

109 if ':' in address[0]: 

110 # fe80::1 not negotiable 

111 addr = '::1' 

112 p.terminate() 

113 p.kill() 

114 sleep(1) 

115 else: 

116 print("FAIL can't negotiate socket") 

117 exit(1) 

118 return client 

119 

120 

121class serverwrapper(): 

122 """Decorator to start servers.""" 

123 

124 def __init__(self, wraptype): 

125 """ 

126 Initialise. 

127 

128 Parameters 

129 ---------- 

130 wraptype: str 

131 Server type to start eg "Error" 

132 

133 """ 

134 self.wraptype = wraptype 

135 

136 def __call__(self, f): 

137 """Decorator.""" 

138 @wraps(f) 

139 def svrwrap(*args, **kw): 

140 if not isinstance(getattr(c, self.wraptype), partial): 140 ↛ 141line 140 didn't jump to line 141, because the condition on line 140 was never true

141 setattr(c, '_' + self.wraptype, getattr(c, self.wraptype)) 

142 setattr(c, self.wraptype, start_server(self.wraptype)) 

143 if self.wraptype == "Error": 

144 getattr(c, self.wraptype)("PID {}".format(getpid())) 

145 ret = f(*args, **kw) 

146 getattr(c, self.wraptype)("END") 

147 setattr(c, self.wraptype, getattr(c, '_' + self.wraptype)) 

148 else: 

149 ret = f(*args, **kw) 

150 return ret 

151 svrwrap.__name__ = f.__name__ 

152 svrwrap.__doc__ = f.__doc__ 

153 return svrwrap 

154 

155 

156class addpid(): 

157 """Decorator to add PID to server list.""" 

158 

159 def __init__(self, wraptype): 

160 """ 

161 Initialise. 

162 

163 Parameters 

164 ---------- 

165 wraptype: str 

166 Server type to start eg "Error" 

167 

168 """ 

169 self.wraptype = wraptype 

170 

171 def __call__(self, f): 

172 """Add PID to list.""" 

173 @wraps(f) 

174 def pidwrap(*args, **kw): 

175 try: 

176 getattr(c, self.wraptype)("PID {}".format(getpid())) 

177 except AttributeError: 

178 print(f"{self.wraptype} server not started") 

179 f(*args, **kw) 

180 pidwrap.__name__ = f.__name__ 

181 pidwrap.__doc__ = f.__doc__ 

182 return pidwrap