from socket import socket, AF_INET, AF_INET6, SOCK_STREAM
from asyncio import get_event_loop, new_event_loop, set_event_loop, start_server as asyncserve
from multiprocessing import Process, Queue
from functools import partial, wraps
from os import getpid
from contextlib import suppress
from time import sleep
from mango.errors import error, _clientError
from mango.progress import prog, _clientProg
from mango.constants import c
from mango.debug import DBG
def _client(address, stype, message, send=True):
"""Client message sender, receive unused."""
with suppress(ConnectionRefusedError, ConnectionResetError):
sock = socket(AF_INET6 if ":" in address[0] else AF_INET, SOCK_STREAM)
sock.settimeout(10)
sock.connect((address[0], address[1]))
sock.settimeout(None)
uu_message = message
sock.send(uu_message.encode())
sock.recv(1024)
sock.close()
stype(message)
class Server():
    """
    Asyncio TCP server that forwards received messages to a handler object.

    Constructing the class runs the server: ``__init__`` only returns once
    the event loop stops.
    """

    def __init__(self, q_in, q_out, op=open):
        """
        Start the server and serve requests.

        Parameters
        ----------
        q_in: Queue
            Supplies one item whose first two elements are the listen
            address (unpacked into ``asyncio.start_server``) and the server
            type name, "Error" or "Prog".
        q_out: Queue
            Receives the socket name the server is bound to.
        op: callable
            Passed to the ``error`` constructor for the "Error" server type.

        Raises
        ------
        KeyError
            If the requested server type is not known.
        SystemExit
            With code 1 when serving is stopped by a KeyboardInterrupt.

        """
        obj = q_in.get()
        address, s_type = obj[0], obj[1]

        # attribute name, factory and optional factory arguments per type
        types = {"Error": ("error", error, op), "Prog": ("prog", prog)}
        name, factory, *factory_args = types[s_type]
        setattr(self, name, factory(*factory_args))
        self.handle_it = getattr(self, f"{name}handler")

        loop = new_event_loop()
        set_event_loop(loop)
        # No ``loop=`` argument: it was removed from asyncio.start_server in
        # Python 3.10; the loop installed above is picked up either way.
        server = loop.run_until_complete(asyncserve(self.handler, *address))
        q_out.put(server.sockets[0].getsockname())

        # Serve requests until Ctrl+C is pressed
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            raise SystemExit(1) from None
        finally:
            # Close the server, also on the interrupt path
            server.close()
            loop.run_until_complete(server.wait_closed())
            loop.close()

    async def handler(self, reader, writer):
        """
        Read one message, dispatch it and close the connection.

        Up to 1000 bytes are read, decoded and split at the first space into
        a command and an optional argument string before being passed to
        ``handle_it``.
        """
        try:
            data = await reader.read(1000)
            await self.handle_it(data.decode().split(' ', 1))
        finally:
            # always release the connection, even if dispatching failed
            writer.close()

    async def errorhandler(self, data):
        """
        Send errors to error class.

        Parameters
        ----------
        data: list
            Command followed by an optional argument string. "EV" calls
            ``setup``, "PID" calls ``addpid``, "END" calls ``endproc``,
            "Test" is ignored and anything else is passed to ``count``.

        """
        command = data[0]
        if command == "EV":
            fields = data[1].split()
            # every flag value other than the literal "False" counts as True
            self.error.setup(fields[0] != "False", fields[1], int(fields[2]), int(fields[3]))
        elif command == "PID":
            self.error.addpid(int(data[1]))
        elif command == "END":
            self.error.endproc()
        elif command == "Test":
            pass
        else:
            self.error.count(command, data[1])

    async def proghandler(self, data):
        """
        Send progress to progress bar.

        Parameters
        ----------
        data: list
            Name of the progress method to call, optionally followed by a
            string of whitespace separated arguments for it.

        """
        # NOTE(review): the method name comes straight from the peer, so any
        # attribute of the progress object can be invoked - confirm the
        # server is only reachable by trusted local clients.
        method = getattr(self.prog, data[0])
        if len(data) > 1:
            method(*data[1].split())
        else:
            method()
def start_server(s_type):
    """
    Start server for multiprocessing.

    A ``Server`` is launched in a daemon process bound to a free port and a
    test message is sent to it. On an OSError the process is stopped and the
    start is retried, up to five attempts in total.

    Parameters
    ----------
    s_type: str
        Server type to start eg "Error"; also selects the module level
        ``_client<s_type>`` function used as the local handler.

    Returns
    -------
    functools.partial
        ``_client`` bound to the server address and the local handler, to be
        called with the message to send.

    Raises
    ------
    SystemExit
        With code 1 when no attempt succeeded.

    """
    q_in = Queue()
    q_out = Queue()
    addr = 'localhost'
    for _ in range(5):
        # Reset per attempt so the error handler never reads an unbound or
        # stale value when the failure happens early.
        p = None
        address = None
        try:
            p = Process(target=Server, args=(q_in, q_out), daemon=True)
            p.start()
            q_in.put([(addr, 0), s_type])
            address = q_out.get()
            s_type_f = globals()["_client" + s_type]
            client = partial(_client, address, s_type_f)
            client(f'M {s_type} server on port {address[1]}\n' if DBG else 'Test')
            break
        except OSError:
            if address is not None and ':' in address[0]:
                # fe80::1 not negotiable
                addr = '::1'
            # terminate/kill are only valid on a process that was started
            if p is not None and p.pid is not None:
                p.terminate()
                p.kill()
            sleep(1)
    else:
        print("FAIL can't negotiate socket")
        raise SystemExit(1)
    return client
class serverwrapper():
    """Decorator to start servers."""

    def __init__(self, wraptype):
        """
        Initialise.

        Parameters
        ----------
        wraptype: str
            Server type to start eg "Error"

        """
        self.wraptype = wraptype

    def __call__(self, f):
        """
        Decorator.

        The wrapped function runs with ``c.<wraptype>`` replaced by a client
        for a freshly started server, unless a client (a ``partial``) is
        already installed, in which case the function is simply called.
        """
        @wraps(f)
        def svrwrap(*args, **kw):
            name = self.wraptype
            if isinstance(getattr(c, name), partial):
                # a server client is already in place, nothing to manage
                return f(*args, **kw)

            backup = '_' + name
            setattr(c, backup, getattr(c, name))
            setattr(c, name, start_server(name))
            try:
                if name == "Error":
                    getattr(c, name)("PID {}".format(getpid()))
                ret = f(*args, **kw)
                getattr(c, name)("END")
            finally:
                # put the original handler back even if f raised, so a later
                # call starts its own server instead of reusing this client
                setattr(c, name, getattr(c, backup))
            return ret

        # wraps() has already copied __name__ and __doc__ from f
        return svrwrap
class addpid():
    """Decorator to add PID to server list."""

    def __init__(self, wraptype):
        """
        Initialise.

        Parameters
        ----------
        wraptype: str
            Server type to start eg "Error"

        """
        self.wraptype = wraptype

    def __call__(self, f):
        """
        Add PID to list.

        The wrapper sends ``"PID <pid>"`` for the current process through
        ``c.<wraptype>`` and then calls the wrapped function, returning its
        result. An AttributeError while sending is reported with a printed
        message and the function is still called.
        """
        @wraps(f)
        def pidwrap(*args, **kw):
            try:
                getattr(c, self.wraptype)("PID {}".format(getpid()))
            except AttributeError:
                print(f"{self.wraptype} server not started")
            # hand the result back instead of silently dropping it
            return f(*args, **kw)

        # wraps() has already copied __name__ and __doc__ from f
        return pidwrap