Node

class n2k.node.Node(bus: BusABC, device_information: DeviceInformation, can_msg_buffer_size: int = 20) None[source]

Bases: Listener

n2k_source: int = 0
product_information: ProductInformation = ProductInformation(n2k_version=2101, product_code=666, n2k_model_id='', n2k_sw_code='', n2k_model_version='', n2k_model_serial_code='', certification_level=0, load_equivalency=1)
manufacturer_serial_code: str = ''
pending_iso_address_claim: int | None = None
pending_product_information: int | None = None
pending_configuration_information: int | None = None
address_claim_started: int = 0
address_claim_end_source: int = 251
heartbeat_interval: int
default_heartbeat_interval: int
next_heartbeat_send_time: int
set_pending_iso_address_claim(from_now: int = 2) None[source]
Return type:

None

clear_pending_iso_address_claim() None[source]
Return type:

None

query_pending_iso_address_claim() bool[source]
Return type:

bool

set_pending_product_information() None[source]
Return type:

None

clear_pending_product_information() None[source]
Return type:

None

query_pending_product_information() bool[source]
Return type:

bool

set_pending_configuration_information() None[source]
Return type:

None

clear_pending_configuration_information() None[source]
Return type:

None

query_pending_configuration_information() bool[source]
Return type:

bool

update_address_claim_end_source() None[source]
Return type:

None

__init__(bus: BusABC, device_information: DeviceInformation, can_msg_buffer_size: int = 20) None[source]
bus: BusABC
sending_lock: allocate_lock
device_list: DeviceList
can_msg_buffer: N2kCANMessageBuffer
transmit_messages: list[int]
receive_messages: list[int]
pgn_sequence_counters: dict[int, int] | None = None
device_information: DeviceInformation
on_message_received(msg: Message) None[source]

This method is called to handle the given message.

Parameters:

msg (Message) – the delivered message

Return type:

None

on_error(exc: Exception) None[source]

This method is called to handle any exception in the receive thread.

Parameters:

exc (Exception) – The exception causing the thread to stop

Return type:

None

set_product_information(name: str, firmware_version: str, model_version: str, model_serial_code: str, load_equivalency: int = 1, certification_level: int = 0, product_code: int = 666) None[source]

Set the product information. This will be visible, e.g., in multi-function displays when viewing the device list. All strings have a maximum length of MAX_N2K_PRODUCT_INFO_STR_LEN.

Parameters:
  • name (str) – The Name / Model ID

  • firmware_version (str) – The Firmware Version

  • model_version (str) – The Model Version / Revision, e.g. ‘1.0’

  • model_serial_code (str) – The Model Serial Number

  • load_equivalency (int, default: 1) – The Load Equivalency Number specifies the power draw of the device. To get it, take the estimated current draw in mA at 12 V, divide it by 50, and round up.

  • certification_level (int, default: 0)

  • product_code (int, default: 666) – The Product Code granted by the NMEA. Using 666 by default for Open Source projects.

Return type:

None

set_configuration_information(manufacturer_information: str = 'NMEA2000 Library https://github.com/finnboeger/NMEA2000', installation_description1: str = '', installation_description2: str = '') None[source]

Set the configuration information. All strings have a maximum length of Max_N2K_CONFIGURATION_INFO_FIELD_LEN.

Parameters:
  • manufacturer_information (str, default: 'NMEA2000 Library https://github.com/finnboeger/NMEA2000')

  • installation_description1 (str, default: '')

  • installation_description2 (str, default: '')

Return type:

None

set_device_information(unique_number: int, device_function: int = 130, device_class: int = 25, manufacturer_code: int = 2046) None[source]

Set the device information.

Parameters:
  • unique_number (int) – A 21-bit number (max. 2097151). Each device from the same manufacturer should have a different unique number.

  • device_function (int, default: 130) – Defaults to 130 (with Device Class: 25 => PC Gateway). Device Codes can be found here

  • device_class (int, default: 25) – Defaults to 25 (Inter-/Intranet Device). See above for codes.

  • manufacturer_code (int, default: 2046) – Maximum of 2046. Has to be bought from the NMEA. List of registered codes

