Source code for mango.managers

from socket import socket, AF_INET, AF_INET6, SOCK_STREAM
from asyncio import get_event_loop, new_event_loop, set_event_loop, start_server as asyncserve
from multiprocessing import Process, Queue
from functools import partial, wraps
from os import getpid
from contextlib import suppress
from time import sleep

from mango.errors import error, _clientError
from mango.progress import prog, _clientProg
from mango.constants import c
from mango.debug import DBG


def _client(address, stype, message, send=True):
    """
    Send a message to a server, then pass it to the local handler.

    Opens a TCP connection to ``address``, sends ``message``, waits until the
    server answers or closes the connection and finally calls
    ``stype(message)``. A refused or reset connection is silently ignored; in
    that case ``stype`` is not called.

    Parameters
    ----------
    address: tuple
        Server address, only the first two items (host, port) are used.
        A ``:`` in the host selects IPv6.
    stype: callable
        Called with ``message`` after the exchange has completed.
    message: str
        Text to send, encoded with the default codec.
    send: bool
        Unused, kept so existing callers keep working.

    """
    family = AF_INET6 if ":" in address[0] else AF_INET
    with suppress(ConnectionRefusedError, ConnectionResetError):
        # The socket context manager closes the descriptor on every path,
        # including the suppressed connection errors.
        with socket(family, SOCK_STREAM) as sock:
            # Bound the connection attempt only; the exchange itself may block.
            sock.settimeout(10)
            sock.connect((address[0], address[1]))
            sock.settimeout(None)
            # sendall keeps writing until the whole payload has been sent.
            sock.sendall(message.encode())
            # The reply is not used, this only waits for the server to finish.
            sock.recv(1024)
        stype(message)


class Server():
    """
    Asyncio TCP server forwarding client messages to an error or progress object.

    Creating an instance starts the server and blocks in the event loop, so
    the class itself is used as the target of a separate process.
    """

    def __init__(self, q_in, q_out, op=open):
        """
        Read the configuration, start the server and serve requests.

        Parameters
        ----------
        q_in: Queue
            Supplies one item ``[address, s_type]`` where ``address`` is
            unpacked into the host and port arguments of the server and
            ``s_type`` is "Error" or "Prog".
        q_out: Queue
            Receives the socket name the server is actually bound to.
        op: callable
            Passed to the error object when ``s_type`` is "Error".

        """
        obj = q_in.get()
        address = obj[0]
        s_type = obj[1]

        # attribute name, factory and optional factory arguments per type
        kinds = {"Error": ("error", error, op), "Prog": ("prog", prog)}
        name, factory, *extra = kinds[s_type]
        setattr(self, name, factory(*extra))
        self.handle_it = getattr(self, f"{name}handler")

        loop = new_event_loop()
        set_event_loop(loop)
        # No loop argument: it was removed from asyncio.start_server in
        # Python 3.10, the loop running the coroutine is used instead.
        server = loop.run_until_complete(asyncserve(self.handler, *address))
        q_out.put(server.sockets[0].getsockname())

        # Serve requests until the loop is stopped or Ctrl+C is pressed
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            raise SystemExit(1)

        # Close the server
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()

    async def handler(self, reader, writer):
        """
        Read one message and dispatch it to the configured handler.

        The message is split at the first space into a command and an
        optional payload. The connection is closed afterwards, also when the
        handler raises, so the waiting client is always released.
        """
        try:
            data = await reader.read(1000)
            await self.handle_it(data.decode().split(' ', 1))
        finally:
            writer.close()

    async def errorhandler(self, data):
        """
        Send errors to error class.

        Parameters
        ----------
        data: list
            Command followed by an optional payload string. "EV", "PID",
            "END" and "Test" are commands, anything else is counted as an
            error together with its payload.

        """
        command = data[0]
        if command == "EV":
            fields = data[1].split()
            # every first field other than the text "False" counts as true
            self.error.setup(fields[0] != "False", fields[1], int(fields[2]), int(fields[3]))
        elif command == "PID":
            self.error.addpid(int(data[1]))
        elif command == "END":
            self.error.endproc()
        elif command == "Test":
            pass
        else:
            self.error.count(data[0], data[1])

    async def proghandler(self, data):
        """
        Send progress to progress bar.

        Parameters
        ----------
        data: list
            Name of the progress method to call, optionally followed by a
            string of whitespace separated arguments.

        """
        method = getattr(self.prog, data[0])
        if len(data) > 1:
            method(*data[1].split())
        else:
            method()
