Source code for grpc4bmi.bmi_client_singularity

import errno
import os
from os.path import abspath
import subprocess
import sys
import logging

from semver import VersionInfo

from grpc4bmi.bmi_grpc_client import BmiClient
from grpc4bmi.utils import stage_config_file

REQUIRED_SINGULARITY_VERSION = '>=3.1.0'


[docs]def check_singularity_version(): p = subprocess.Popen(['singularity', 'version'], stdout=subprocess.PIPE) (stdout, _stderr) = p.communicate() if p.returncode != 0: raise Exception('Unable to determine singularity version') if not VersionInfo.parse(stdout.decode('utf-8')).match(REQUIRED_SINGULARITY_VERSION): raise Exception(f'Wrong version of singularity found, require version {REQUIRED_SINGULARITY_VERSION}') return True
[docs]class BmiClientSingularity(BmiClient): """BMI GRPC client for singularity server processes During initialization launches a singularity container with run-bmi-server as its command. The client picks a random port and expects the container to run the server on that port. The port is passed to the container using the BMI_PORT environment variable. >>> from grpc4bmi.bmi_client_singularity import BmiClientSingularity >>> image = 'docker://ewatercycle/wflow-grpc4bmi:latest' >>> client = BmiClientSingularity(image, input_dir='wflow_rhine_sbm', output_dir='wflow_output') >>> client.initialize('wflow_rhine_sbm/wflow_sbm.ini') >>> client.update_until(client.get_end_time()) >>> del client Args: image: Singularity image. For Docker Hub image use `docker://*`. input_dir (str): Directory for input files of model output_dir (str): Directory for input files of model timeout (int): Seconds to wait for gRPC client to connect to server extra_volumes (Dict[str,str]): Extra volumes to attach to Singularity container. The key is the hosts path and the value the mounted volume inside the container. Contrary to Docker client, extra volumes are always read/write For example: .. code-block:: python {'/data/shared/forcings/': '/data/forcings'} """ INPUT_MOUNT_POINT = "/data/input" OUTPUT_MOUNT_POINT = "/data/output" def __init__(self, image, input_dir=None, output_dir=None, timeout=None, extra_volumes=None): check_singularity_version() host = 'localhost' port = BmiClient.get_unique_port(host) args = [ "singularity", "run", ] mount_points = {} if extra_volumes is None else extra_volumes if input_dir is not None: mount_points[input_dir] = BmiClientSingularity.INPUT_MOUNT_POINT self.input_dir = abspath(input_dir) if any(mount_points): args += ["--bind", ','.join([hp + ':' + ip for hp, ip in mount_points.items()])] if output_dir is not None: self.output_dir = abspath(output_dir) try: # Create output dir ourselves or singularity will complain os.mkdir(self.output_dir) except OSError as e: if e.errno != errno.EEXIST: raise e args += ["--bind", output_dir + ':' + BmiClientSingularity.OUTPUT_MOUNT_POINT] args.append(image) env = os.environ.copy() env['BMI_PORT'] = str(port) logging.info(f'Running {image} singularity container on port {port}') self.container = subprocess.Popen(args, env=env, preexec_fn=os.setsid) super(BmiClientSingularity, self).__init__(BmiClient.create_grpc_channel(port=port, host=host), timeout=timeout) def __del__(self): if hasattr(self, "container"): self.container.terminate() self.container.wait()
[docs] def initialize(self, filename): fn = stage_config_file(filename, self.input_dir, self.INPUT_MOUNT_POINT, home_mounted=True) super(BmiClientSingularity, self).initialize(fn)
[docs] def get_value_ptr(self, var_name): raise NotImplementedError("Cannot exchange memory references across process boundary")