Return type:

None

message_handlers: set[MessageHandler]
address_changed_callback: Callable[[Node], None]
device_information_changed_callback: Callable[[Node], None]
address_changed: bool = False
device_information_changed: bool = False
configuration_information: ConfigurationInformation = ConfigurationInformation(manufacturer_information='', installation_description1='', installation_description2='')
custom_single_frame_messages: list[int] | None = None
custom_fast_packet_messages: list[int] | None = None
_n2k_can_msg_buf: list[Message]
_max_n2k_can_msgs: int
_can_send_frame_buf: deque[Message]
_max_can_send_frames: int = 40
_can_send_frame_buffer_write: int
_can_send_frame_buffer_read: int
_max_can_receive_frames: int
_request_handler: Callable[[int, int], bool] | None = None
_send_frames() bool[source]
Return type:

bool

_send_frame(frame_id: int, length: int, buf: bytearray) bool[source]
Return type:

bool

_get_next_free_can_send_frame() CANSendFrame[source]
Return type:

CANSendFrame

_send_pending_information() None[source]
Return type:

None

_is_initialized() bool[source]
Return type:

bool

_find_free_can_msg_index(pgn: int, source: int, destination: int, msg_index: int) None[source]
Return type:

None

_create_n2k_can_buf_msg_message(can_id: int, length: int, buf: bytearray) Message | None[source]
Return type:

Message | None

_is_fast_packet_pgn(pgn: int) bool[source]
Return type:

bool

_is_fast_packet(msg: Message) bool[source]
Return type:

bool

_check_known_message(pgn: int) tuple[bool, bool, bool][source]
Return type:

tuple[bool, bool, bool]

_handle_received_system_message(msg: N2kCANMessage) bool[source]
Return type:

bool

_respond_iso_request(msg: Message, requested_pgn: int) None[source]
Return type:

None

_handle_iso_request(msg: Message) None[source]
Return type:

None

_start_address_claim() None[source]
Return type:

None

_is_address_claim_started() bool[source]
Return type:

bool

_handle_iso_address_claim(msg: Message) None[source]
Return type:

None

_handle_commanded_address(msg: Message) None[source]
Return type:

None

_get_next_address(restart_at_end: bool = False) None[source]
Return type:

None

_get_sequence_counter(pgn: int) int[source]
Return type:

int

_get_fast_packet_tx_pgn_count() int[source]
Return type:

int

_run_message_handlers(msg: Message) None[source]
Return type:

None

create_n2k_can_msg_buf_size_message(max_n2k_can_msgs: int) None[source]
Return type:

None

create_n2k_can_send_frame_buf_size_message(max_can_send_frames: int) None[source]
Return type:

None

create_n2k_can_receive_frame_buf_size_message(max_can_receive_frames: int) None[source]
Return type:

None

send_iso_address_claim(destination: int = 255, delay: int = 0) None[source]

Some devices (Garmin) constantly request information about others on the network. Thus, we need to re-send our address claim, or they will stop detecting us.

Parameters:
  • destination (int, default: 255)

  • delay (int, default: 0)

Return type:

None

_abc_impl = <_abc._abc_data object>
send_tx_pgn_list(destination: int) None[source]
Return type:

None

send_rx_pgn_list(destination: int) None[source]
Return type:

None

send_product_information(destination: int = 255) bool[source]
Return type:

bool

send_configuration_information(destination: int = 255) bool[source]
Return type:

bool

send_heartbeat(force: bool = False) bool[source]
Return type:

bool

send_msg(msg: Message) bool[source]
Return type:

bool

attach_msg_handler(msg_handler: MessageHandler) None[source]
Return type:

None

detach_msg_handler(msg_handler: MessageHandler) None[source]
Return type:

None

set_iso_request_handler(request_handler: Callable[[int, int], bool]) None[source]
Return type:

None

remove_iso_request_handler() None[source]
Return type:

None