fix dictionary typo, attempt ARS response

This commit is contained in:
KF7EEL 2022-03-06 19:20:38 -08:00
parent 631ca7255c
commit 3998456d37
1 changed files with 59 additions and 19 deletions

View File

@ -1293,6 +1293,48 @@ def send_sms(csbk, to_id, from_id, peer_id, call_type, msg, snd_slot = 1):
## send_to_user(to_id, snd_seq_lst, int(slot), call_type = 'unit')
# Experimental ARS functions
def ars_resp(msg, to_id, from_id, call_type, use_header = True):
## msg_bytes = str.encode(msg)
## encoded = "".join([str('00' + x) for x in re.findall('..',bytes.hex(msg_bytes))] )
## final = encoded
call_seq_num = gen_sms_seq()
# Convert DMR ID to integer, the IP for Scapy
# 0c adds 12. to the IP address, refferred as the CAI
hex_2_ip_src = str('0d' + from_id)
hex_2_ip_dest = str('0c' + to_id)
src_dmr_id = str(ipaddress.IPv4Address(int(hex_2_ip_src, 16)))
dst_dmr_id = str(ipaddress.IPv4Address(int(hex_2_ip_dest, 16)))
## # Unknown what byte is for, but it does correlate to the charaters : (number of characters + 4) * 2 . Convert to bytes.
## unk_count = int((len(msg) + 4) * 2).to_bytes(1, 'big')
hdr_seq_num = (ahex(int((int(call_seq_num, 16) + 128)).to_bytes(1, 'big')))
## sms_header = '00' + str(ahex(unk_count))[2:-1] + 'a000' + str(hdr_seq_num)[2:-1] + '040d000a'
## ip_udp = packet = IP(dst=dst_dmr_id, src=src_dmr_id)/UDP(sport=4007, dport=4007)/(bytes.fromhex('0012A00083040D000A') + msg.encode('utf-16') + bytes.fromhex('0000000000000000000000'))
ip_udp = IP(dst=dst_dmr_id, src=src_dmr_id, id=int(call_seq_num, 16), tos = 1)/UDP(sport=4005, dport=4005)/bytes.fromhex(msg)# + bytes.fromhex('0000000000000000000000'))
header_bits = btf_poc(str(ahex(raw(ip_udp)))[2:-1])
sms_complete_header = create_crc16(gen_header2(to_id, from_id, call_type, header_bits[1], header_bits[0]))
logger.debug(sms_complete_header)
# Return corrected fragment with BTF and generate CRC16 with header
return create_crc32(header_bits[2]), sms_complete_header #str(ahex(raw(ip_udp)))[2:-1]
# Take fully formatted list of sequenced MMDVM packets and send
def send_to_user(to_id, seq_lst, slot, call_type = 'unit'):
if call_type == 'unit':
@ -1546,6 +1588,7 @@ def data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _fr
# Use block 0 as trigger. $GPRMC must also be in string to indicate NMEA.
# This triggers the APRS upload
if btf == 0:
print(ahex(_rf_src))
final_packet = str(bitarray(re.sub("\)|\(|bitarray|'", '', packet_assembly)).tobytes().decode('utf-8', 'ignore'))
sms_hex = str(ba2hx(bitarray(re.sub("\)|\(|bitarray|'", '', packet_assembly))))
sms_hex_string = re.sub("b'|'", '', str(sms_hex))
@ -1559,25 +1602,23 @@ def data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _fr
logger.debug('Hexadecimal data: ' + sms_hex_string)
if '0fa50fa5' in sms_hex_string and '4011' in sms_hex_string[:20]:
logger.info('Motorola ARS packet detected')
logger.info('Protocol not supported yet.')
logger.info('Protocol support in progress')
if '000cf0' in sms_hex_string:
logger.info('Received ARS "hello"')
## print('attempt response')
## #create_sms_seq(dst_id, src_id, peer_id, _slot, _call_type, dmr_string)
#### h_string = '4501002a04300000401159a00c23e0440d00238b0fa50fa50016308e000cf02007323335313137320000000015a36e7b'
#### h_string = '4501002a04300000401159a00c23e0440d00238b0fa50fa50016308e000cf02007323335313137320000000000000000'
#### print(sms_hex_string[:-8])
#### print('gen - ' + str(create_crc32(sms_hex_string[:-8])))
#### print('format sms - ' + str(format_sms(sms_hex_string[:-8], (int_id(_rf_src)), 9099, False)))
## print('---')
## #024200238b23e0448400a14c
## #024223E04400238B8400
### tst_resp = '4501002a065300004011577d0d00238b0c23e0440fa50fa50016308e0002bf0107323335313137320000000000000000'
## tst_resp = '4501002a065300004011577d0d00238b0c23e0440fa50fa50016308e0002bf01'
## ars_seq = create_sms_seq(int_id(_rf_src), 9099, 9099, _slot, 1, create_crc16_csbk('024223E04400238B8400') + create_crc32(tst_resp)) #sms_hex_string[:-8]))
## print()
## print(ars_seq)
## send_to_user(int_id(_rf_src), ars_seq, _slot, _call_type)
logger.debug('Attempt response')
ars_response = (ars_resp('0002BF01', str(ahex(_rf_src))[2:-1], str(ahex(_dst_id))[2:-1], 1))
ars_snd = create_sms_seq(int_id(_rf_src), int_id(_dst_id), int_id(_dst_id), _slot, 1, ars_response[1] + ars_response[0])
print(ars_snd)
for i in ars_snd:
print(ahex(bptc_decode(i)))
systems[UNIT_MAP[_rf_src][0]].send_system(i)
else:
#NMEA GPS sentence
if '$GPRMC' in final_packet or '$GNRMC' in final_packet:
@ -1745,8 +1786,7 @@ class OBP(OPENBRIDGE):
def dmrd_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _frame_type, _dtype_vseq, _stream_id, _data):
## UNIT_MAP[_rf_src] = (self._system, time())
UNIT_MAP[_rf_src] = (_seq, time())
UNIT_MAP[_rf_src] = (self._system, time())
if _rf_src not in PACKET_MATCH:
PACKET_MATCH[_rf_src] = [_seq, time()]