# Group function messages (PGN 126208); used e.g. for configuring devices
# https://www.nmea.org/Assets/20140109%20nmea-2000-corrigendum-tc201401031%20pgn%20126208.pdf
from enum import IntEnum
from typing import TYPE_CHECKING
from n2k.message import Message
from n2k.n2k import PGN
if TYPE_CHECKING:
import n2k.node
[docs]
class N2kGroupFunctionCode(IntEnum):
    """Kinds of group function: request, command, acknowledge, read and write.

    Read and write each have a matching ``...Reply`` member.
    """

    Request = 0
    Command = 1
    Acknowledge = 2

    # Read/write and their replies.
    Read = 3
    ReadReply = 4
    Write = 5
    WriteReply = 6
[docs]
class N2kGroupFunctionPGNErrorCode(IntEnum):
    """PGN-level result codes used when acknowledging a group function."""

    Acknowledge = 0

    # Remaining members name the individual failure reasons.
    PGNNotSupported = 1
    PGNTemporarilyNotAvailable = 2
    AccessDenied = 3
    RequestOrCommandNotSupported = 4
    DefinerTagNotSupported = 5
    ReadOrWriteNotSupported = 6
[docs]
class N2kGroupFunctionTransmissionOrPriorityErrorCode(IntEnum):
    """Result codes for the transmission interval / priority part of a group function."""

    Acknowledge = 0

    # Remaining members name the individual failure reasons.
    TransmitIntervalOrPriorityNotSupported = 1
    TransmitIntervalIsLessThanMeasurementInterval = 2
    AccessDenied = 3
    RequestNotSupported = 4
[docs]
class N2kGroupFunctionParameterErrorCode(IntEnum):
    """Per-parameter result codes for a group function.

    ``Acknowledge`` is the code the field-matching helpers in this module
    return on a successful match; ``RequestOrCommandParameterOutOfRange`` is
    the one they return on a mismatch.
    """

    Acknowledge = 0

    # Remaining members name the individual failure reasons.
    InvalidRequestOrCommandParameterField = 1
    TemporarilyUnableToComply = 2
    RequestOrCommandParameterOutOfRange = 3
    AccessDenied = 4
    RequestOrCommandNotSupported = 5
    ReadOrWriteIsNotSupported = 6
[docs]
def match_request_field(field_val: int, match_val: int, mask: int) -> tuple[bool, int]:
    """Check whether the masked bits of a field equal an expected value.

    Args:
        field_val: Value of the field being checked.
        match_val: Value the masked field has to equal. It is compared as
            given; the mask is not applied to it.
        mask: Bit mask applied to ``field_val`` before comparing.

    Returns:
        ``(True, Acknowledge)`` when the masked field equals ``match_val``,
        otherwise ``(False, RequestOrCommandParameterOutOfRange)``. Both codes
        are members of ``N2kGroupFunctionParameterErrorCode``.
    """
    # ``&`` already binds tighter than comparisons in Python; the parentheses
    # only spell that out.
    if (field_val & mask) != match_val:
        return (
            False,
            N2kGroupFunctionParameterErrorCode.RequestOrCommandParameterOutOfRange,
        )
    return True, N2kGroupFunctionParameterErrorCode.Acknowledge
[docs]
def match_request_field_str(field_val: str, match_val: str) -> tuple[bool, int]:
    """Check whether a string field equals an expected value.

    Args:
        field_val: Value of the field being checked.
        match_val: Value the field has to equal.

    Returns:
        ``(True, Acknowledge)`` when both strings are equal, otherwise
        ``(False, RequestOrCommandParameterOutOfRange)``. Both codes are
        members of ``N2kGroupFunctionParameterErrorCode``.
    """
    codes = N2kGroupFunctionParameterErrorCode
    return (
        (True, codes.Acknowledge)
        if field_val == match_val
        else (False, codes.RequestOrCommandParameterOutOfRange)
    )
