import os
import pickle
import random
import redis
import time
import uuid
from typing import List, Union
from nanome import PluginInstance
from nanome._internal.enums import Messages
from nanome._internal.network import Packet
from nanome.api.serializers import CommandMessageSerializer
from nanome.api.streams import Stream
from nanome.api import shapes, structure, ui, user
from nanome.util import Logs, enums, Vector3, Quaternion
from nanome.util.file import LoadInfoDone
__all__ = ['PluginInstanceRedisInterface']
workspace_struct = Union[
structure.Complex, structure.Molecule, structure.Chain,
structure.Residue, structure.Atom, structure.Bond]
ui_content = Union[
ui.Button, ui.Label, ui.Mesh, ui.Image,
ui.Dropdown, ui.DropdownItem, ui.LoadingBar,
ui.Slider, ui.TextInput
]
NTS_RESPONSE_TIMEOUT = os.environ.get('NTS_RESPONSE_TIMEOUT', 30)
def random_request_id():
"""Generate a random but valid request id."""
max_req_id = 4294967295
request_id = random.randint(0, max_req_id)
return request_id
[docs]class PluginInstanceRedisInterface:
"""Provides interface for publishing PluginInstance RPC requests over Redis.
The idea is to feel like you're using the standard
PluginInstance, but all calls are being made through Redis.
"""
def __init__(self, redis_host, redis_port, redis_password, redis_channel=None):
"""Initialize the Connection to Redis."""
self.redis = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
self.plugin_class = PluginInstance
self.channel = redis_channel
self.plugin_id = None
self.session_id = None
self.version_table = None
[docs] def connect(self):
"""Ping Redis, and then get data from plugin required for serialization."""
self.redis.ping()
plugin_data = self.get_plugin_data()
self.plugin_id = plugin_data['plugin_id']
self.session_id = plugin_data['session_id']
self.version_table = plugin_data['version_table']
[docs] def create_writing_stream(self, indices_list: List[int], stream_type: enums.StreamType) -> Stream:
"""Create a stream allowing the plugin to continuously update properties of many objects."""
message_type = Messages.stream_create
expects_response = True
args = (stream_type, indices_list, enums.StreamDirection.writing)
response = self._send_message(message_type, args, expects_response)
err_code, stream_args = response[0], response[1:]
if err_code != 0:
raise ValueError(f"Error creating stream: {err_code}")
stream = Stream(None, *stream_args)
return stream
[docs] def request_workspace(self) -> structure.Workspace:
"""Request the entire workspace, in deep mode"""
message_type = Messages.workspace_request
expects_response = True
args = None
response = self._send_message(message_type, args, expects_response)
return response
[docs] def request_complexes(self, id_list: List[int]) -> List[structure.Complex]:
"""Requests a list of complexes by their indices.
Complexes returned contains the full structure (atom/bond/residue/chain/molecule)
"""
message_type = Messages.complexes_request
expects_response = True
args = id_list
response = self._send_message(message_type, args, expects_response)
return response
[docs] def update_structures_shallow(self, structures: List[workspace_struct]):
"""Update the specific molecular structures in the scene to match the structures in parameter.
Only updates the structure's data, will not update children or other descendents.
"""
message_type = Messages.structures_shallow_update
expects_response = False
args = structures
self._send_message(message_type, args, expects_response)
[docs] def update_structures_deep(self, struct_list: List[workspace_struct]):
"""Update the specific molecular structures in the scene to match the structures in parameter.
Will also update descendent structures and can be used to remove descendent structures.
"""
message_type = Messages.structures_deep_update
expects_response = True
args = struct_list
response = self._send_message(message_type, args, expects_response)
return response
[docs] def request_complex_list(self) -> List[structure.Complex]:
"""Request the list of all complexes in the workspace, in shallow mode."""
message_type = Messages.complex_list_request
args = None
expects_response = True
response = self._send_message(message_type, args, expects_response)
return response
def _send_message(self, message_type: Messages, fn_args, expects_response):
function_name = message_type.name
request_id, packet = self._build_packet(message_type, fn_args, expects_response)
message = self._build_message(function_name, request_id, packet, expects_response)
serialized_response = self._rpc_request(message, expects_response=expects_response)
if serialized_response is not None:
response = self._deserialize_payload(serialized_response)
return response
[docs] def zoom_on_structures(self, structures: workspace_struct) -> None:
"""Repositions and resizes the workspace such that the provided structure(s) will be in the
center of the users view.
"""
message_type = Messages.structures_zoom
expects_response = False
args = structures
self._send_message(message_type, args, expects_response)
[docs] async def send_notification(self, notification_type: enums.NotificationTypes, message: str):
"""Send a notification to the user"""
message_type = Messages.notification_send
expects_response = False
args = [notification_type, message]
self._send_message(message_type, args, expects_response)
[docs] def center_on_structures(self, structures: workspace_struct):
"""Repositions the workspace such that the provided structure(s) will be in the center of the world."""
message_type = Messages.structures_center
expects_response = False
args = structures
self._send_message(message_type, args, expects_response)
[docs] def add_to_workspace(self, complex_list: List[structure.Complex]):
"""Add a list of complexes to the current workspace."""
message_type = Messages.add_to_workspace
expects_response = True
args = complex_list
response = self._send_message(message_type, args, expects_response)
return response
[docs] def open_url(self, url: str, desktop_browser: bool = False) -> None:
"""Open a URL in the user's browser."""
message_type = Messages.open_url
expects_response = False
args = (url, desktop_browser)
self._send_message(message_type, args, expects_response)
[docs] def request_presenter_info(self) -> user.PresenterInfo:
"""Get info about the presenter."""
message_type = Messages.presenter_info_request
expects_response = True
args = None
result = self._send_message(message_type, args, expects_response)
return result
[docs] def apply_color_scheme(self, color_scheme: enums.ColorScheme, target: enums.ColorSchemeTarget, only_carbons: bool = False):
"""Applies a color scheme to selected atoms."""
message_type = Messages.apply_color_scheme
expects_response = False
args = (color_scheme, target, only_carbons)
self._send_message(message_type, args, expects_response)
def _rpc_request(self, message, expects_response=False):
"""Publish an RPC request to redis, and await response.
:rtype: data returned by PluginInstance function called by RPC.
"""
pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
function_name = message['function']
if 'response_channel' in message:
response_channel = message['response_channel']
pubsub.subscribe(response_channel)
message_pickle = pickle.dumps(message)
Logs.message(f"Sending {function_name} Request to Redis Channel {self.channel}")
self.redis.publish(self.channel, message_pickle)
# Start polling for response message.
start_time = time.time()
while expects_response:
message = pubsub.get_message()
if time.time() >= start_time + NTS_RESPONSE_TIMEOUT:
raise TimeoutError(f"Timeout waiting for response from RPC {function_name}")
if not message:
continue
if message.get('type') == 'message':
Logs.message(f"Response received on channel {response_channel}")
pickled_message_data = message['data']
pubsub.unsubscribe()
message_data = pickle.loads(pickled_message_data)
return message_data
[docs] def upload_shapes(self, shape_list: List[shapes.Shape]):
"""Upload a list of shapes to the server.
:arg: shape_list: List of shapes to upload.
:rtype: list. List of shape IDs.
"""
message_type = Messages.set_shape
expects_response = True
args = shape_list
shape_indices, _bytes = self._send_message(message_type, args, expects_response)
for shape, shape_index in zip(shape_list, shape_indices):
shape._index = shape_index
return shape_list
[docs] def get_plugin_data(self) -> dict:
"""Custom function to get data necessary for serialization from the remote plugin."""
function_name = 'get_plugin_data'
expects_response = True
message = self._build_message(function_name, None, None, expects_response)
response = self._rpc_request(message, expects_response=expects_response)
return response
def _build_packet(self, message_type, args=None, expects_response=False):
serializer = CommandMessageSerializer()
request_id = random_request_id()
message = serializer.serialize_message(request_id, message_type, args, self.version_table, expects_response)
packet = Packet()
packet.set(self.session_id, Packet.packet_type_message_to_client, self.plugin_id)
packet.write(message)
return request_id, packet
[docs] def update_content(self, *content: ui_content) -> None:
"""Update specific UI elements (button, slider, list...)
"""
message_type = Messages.content_update
expects_response = False
args = list(content)
self._send_message(message_type, args, expects_response)
[docs] def update_node(self, *nodes: List[ui.LayoutNode]) -> None:
"""Updates layout nodes and their children."""
message_type = Messages.node_update
expects_response = False
args = nodes
self._send_message(message_type, args, expects_response)
[docs] def update_stream(self, stream: Stream, data: List):
"""Feed new data into an existing stream."""
message_type = Messages.stream_feed
expects_response = True
stream_id = stream.id
data_type = stream.data_type
args = [stream_id, data, data_type]
response = self._send_message(message_type, args, expects_response)
return response
[docs] def destroy_stream(self, stream: Stream):
"""Delete an existing stream."""
message_type = Messages.stream_destroy
expects_response = False
args = stream.id
self._send_message(message_type, args, expects_response)
[docs] def send_files_to_load(self, files_list: List[str]) -> LoadInfoDone:
"""Send a molecular structure file (PDB, SDF, etc.) to the workspace to render."""
message_type = Messages.load_file
expects_response = True
files_and_data = []
for file in files_list:
if isinstance(file, tuple):
full_path, file_name = file
file_name += '.' + full_path.split('.')[-1]
else:
full_path = file.replace('\\', '/')
file_name = full_path.split('/')[-1]
with open(full_path, 'rb') as content_file:
data = content_file.read()
files_and_data.append((file_name, data))
fn_args = (files_and_data, True, True) # idk what the True, True does
response = self._send_message(message_type, fn_args, expects_response)
return response
@staticmethod
def _build_message(function_name, request_id, packet=None, expects_response=False):
response_channel = str(uuid.uuid4())
message = {
'function': function_name,
'request_id': request_id
}
if packet:
pack = packet.pack()
message['packet'] = pack
if expects_response:
message['response_channel'] = response_channel
return message
def _deserialize_payload(self, payload: bytearray):
serializer = CommandMessageSerializer()
received_obj_list, command_hash, request_id = serializer.deserialize_command(
payload, self.version_table)
return received_obj_list