Source code for nanome.api.plugin

import fnmatch
import json
import logging
import multiprocessing
import os
import random
import re
import signal
import subprocess
import string
import sys
import time
from timeit import default_timer as timer

from nanome._internal.serializer_fields import TypeSerializer
from nanome._internal.logs import LogsManager
from nanome._internal.process import ProcessManager
from nanome._internal import network
from nanome.api._hashes import Hashes
from nanome.api.serializers import CommandMessageSerializer
from nanome.util.logs import Logs
from nanome.util import config
from . import _DefaultPlugin

logger = logging.getLogger(__name__)


MAX_RECONNECT_WAIT = 20.0
KEEP_ALIVE_TIME_INTERVAL = 60.0
KEEP_ALIVE_TIMEOUT = 15.0


[docs]class Plugin(object): """Process that connects to NTS, and allows a user to access a PluginInstance. When plugin process is running, an entry is added to the Nanome Stacks Menu. When a user activates a Plugin, this class creates a PluginInstance object connected to the user's Nanome session. """ _serializer = CommandMessageSerializer() plugin_id = -1 custom_data = None def __init__(self, name, description, tags=None, has_advanced=False, permissions=None, integrations=None, version=None): """ :param name: Name of the plugin to display :type name: :class:`str` :param description: Description of the plugin to display :type description: :class:`str` :param tags: Tags of the plugin :type tags: :class:`list` <:class:`str`> :param has_advanced: If true, plugin will display an "Advanced Settings" button :type has_advanced: :class:`bool` :param version: Semantic version number of plugin. Used for logging. :type version: :class:`str` """ super(Plugin, self).__init__() self.host = '' self.port = None self.key = '' self.remote_logging = False self.write_log_file = True self.verbose = False self.has_autoreload = False tags = tags or [] permissions = permissions or [] integrations = integrations or [] self.plugin_class = _DefaultPlugin self.version = version self._sessions = dict() self._process_manager = ProcessManager() if isinstance(tags, str): tags = [tags] category = "" if len(tags) > 0: category = tags[0] for i in range(0, len(permissions)): permissions[i] = Hashes.PermissionRequestHashes[permissions[i]] for i in range(0, len(integrations)): integrations[i] = Hashes.IntegrationRequestHashes[integrations[i]] self._description = { 'name': name, 'description': description, 'category': category, 'tags': tags, 'hasAdvanced': has_advanced, 'auth': None, 'permissions': permissions, 'integrations': integrations } self.connected = False self._to_ignore = [] self.__waiting_keep_alive = False self._pre_run = None self._post_run = None
[docs] @staticmethod def create_parser(): """Command Line Interface for Plugins. Moved into config.py, but there are plugins that retrieve parser from Plugin class. Leaving this here for backwards compatibility. rtype: argsparser: args parser """ return config.create_parser()
[docs] def run(self, host=None, port=None, key=None): """ | Starts the plugin by connecting to the server specified. | If arguments (-a, -p) are given when starting plugin, host/port will be ignored. | Function will return only when plugin exits. :param host: NTS IP address if plugin started without -a option :param port: NTS port if plugin started without -p option :type host: str :type port: int """ settings = config.load_settings() self.host = host if host else settings.get('host') if not self.host: Logs.error('No NTS host provided') sys.exit(1) try: self.port = int(port) if port else int(settings.get('port')) except ValueError: Logs.error('Port must be an integer, received \"{}\"'.format(port)) sys.exit(1) self.key = key if key is not None else settings.get('key') self.write_log_file = settings.get('write_log_file') or False self.remote_logging = settings.get('remote_logging') or False self.has_autoreload = settings.get('auto_reload') self.verbose = settings.get('verbose') if settings.get('ignore'): to_ignore = settings.get('ignore').split(",") self.to_ignore = to_ignore # Name can be set during the class instantiation without cli arg. if settings.get('name'): self.name = settings.get('name') # Configure Logging self.__log_filename = self.plugin_class.__name__ + ".log" self._logs_manager = LogsManager( self.__log_filename, plugin=self, write_log_file=self.write_log_file, remote_logging=self.remote_logging) self._logs_manager.configure_main_process() Logs.message("Starting Plugin") if self.has_autoreload: self._autoreload() else: self._run()
[docs] @classmethod def setup(cls, name, description, tags, has_advanced, plugin_class, host=None, port=None, key=None, permissions=None, integrations=None): permissions = permissions or [] integrations = integrations or [] if not cls._is_process(): plugin = cls(name, description, tags, has_advanced, permissions, integrations) plugin.plugin_class = plugin_class plugin.run(host, port, key)
[docs] def set_custom_data(self, *args): """ | Store arbitrary data to send to plugin instances :param args: Variable length argument list :type args: Anything serializable """ self.custom_data = args
[docs] @staticmethod def set_maximum_processes_count(max_process_nb): ProcessManager._max_process_count = max_process_nb
@property def to_ignore(self): return getattr(self, '_to_ignore') @to_ignore.setter def to_ignore(self, value): setattr(self, '_to_ignore', value) @property def name(self): """Name of plugin as shown in the Nanome Stacks menu.""" return self._description.get('name') @name.setter def name(self, value): self._description['name'] = value
[docs] def set_plugin_class(self, plugin_class): """ | Set plugin class to instantiate when a new session is connected | The plugin class should interact with or override functions in :class:`~nanome.PluginInstance` to interact with Nanome :param plugin_class: Plugin class to instantiate :type plugin_class: :class:`~nanome.PluginInstance` """ self.plugin_class = plugin_class
@classmethod def _run_plugin_instance( cls, plugin_instance_class, session_id, net_queue_in, net_queue_out, pm_queue_in, pm_queue_out, log_pipe_conn, serializer, plugin_id, version_table, original_version_table, custom_data, permissions): """When user activates a plugin, this function is run to begin the new process. :arg plugin_instance_class: The Plugininstance class to be instantiated. :arg session_id: The session ID registered with NTS. :arg net_queue_in: The network input queue. :arg net_queue_out: The network output queue. :arg pm_queue_in: The process manager input queue. :arg pm_queue_out: The process manager output queue. :arg log_pipe_conn: The pipe to communicate with the logs manager. :arg serializer: The serializer to use to create NTS message payloads. :arg plugin_id: The plugin ID registered with NTS. :arg version_table: The version table of the plugin, used to setup the serializer. :arg original_version_table: The original version table of the plugin, used to setup the serializer. :arg custom_data: Arbitrary data that can be passed to each instantiated PluginInstance :arg permissions: The permissions of the plugin. """ plugin_instance = plugin_instance_class() plugin_network = network.PluginNetwork( plugin_instance, session_id, net_queue_in, net_queue_out, serializer, plugin_id, version_table) plugin_instance._setup( session_id, plugin_network, pm_queue_in, pm_queue_out, log_pipe_conn, original_version_table, custom_data, permissions) LogsManager.configure_child_process(plugin_instance) logger.debug("Starting plugin") plugin_instance._run() def _run(self): # set_start_method ensures consistent process behavior between Windows and Linux if sys.version_info.major >= 3 and sys.version_info.minor >= 4: multiprocessing.set_start_method('spawn', force=True) if os.name == "nt": signal.signal(signal.SIGBREAK, self.__on_termination_signal) else: signal.signal(signal.SIGTERM, self.__on_termination_signal) if self._pre_run is not None: self._pre_run() self._description['auth'] = self.__read_key() self._process_manager = ProcessManager() self.__reconnect_attempt = 0 self._connect() self._loop() def _connect(self): """Create network Connection to NTS, and start listening for packets.""" self._network = network.NetInstance(self, self.__class__._on_packet_received) if self._network.connect(self.host, self.port): if self.plugin_id >= 0: plugin_id = self.plugin_id else: plugin_id = 0 packet = network.Packet() packet.set(0, network.Packet.packet_type_plugin_connection, plugin_id) packet.write_string(json.dumps(self._description)) self._network.send(packet) self.connected = True self.__reconnect_attempt = 0 self.__waiting_keep_alive = False self.__last_keep_alive = timer() for session in self._sessions.values(): session._net_plugin = self._network return True else: self.__disconnection_time = timer() self.__reconnect_attempt += 1 return False def _loop(self): to_remove = [] try: while True: now = timer() if self.connected is False: reconnect_wait = min(2 ** self.__reconnect_attempt, MAX_RECONNECT_WAIT) elapsed = now - self.__disconnection_time if elapsed >= reconnect_wait: logger.info("Trying to reconnect...") if self._connect() is False: if self.__reconnect_attempt == 3: self.__disconnect() continue else: time.sleep(reconnect_wait - elapsed) continue if self._network.receive() is False: self.connected = False self.__disconnection_time = timer() self._network.disconnect() continue if self.__waiting_keep_alive: if now - self.__last_keep_alive >= KEEP_ALIVE_TIMEOUT: self.connected = False self.__disconnection_time = timer() continue elif now - self.__last_keep_alive >= KEEP_ALIVE_TIME_INTERVAL and self.plugin_id >= 0: self.__last_keep_alive = now self.__waiting_keep_alive = True packet = network.Packet() packet.set(self.plugin_id, network.Packet.packet_type_keep_alive, 0) self._network.send(packet) del to_remove[:] for id, session in self._sessions.items(): if session.read_from_plugin() is False: session.close_pipes() to_remove.append(id) for id in to_remove: self._sessions[id]._send_disconnection_message(self.plugin_id) del self._sessions[id] self._process_manager.update() except KeyboardInterrupt: self.__exit() def _start_session_process(self, session_id, version_table): """Setup Queues and networking for PluginInstance, run session process.""" if session_id in self._sessions: # If session_id already exists, close it first () logger.info("Closing session ID {} because a new session connected with the same ID".format(session_id)) self._sessions[session_id].signal_and_close_pipes() net_queue_in = multiprocessing.Queue() net_queue_out = multiprocessing.Queue() pm_queue_in = multiprocessing.Queue() pm_queue_out = multiprocessing.Queue() session = network.Session( session_id, self._network, self._process_manager, self._logs_manager, net_queue_in, net_queue_out, pm_queue_in, pm_queue_out) permissions = self._description["permissions"] log_pipe_conn = self._logs_manager.child_pipe_conn process = multiprocessing.Process( target=self._run_plugin_instance, args=( self.plugin_class, session_id, net_queue_in, net_queue_out, pm_queue_in, pm_queue_out, log_pipe_conn, self._serializer, self.plugin_id, version_table, TypeSerializer.get_version_table(), self.custom_data, permissions ) ) # Appending random string to process name makes tracking unique sessions easier random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6)) process.name = "Session-{}-{}".format(session_id, random_str) process.start() session.plugin_process = process self._sessions[session_id] = session extra = {'session_id': session_id} logger.info("Registered new session: {}".format(session_id), extra=extra) def _on_packet_received(self, packet): """When packet received, identify Packet type, and call appropriate function.""" if packet.packet_type == network.Packet.packet_type_message_to_plugin: session_id = packet.session_id # Always look if we're trying to register a session # Fix 5/27/2021 - Jeremie: We need to always check for session registration in order to fix timeout issues # When NTS forces disconnection because of plugin list change, session_id still exists in self._sessions, # even though it was disconnected for Nanome if self._serializer.try_register_session(packet.payload) is True: received_version_table, _, _ = self._serializer.deserialize_command(packet.payload, None) version_table = TypeSerializer.get_best_version_table(received_version_table) self.__on_client_connection(session_id, version_table) return True if session_id in self._sessions: # packet.decompress() self._sessions[session_id]._on_packet_received(packet.payload) return True # Doesn't register? It's an error logger.warning("Received a command from an unregistered session {}".format(session_id)) elif packet.packet_type == network.Packet.packet_type_plugin_connection: self.plugin_id = packet.plugin_id logger.info("Registered with plugin ID {}\n=======================================\n".format(str(self.plugin_id))) elif packet.packet_type == network.Packet.packet_type_plugin_disconnection: if self.plugin_id == -1: if self._description['auth'] is None: logger.error("Connection refused by NTS. Are you missing a security key file?") else: logger.error("Connection refused by NTS. Your security key file might be invalid") sys.exit(1) else: logger.info("Connection ended by NTS") self.plugin_id = -1 return False elif packet.packet_type == network.Packet.packet_type_client_disconnection: try: id = packet.session_id self._sessions[id].signal_and_close_pipes() del self._sessions[id] extra = {'session_id': id} logger.info("Session {} disconnected".format(id), extra=extra) except Exception: pass elif packet.packet_type == network.Packet.packet_type_keep_alive: self.__waiting_keep_alive = False elif packet.packet_type == network.Packet.packet_type_logs_request: self.__logs_request(packet) else: logger.warning("Received a packet of unknown type {}. Ignoring".format(packet.packet_type)) return True @staticmethod def _is_process(): return multiprocessing.current_process().name != 'MainProcess' def __read_key(self): if not self.key: return # check if arg is key data elif re.match(r'^[0-9A-F]+$', self.key): return self.key try: f = open(self.key, "r") key = f.read().strip() return key except Exception: return None def __disconnect(self): to_remove = [] for id in self._sessions.keys(): to_remove.append(id) for id in to_remove: del self._sessions[id] def __on_termination_signal(self, signum, frame): self.__exit() def __exit(self): logger.debug('Exiting') for session in self._sessions.values(): session.signal_and_close_pipes() session.plugin_process.join() if self._post_run is not None: self._post_run() sys.exit(0) def __on_client_connection(self, session_id, version_table): self._start_session_process(session_id, version_table) def __logs_request(self, packet): try: id_str = packet.payload.decode('utf-8') id = int(id_str) except Exception: logger.error('Received a broken log request from NTS: {}'.format(packet.payload)) try: with open(self.__log_filename, 'r') as content_file: content = content_file.read() except Exception: content = '' response = { 'id': id, 'logs': content } packet = network.Packet() packet.set(0, network.Packet.packet_type_logs_request, 0) packet.write_string(json.dumps(response)) def _autoreload(self): wait = 3 if os.name == "nt": sub_kwargs = {'creationflags': subprocess.CREATE_NEW_PROCESS_GROUP} break_signal = signal.CTRL_BREAK_EVENT else: sub_kwargs = {} break_signal = signal.SIGTERM # Make sure autoreload is turned off for child processes. sub_args = [x for x in sys.argv if x != '-r' and x != "--auto-reload"] popen_environ = dict(os.environ) popen_environ.pop('PLUGIN_AUTO_RELOAD', None) try: sub_args = [sys.executable] + sub_args process = subprocess.Popen(sub_args, env=popen_environ, **sub_kwargs) except Exception: logger.error("Couldn't find a suitable python executable") sys.exit(1) last_mtime = max(self.__file_times(".")) while True: try: max_mtime = max(self.__file_times(".")) if max_mtime > last_mtime: last_mtime = max_mtime logger.info("Restarting plugin") process.send_signal(break_signal) process = subprocess.Popen(sub_args, **sub_kwargs) time.sleep(wait) except KeyboardInterrupt: process.send_signal(break_signal) break def __file_times(self, path): found_file = False for root, dirs, files in os.walk(path): for file in filter(self.__file_filter, files): file_path = os.path.join(root, file) matched = False for pattern in self._to_ignore: if fnmatch.fnmatch(file_path, pattern): matched = True if matched is False: found_file = True yield os.stat(file_path).st_mtime if found_file is False: yield 0.0 def __file_filter(self, name): return name.endswith(".py") or name.endswith(".json") @property def pre_run(self): """ | Function to call before the plugin runs and tries to connect to NTS | Useful when using autoreload """ return self._pre_run @pre_run.setter def pre_run(self, value): self._pre_run = value @property def post_run(self): """ | Function to call when the plugin is about to exit | Useful when using autoreload """ return self._post_run @post_run.setter def post_run(self, value): self._post_run = value