import logging
import traceback
import numpy
from google.protobuf import any_pb2
from google.rpc import error_details_pb2, status_pb2, code_pb2
from grpc_status import rpc_status
from . import bmi_pb2, bmi_pb2_grpc
log = logging.getLogger(__name__)
[docs]
class BmiLegacyServer02(bmi_pb2_grpc.BmiServiceServicer):
"""
BMI Server class, wrapping an existing python implementation and exposing it via GRPC across the memory space (to
listening client processes). The class takes a package, module and class name and instantiates the BMI
implementation by assuming a default constructor with no arguments.
For models implementing the bmi interface defined https://pypi.org/project/basic-modeling-interface/0.2/
Args:
model: Bmi model object which must be wrapped by grpc
debug: If true then returns stacktrace in an error response.
The stacktrace is returned in the trailing metadata as a DebugInfo (https://github.com/googleapis/googleapis/blob/07244bb797ddd6e0c1c15b02b4467a9a5729299f/google/rpc/error_details.proto#L46-L52) message.
"""
def __init__(self, model, debug=False):
super(bmi_pb2_grpc.BmiServiceServicer, self).__init__()
self.bmi_model_ = model
self.debug = debug
[docs]
def exception_handler(self, exc, context):
log.exception(exc)
detail = any_pb2.Any()
if self.debug:
detail.Pack(
error_details_pb2.DebugInfo(
stack_entries=traceback.format_stack(),
detail=repr(exc)
)
)
status = status_pb2.Status(
code=code_pb2.INTERNAL,
message=str(exc),
details=[detail]
)
context.abort_with_status(rpc_status.to_status(status))
[docs]
def initialize(self, request, context):
ifile = str(request.config_file)
if not ifile:
ifile = None
try:
self.bmi_model_.initialize(ifile)
return bmi_pb2.Empty()
except Exception as e:
self.exception_handler(e, context)
[docs]
def update(self, request, context):
try:
self.bmi_model_.update()
return bmi_pb2.Empty()
except Exception as e:
self.exception_handler(e, context)
[docs]
def finalize(self, request, context):
try:
self.bmi_model_.finalize()
return bmi_pb2.Empty()
except Exception as e:
self.exception_handler(e, context)
[docs]
def getComponentName(self, request, context):
try:
return bmi_pb2.GetComponentNameResponse(name=self.bmi_model_.get_component_name())
except Exception as e:
self.exception_handler(e, context)
[docs]
def getOutputVarNames(self, request, context):
try:
return bmi_pb2.GetVarNamesResponse(names=self.bmi_model_.get_output_var_names())
except Exception as e:
self.exception_handler(e, context)
[docs]
def getTimeUnits(self, request, context):
try:
return bmi_pb2.GetTimeUnitsResponse(units=self.bmi_model_.get_time_units())
except Exception as e:
self.exception_handler(e, context)
[docs]
def getTimeStep(self, request, context):
try:
return bmi_pb2.GetTimeStepResponse(interval=self.bmi_model_.get_time_step())
except Exception as e:
self.exception_handler(e, context)
[docs]
def getCurrentTime(self, request, context):
try:
return bmi_pb2.GetTimeResponse(time=self.bmi_model_.get_current_time())
except Exception as e:
self.exception_handler(e, context)
[docs]
def getStartTime(self, request, context):
try:
return bmi_pb2.GetTimeResponse(time=self.bmi_model_.get_start_time())
except Exception as e:
self.exception_handler(e, context)
[docs]
def getEndTime(self, request, context):
try:
return bmi_pb2.GetTimeResponse(time=self.bmi_model_.get_end_time())
except Exception as e:
self.exception_handler(e, context)
[docs]
def getVarGrid(self, request, context):
try:
return bmi_pb2.GetVarGridResponse(grid_id=self.bmi_model_.get_var_grid(request.name))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getVarType(self, request, context):
try:
return bmi_pb2.GetVarTypeResponse(type=self.bmi_model_.get_var_type(request.name))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getVarItemSize(self, request, context):
try:
return bmi_pb2.GetVarItemSizeResponse(size=self.bmi_model_.get_var_itemsize(request.name))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getVarUnits(self, request, context):
try:
return bmi_pb2.GetVarUnitsResponse(units=self.bmi_model_.get_var_units(request.name))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getVarNBytes(self, request, context):
try:
return bmi_pb2.GetVarNBytesResponse(nbytes=self.bmi_model_.get_var_nbytes(request.name))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getValue(self, request, context):
try:
vals = self.bmi_model_.get_value(request.name)
if vals.dtype == numpy.int32:
return bmi_pb2.GetValueResponse(values_int=bmi_pb2.IntArrayMessage(values=vals.flatten()))
if vals.dtype == numpy.float32:
return bmi_pb2.GetValueResponse(values_float=bmi_pb2.FloatArrayMessage(values=vals.flatten()))
if vals.dtype == numpy.float64:
return bmi_pb2.GetValueResponse(values_double=bmi_pb2.DoubleArrayMessage(values=vals.flatten()))
raise NotImplementedError("Arrays with type %s cannot be transmitted through this GRPC channel" % vals.dtype)
except Exception as e:
self.exception_handler(e, context)
[docs]
def getValuePtr(self, request, context):
raise NotImplementedError("Array references cannot be transmitted through this GRPC channel")
[docs]
def getValueAtIndices(self, request, context):
try:
indices = numpy.array(request.indices)
vals = self.bmi_model_.get_value_at_indices(request.name, indices)
if vals.dtype == numpy.int32:
return bmi_pb2.GetValueAtIndicesResponse(values_int=bmi_pb2.IntArrayMessage(values=vals.flatten()))
if vals.dtype == numpy.float32:
return bmi_pb2.GetValueAtIndicesResponse(values_float=bmi_pb2.FloatArrayMessage(values=vals.flatten()))
if vals.dtype == numpy.float64:
return bmi_pb2.GetValueAtIndicesResponse(values_double=bmi_pb2.DoubleArrayMessage(values=vals.flatten()))
raise NotImplementedError("Arrays with type %s cannot be transmitted through this GRPC channel" % vals.dtype)
except Exception as e:
self.exception_handler(e, context)
[docs]
def setValue(self, request, context):
try:
if request.HasField("values_int"):
self.bmi_model_.set_value(request.name, request.values_int.values)
if request.HasField("values_float"):
self.bmi_model_.set_value(request.name, request.values_float.values)
if request.HasField("values_double"):
self.bmi_model_.set_value(request.name, request.values_double.values)
return bmi_pb2.Empty()
except Exception as e:
self.exception_handler(e, context)
[docs]
def setValueAtIndices(self, request, context):
try:
index_array = numpy.array(request.indices)
if request.HasField("values_int"):
array = numpy.array(request.values_int.values, dtype=numpy.int32)
self.bmi_model_.set_value_at_indices(request.name, index_array, array)
if request.HasField("values_float"):
array = numpy.array(request.values_int.values, dtype=numpy.float32)
self.bmi_model_.set_value_at_indices(request.name, index_array, array)
if request.HasField("values_double"):
array = numpy.array(request.values_double.values, dtype=numpy.float64)
self.bmi_model_.set_value_at_indices(request.name, index_array, array)
return bmi_pb2.Empty()
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridSize(self, request, context):
try:
return bmi_pb2.GetGridSizeResponse(size=self.bmi_model_.get_grid_size(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridRank(self, request, context):
try:
return bmi_pb2.GetGridRankResponse(rank=self.bmi_model_.get_grid_rank(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridType(self, request, context):
try:
return bmi_pb2.GetGridTypeResponse(type=self.bmi_model_.get_grid_type(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridShape(self, request, context):
try:
return bmi_pb2.GetGridShapeResponse(shape=self.bmi_model_.get_grid_shape(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridSpacing(self, request, context):
try:
return bmi_pb2.GetGridSpacingResponse(spacing=self.bmi_model_.get_grid_spacing(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridOrigin(self, request, context):
try:
return bmi_pb2.GetGridOriginResponse(origin=self.bmi_model_.get_grid_origin(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridX(self, request, context):
try:
return bmi_pb2.GetGridPointsResponse(coordinates=self.bmi_model_.get_grid_x(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridY(self, request, context):
try:
return bmi_pb2.GetGridPointsResponse(coordinates=self.bmi_model_.get_grid_y(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
[docs]
def getGridZ(self, request, context):
try:
return bmi_pb2.GetGridPointsResponse(coordinates=self.bmi_model_.get_grid_z(request.grid_id))
except Exception as e:
self.exception_handler(e, context)
def __repr__(self) -> str:
return self.bmi_model_.__repr__()