[docs]
class N2kGroupFunctionHandler:
    """Handler skeleton for group function messages of one PGN.

    The constructor stores the node and the PGN. Every other method is a
    placeholder that raises ``NotImplementedError``; presumably subclasses or
    a later revision provide the real behavior.
    """

    _pgn: int
    # NOTE(review): ``proprietary`` is only declared here; ``__init__`` never
    # assigns it, so reading it on a fresh instance raises AttributeError
    # unless something else sets it first. Confirm where it is meant to be set.
    proprietary: bool
    n2k_node: "n2k.node.Node"

    def __init__(self, n2k_node: "n2k.node.Node", pgn: int) -> None:
        """Remember the node and the PGN passed in."""
        self._pgn = pgn
        self.n2k_node = n2k_node

    def handle(
        self,
        msg: Message,
        group_function_code: N2kGroupFunctionCode,
        pgn_for_group_function: int,
    ) -> bool:
        """Entry point for a received group function message. Not implemented."""
        raise NotImplementedError

    def _get_request_group_function_transmission_or_priority_error_code(
        self,
        transmission_interval: int,
    ) -> N2kGroupFunctionTransmissionOrPriorityErrorCode:
        """Map a requested transmission interval to an error code. Not implemented."""
        raise NotImplementedError

    def _handle_request(
        self,
        msg: Message,
        transmission_interval: int,
        transmission_interval_offset: int,
        number_of_parameter_pairs: int,
    ) -> bool:
        """Hook for the ``Request`` group function. Not implemented."""
        raise NotImplementedError

    def _handle_command(
        self,
        msg: Message,
        priority_setting: int,
        number_of_parameter_pairs: int,
    ) -> bool:
        """Hook for the ``Command`` group function. Not implemented."""
        raise NotImplementedError

    def _handle_acknowledge(
        self,
        msg: Message,
        pgn_error_code: N2kGroupFunctionPGNErrorCode,
        transmission_or_priority_error_code: N2kGroupFunctionTransmissionOrPriorityErrorCode,
        number_of_parameter_pairs: int,
    ) -> bool:
        """Hook for the ``Acknowledge`` group function. Not implemented."""
        raise NotImplementedError

    def _handle_read_fields(
        self,
        msg: Message,
        manufacturer_code: int,
        industry_group: int,
        unique_id: int,
        number_of_selection_pairs: int,
        number_of_parameter_pairs: int,
    ) -> bool:
        """Hook for the ``Read`` group function. Not implemented."""
        raise NotImplementedError

    def _handle_read_fields_reply(self, msg: Message) -> bool:
        """Hook for the ``ReadReply`` group function. Not implemented."""
        raise NotImplementedError

    def _handle_write_fields(
        self,
        msg: Message,
        manufacturer_code: int,
        industry_group: int,
        unique_id: int,
        number_of_selection_pairs: int,
        number_of_parameter_pairs: int,
    ) -> bool:
        """Hook for the ``Write`` group function. Not implemented."""
        raise NotImplementedError

    def _handle_write_fields_reply(self, msg: Message) -> bool:
        """Hook for the ``WriteReply`` group function. Not implemented."""
        raise NotImplementedError
[docs]
def get_pgn_for_group_function(msg: Message) -> int:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name and signature it is meant to return the PGN that the
    group function message ``msg`` refers to.
    """
    raise NotImplementedError
[docs]
def parse(
    msg: Message,
    group_function_code: N2kGroupFunctionCode,
    pgn_for_group_function: PGN,
) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name and signature it is meant to parse a group function
    message and report success as a bool.
    """
    raise NotImplementedError
[docs]
def parse_request_params(
    msg: Message,
    transmission_interval: int,
    transmission_interval_offset: int,
    number_of_parameter_pairs: int,
) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name it is meant to parse the parameters of a request
    group function from ``msg``.
    """
    raise NotImplementedError
[docs]
def start_parse_request_pair_parameters(msg: Message, index: int) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name it is meant to begin parsing the parameter pairs of a
    request group function in ``msg``.
    """
    raise NotImplementedError
[docs]
def parse_command_params(
    msg: Message,
    priority_setting: int,
    number_of_parameter_pairs: int,
) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name it is meant to parse the parameters of a command
    group function from ``msg``.
    """
    raise NotImplementedError
[docs]
def start_parse_command_pair_parameters(msg: Message, index: int) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name it is meant to begin parsing the parameter pairs of a
    command group function in ``msg``.
    """
    raise NotImplementedError
[docs]
def parse_acknowledge_params(
    msg: Message,
    pgn_error_code: N2kGroupFunctionPGNErrorCode,
    transmission_or_priority_error_code: N2kGroupFunctionTransmissionOrPriorityErrorCode,
    number_of_parameter_pairs: int,
) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name it is meant to parse the parameters of an acknowledge
    group function from ``msg``.
    """
    raise NotImplementedError
[docs]
def start_parse_read_or_write_parameters(msg: Message, index: int) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name it is meant to begin parsing the parameters of a read
    or write group function in ``msg``.
    """
    raise NotImplementedError
