Source code for n2k.utils

from __future__ import annotations

import math
from time import time
from typing import TypeVar

from n2k import constants
from n2k.types import N2kBinaryStatus, N2kOnOff


[docs] def millis() -> int: return int(time() * 1000)
[docs] class IntRef: """ A reference to an integer value. This class is useful for creating a mutable integer object that can be passed around and modified in place. Implements basic arithmetic operations to allow for easy manipulation of the integer value. """ value: int
[docs] def __init__(self, value: int = 0) -> None: self.value = value
def __add__(self, other: int | IntRef) -> int: if isinstance(other, IntRef): return self.value + other.value if isinstance(other, int): return self.value + other raise TypeError( "unsupported operand type(s) for +: 'IntRef' and '" + type(other).__name__ + "'", ) def __radd__(self, other: int | IntRef) -> int: if isinstance(other, IntRef): return other.value + self.value if isinstance(other, int): return other + self.value raise TypeError( "unsupported operand type(s) for +: '" + type(other).__name__ + "' and 'IntRef'", ) def __sub__(self, other: int | IntRef) -> int: if isinstance(other, IntRef): return self.value - other.value if isinstance(other, int): return self.value - other raise TypeError( "unsupported operand type(s) for -: 'IntRef' and '" + type(other).__name__ + "'", ) def __rsub__(self, other: int | IntRef) -> int: if isinstance(other, IntRef): return other.value - self.value if isinstance(other, int): return other - self.value raise TypeError( "unsupported operand type(s) for +: '" + type(other).__name__ + "' and 'IntRef'", ) def __mult__(self, other: int | IntRef) -> int: if isinstance(other, IntRef): return self.value * other.value if isinstance(other, int): return self.value * other raise TypeError( "unsupported operand type(s) for *: 'IntRef' and '" + type(other).__name__ + "'", ) def __floordiv__(self, other: int | IntRef) -> int: if isinstance(other, IntRef): return self.value // other.value if isinstance(other, int): return self.value // other raise TypeError( "unsupported operand type(s) for //: 'IntRef' and '" + type(other).__name__ + "'", ) def __rfloordiv__(self, other: int | IntRef) -> int: if isinstance(other, IntRef): return other.value // self.value if isinstance(other, int): return other // self.value raise TypeError( "unsupported operand type(s) for +: '" + type(other).__name__ + "' and 'IntRef'", ) def __truediv__(self, other: int | IntRef) -> float: if isinstance(other, IntRef): return self.value / other.value if isinstance(other, int): return self.value / other raise TypeError( "unsupported operand type(s) for /: 'IntRef' and '" + type(other).__name__ + "'", ) def __rtruediv__(self, other: int | IntRef) -> float: if isinstance(other, IntRef): return other.value / self.value if isinstance(other, int): return other / self.value raise TypeError( "unsupported operand type(s) for +: '" + type(other).__name__ + "' and 'IntRef'", ) def __repr__(self) -> str: return "IntRef(" + str(self.value) + ")" def __str__(self) -> str: return str(self.value) def __int__(self) -> int: return self.value
[docs] def clamp_int(min_val: int, val: int, max_val: int) -> int: return int(min(max_val, max(min_val, val)))
[docs] def n2k_double_is_na(v: float) -> bool: return v == constants.N2K_DOUBLE_NA
[docs] def n2k_float_is_na(v: float) -> bool: return v == constants.N2K_FLOAT_NA
[docs] def n2k_uint8_is_na(v: float) -> bool: return v == constants.N2K_UINT8_NA
[docs] def n2k_int8_is_na(v: float) -> bool: return v == constants.N2K_INT8_NA
[docs] def n2k_uint16_is_na(v: float) -> bool: return v == constants.N2K_UINT16_NA
[docs] def n2k_int16_is_na(v: float) -> bool: return v == constants.N2K_INT16_NA
[docs] def n2k_uint32_is_na(v: float) -> bool: return v == constants.N2K_UINT32_NA
[docs] def n2k_int32_is_na(v: float) -> bool: return v == constants.N2K_INT32_NA
[docs] def n2k_uint64_is_na(v: float) -> bool: return v == constants.N2K_UINT64_NA
[docs] def n2k_int64_is_na(v: float) -> bool: return v == constants.N2K_INT64_NA
[docs] def rad_to_deg(v: float) -> float: if n2k_double_is_na(v): return v return math.degrees(v)
[docs] def deg_to_rad(v: float) -> float: if n2k_double_is_na(v): return v return math.radians(v)
[docs] def c_to_kelvin(v: float) -> float: if n2k_double_is_na(v): return v return v + 273.15
[docs] def kelvin_to_c(v: float) -> float: if n2k_double_is_na(v): return v return v - 273.15
[docs] def f_to_kelvin(v: float) -> float: if n2k_double_is_na(v): return v return (v - 32) * 5.0 / 9.0 + 273.15
[docs] def kelvin_to_f(v: float) -> float: if n2k_double_is_na(v): return v return (v - 273.15) * 9.0 / 4.0 + 32
[docs] def mbar_to_pascal(v: float) -> float: if n2k_double_is_na(v): return v return v * 100
[docs] def pascal_to_mbar(v: float) -> float: if n2k_double_is_na(v): return v return v / 1000
[docs] def hpa_to_pascal(v: float) -> float: return mbar_to_pascal(v)
[docs] def pascal_to_hpa(v: float) -> float: return pascal_to_mbar(v)
[docs] def ah_to_coulomb(v: float) -> float: if n2k_double_is_na(v): return v return v * 3600
[docs] def coulomb_to_ah(v: float) -> float: if n2k_double_is_na(v): return v return v / 3600
[docs] def hours_to_seconds(v: float) -> float: if n2k_double_is_na(v): return v return v * 3600
[docs] def seconds_to_hours(v: float) -> float: if n2k_double_is_na(v): return v return v / 3600
[docs] def meters_per_second_to_knots(v: float) -> float: if n2k_double_is_na(v): return v return v * 3600 / 1852
[docs] def knots_to_meters_per_second(v: float) -> float: if n2k_double_is_na(v): return v return v * 1852 / 3600
[docs] def n2k_reset_binary_status() -> int: """ Reset all single binary status values to not available This helper function returns a new fully reset 64bit bank status. For each individual item the status will be 3 (0b11 - Unavailable :py:class:`N2kOnOff`) """ return 0xFFFFFFFFFFFFFFFF
[docs] def n2k_get_status_on_binary_status( bank_status: N2kBinaryStatus, item_index: int = 1, ) -> N2kOnOff: """ Get single status of full binary bank status returned by :py:func:`n2k.messages.parse_n2k_binary_status_report`. :param bank_status: Full bank status read by :py:func:`n2k.messages.parse_n2k_binary_status_report` :param item_index: Status item index 1-28 :return: single status of full binary bank status """ if item_index > constants.MAX_BINARY_STATUS_ENTRIES: return N2kOnOff.Unavailable item_index -= 1 return N2kOnOff((bank_status >> (2 * item_index)) & 0x03)
[docs] def n2k_set_status_binary_on_status( bank_status: N2kBinaryStatus, item_status: N2kOnOff, item_index: int = 1, ) -> N2kBinaryStatus: """ Set single status to full binary bank status. :param bank_status: Existing Bank Status :param item_status: New Item Status :param item_index: Index of Item to be changed :return: New Bank Status """ if item_index > constants.MAX_BINARY_STATUS_ENTRIES: # TODO: log warning return bank_status item_index -= 1 mask = ~(0b11 << (2 * item_index)) return (bank_status & mask) | item_status << (2 * item_index)
T = TypeVar("T")
[docs] def with_fallback(v: T | None, fallback: T) -> T: if v is None: return fallback return v