Source code for nanome.beta.nanome_sdk.utils

import asyncio
import enum
import logging
import random
import sys
from nanome._internal.network import Packet, Data


logger = logging.getLogger(__name__)


[docs]class PacketTypes(enum.IntEnum): plugin_list = 0 plugin_connection = 1 client_connection = 2 message_to_plugin = 3 message_to_client = 4 plugin_disconnection = 5 client_disconnection = 6 master_change = 7 keep_alive = 8 logs_request = 9 live_logs = 10
[docs]def random_request_id(): """Generate a random but valid request id.""" max_req_id = 4294967295 request_id = random.randint(0, max_req_id) return request_id
[docs]def convert_bytes_to_packet(received_bytes): """Parse received bytes into Packet instance.""" data = Data() data.received_data(received_bytes) packet = Packet() got_header = packet.get_header(data) got_payload = packet.get_payload(data) if not got_header: logger.warning("Could not get packet header") if not got_payload: logger.warning("Could not get packet payload") return packet
[docs]async def connect_stdin_stdout(): """Wrap stdin and stdout in StreamReader and StreamWriter interface. allows async reading and writing. """ loop = asyncio.get_event_loop() reader = asyncio.StreamReader(limit=2**32) protocol = asyncio.StreamReaderProtocol(reader) await loop.connect_read_pipe(lambda: protocol, sys.stdin) w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout) writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop) return reader, writer