[docs]
def parse_read_or_write_params(
    msg: Message,
    manufacturer_code: int,
    industry_group: int,
    unique_id: int,
    number_of_selection_pairs: int,
    number_of_parameter_pairs: int,
    proprietary: bool = False,
) -> bool:
    """Placeholder; always raises ``NotImplementedError``.

    Judging by its name it is meant to parse the parameters of a read or
    write group function from ``msg``. ``proprietary`` defaults to ``False``.
    """
    raise NotImplementedError
[docs]
def set_start_read_reply(
    msg: Message,
    destination: int,
    pgn: int,
    manufacturer_code: int,
    industry_group: int,
    unique_id: int,
    number_of_selection_pairs: int,
    number_of_parameter_pairs: int,
    proprietary: bool,
) -> None:
    """Placeholder for starting a read reply; not implemented.

    Ignores every argument, leaves ``msg`` untouched and only prints a notice
    to stdout. Unlike the parsing placeholders in this module it does not
    raise.
    """
    print("NotImplemented set_start_read_reply")
[docs]
def set_start_write_reply(
    msg: Message,
    destination: int,
    pgn: int,
    manufacturer_code: int,
    industry_group: int,
    unique_id: int,
    number_of_selection_pairs: int,
    number_of_parameter_pairs: int,
    proprietary: bool,
) -> None:
    """Placeholder for starting a write reply; not implemented.

    Ignores every argument, leaves ``msg`` untouched and only prints a notice
    to stdout. Unlike the parsing placeholders in this module it does not
    raise.
    """
    print("NotImplemented set_start_write_reply")
[docs]
def set_start_acknowledge(
    msg: Message,
    destination: int,
    pgn: int,
    pgn_error_code: N2kGroupFunctionPGNErrorCode,
    transmission_or_priority_error_code: N2kGroupFunctionTransmissionOrPriorityErrorCode,
    number_of_parameter_pairs: int = 0,
) -> None:
    """Placeholder for starting an acknowledge message; not implemented.

    Ignores every argument, leaves ``msg`` untouched and only prints a notice
    to stdout. ``number_of_parameter_pairs`` defaults to 0.
    """
    print("NotImplemented set_start_acknowledge")
[docs]
def change_pgn_error_code(
    msg: Message,
    pgn_error_code: N2kGroupFunctionPGNErrorCode,
) -> None:
    """Placeholder for changing the PGN error code in ``msg``; not implemented.

    Ignores both arguments and only prints a notice to stdout.
    """
    print("NotImplemented change_pgn_error_code")
[docs]
def change_transmission_or_priority_error_code(
    msg: Message,
    transmission_or_priority_error_code: N2kGroupFunctionTransmissionOrPriorityErrorCode,
) -> None:
    """Placeholder for changing the transmission/priority error code; not implemented.

    Ignores both arguments, leaves ``msg`` untouched and only prints a notice
    to stdout.
    """
    print("NotImplemented change_transmission_or_priority_error_code")
[docs]
def add_acknowledge_parameter(
    msg: Message,
    parameter_pair_index: int,
    error_code: N2kGroupFunctionParameterErrorCode = N2kGroupFunctionParameterErrorCode.ReadOrWriteIsNotSupported,
) -> None:
    """Placeholder for adding a parameter error code to an acknowledge; not implemented.

    Ignores every argument, leaves ``msg`` untouched and only prints a notice
    to stdout. ``error_code`` defaults to ``ReadOrWriteIsNotSupported``.
    """
    print("NotImplemented add_acknowledge_parameter")
[docs]
def send_acknowledge(
    node: "n2k.node.Node",
    destination: int,
    pgn: int,
    pgn_error_code: N2kGroupFunctionPGNErrorCode,
    transmission_or_priority_error_code: N2kGroupFunctionTransmissionOrPriorityErrorCode,
    number_of_parameter_pairs: int = 0,
    parameter_error_code_for_all: N2kGroupFunctionParameterErrorCode = N2kGroupFunctionParameterErrorCode.Acknowledge,
) -> None:
    """Placeholder for sending an acknowledge through ``node``; not implemented.

    Nothing is sent: every argument is ignored and only a notice is printed
    to stdout. ``number_of_parameter_pairs`` defaults to 0 and
    ``parameter_error_code_for_all`` to ``Acknowledge``.
    """
    print("NotImplemented send_acknowledge")