def start_server(s_type):
    """
    Start server for multiprocessing.

    Launches ``Server`` in a daemon process, reads back the address it is
    bound to and sends a first message to check the connection. Up to five
    attempts are made, with a one second pause after each failure.

    Parameters
    ----------
    s_type: str
        Server type to start eg "Error"

    Returns
    -------
    client: partial
        ``_client`` bound to the server address and the matching
        ``_client<s_type>`` function, called with the message to send.

    """
    q_in = Queue()
    q_out = Queue()
    addr = 'localhost'
    for attempt in range(5):
        # Reset per attempt so the failure branch never reads a name that is
        # unbound or left over from an earlier attempt.
        p = None
        address = None
        try:
            p = Process(target=Server, args=(q_in, q_out), daemon=True)
            p.start()
            # port 0 lets the operating system choose a free port
            q_in.put([(addr, 0), s_type])
            address = q_out.get()
            s_type_f = globals()["_client" + s_type]
            client = partial(_client, address, s_type_f)
            client(f'M {s_type} server on port {address[1]}\n' if DBG else 'Test',)
            break
        except OSError:
            if address is not None and ':' in address[0]:  # fe80::1 not negotiable
                addr = '::1'
            # terminate() fails on a process that never started
            if p is not None and p.is_alive():
                p.terminate()
                p.kill()
            sleep(1)
    else:
        print("FAIL can't negotiate socket")
        raise SystemExit(1)

    return client
class serverwrapper():
    """Decorator running a function with a server of the given type started."""

    def __init__(self, wraptype):
        """
        Store the server type.

        Parameters
        ----------
        wraptype: str
            Server type to start eg "Error"

        """
        self.wraptype = wraptype

    def __call__(self, f):
        """Wrap ``f`` so a server is available while it runs."""
        @wraps(f)
        def svrwrap(*args, **kw):
            kind = self.wraptype

            # A partial means a server client is already installed
            if isinstance(getattr(c, kind), partial):
                return f(*args, **kw)

            # Keep the current attribute and install the server client
            backup = '_' + kind
            setattr(c, backup, getattr(c, kind))
            setattr(c, kind, start_server(kind))
            if kind == "Error":
                getattr(c, kind)("PID {}".format(getpid()))

            result = f(*args, **kw)

            # Tell the server the run is over and restore the attribute
            getattr(c, kind)("END")
            setattr(c, kind, getattr(c, backup))
            return result

        return svrwrap
class addpid():
    """Decorator to add PID to server list."""

    def __init__(self, wraptype):
        """
        Initialise.

        Parameters
        ----------
        wraptype: str
            Server type to start eg "Error"

        """
        self.wraptype = wraptype

    def __call__(self, f):
        """
        Add PID to list.

        The wrapper sends the PID of the current process through the
        ``wraptype`` attribute of ``c`` before calling ``f`` and returns
        whatever ``f`` returns. A message is printed instead when sending
        raises AttributeError.
        """
        @wraps(f)
        def pidwrap(*args, **kw):
            try:
                getattr(c, self.wraptype)("PID {}".format(getpid()))
            except AttributeError:
                print(f"{self.wraptype} server not started")
            # Pass the result through so decorating does not drop it
            return f(*args, **kw)

        return pidwrap