finish CSBK implementation, confirmed working MD-380 SMS!

This commit is contained in:
KF7EEL 2022-03-10 18:31:07 -08:00
parent 7dd7bd9383
commit 8ef82d0561
1 changed files with 79 additions and 79 deletions

View File

@ -107,7 +107,7 @@ import ipaddress
import paho.mqtt.client as mqtt
import string
import math
#################################
#################################
@ -122,7 +122,7 @@ __maintainer__ = 'Eric Craw, KF7EEL'
__email__ = 'kf7eel@qsl.net'
sms_seq_num = 0
use_csbk = False
use_csbk = True
ssid = ''
UNIT_MAP = {}
PACKET_MATCH = {}
@ -1106,37 +1106,19 @@ def btf_poc(fragment_input):
def create_crc16_csbk(fragment_input):
## print(fragment_input)
crc16_csbk = libscrc.gsm16(bytearray.fromhex(fragment_input))
return fragment_input + re.sub('x', '0', str(hex(crc16_csbk ^ 0xa5a5))[-4:])
def csbk_gen(to_id, from_id, tot_block):
def csbk_gen2(to_id, from_id, tot_block):
csbk_lst = ['BD0080', 'BD0080', 'BD0080', 'BD0080', 'BD0080']
send_seq_list = ''
tot_block = tot_block + 5
for block in csbk_lst:
print(str(ahex(tot_block.to_bytes(1, 'big')))[2:-1])
block = block + str(ahex(tot_block.to_bytes(1, 'big')))[2:-1] + to_id + from_id#str(ahex(int(tot_block))[2:-1])
block = create_crc16_csbk(block)
send_seq_list = send_seq_list + block
tot_block = tot_block - 1
## print(block)
return send_seq_list
def csbk_gen2(to_id, from_id, tot_block):
csbk_lst = ['BD0080', 'BD0080', 'BD0080', 'BD0080', 'BD0080', 'BD0080', 'BD0080', 'BD0080']
send_seq_list = ''
tot_block = tot_block + 8
for block in csbk_lst:
print(str(ahex(tot_block.to_bytes(1, 'big')))[2:-1])
block = block + str(ahex(tot_block.to_bytes(1, 'big')))[2:-1] + to_id + from_id#str(ahex(int(tot_block))[2:-1])
block = create_crc16_csbk(block)
send_seq_list = send_seq_list + block
tot_block = tot_block - 1
## print(block)
return send_seq_list
@ -1192,14 +1174,25 @@ def block_sequence(input_string):
seq_blocks = len(input_string)/24
n = 0
block_seq = []
## csbk_n = 0
csbk_seq = []
while n < seq_blocks:
if n == 0:
block_seq.append(bytes.fromhex(input_string[:24].ljust(24,'0')))
if input_string[:6] == 'BD0080':
csbk_seq.append(bytes.fromhex(input_string[:24].ljust(24,'0')))
## elif input_string[:6] != 'BD0080':
else:
block_seq.append(bytes.fromhex(input_string[:24].ljust(24,'0')))
n = n + 1
else:
block_seq.append(bytes.fromhex(input_string[n*24:n*24+24].ljust(24,'0')))
## print('blok seq ' + str((input_string[n*24:n*24+24])))
if str(input_string[n*24:n*24+24])[:6] == 'BD0080':
csbk_seq.append(bytes.fromhex(input_string[n*24:n*24+24].ljust(24,'0')))
## elif str(input_string[n*24:n*24+24])[:6] != 'BD0080':
else:
block_seq.append(bytes.fromhex(input_string[n*24:n*24+24].ljust(24,'0')))
n = n + 1
return block_seq
return block_seq, csbk_seq
# Takes list of DMR packets, 12 bytes, then encodes them
def dmr_encode(packet_list, _slot):
@ -1225,41 +1218,31 @@ def dmr_encode(packet_list, _slot):
def create_sms_seq(dst_id, src_id, peer_id, _slot, _call_type, dmr_string):
## print('mmdvm_enc')
## print(dmr_string)
rand_seq = random.randint(1, 999999)
block_seq = block_sequence(dmr_string)
dmr_list = dmr_encode(block_seq, _slot)
dmr_list = dmr_encode(block_seq[0], _slot)
csbk_list = dmr_encode(block_seq[1], _slot)
cap_in = 0
mmdvm_send_seq = []
csbk_seq = []
for i in csbk_list:
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 3, rand_seq, i)
csbk_seq.append(the_mmdvm_pkt)
for i in dmr_list:
if use_csbk == True:
if cap_in < 5:
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 3, rand_seq, i)
#print(block_seq[cap_in])
#print(3)
if cap_in == 5:
#print(block_seq[cap_in])
#print(6)
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 6, rand_seq, i) #(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
if cap_in > 5:
#print(block_seq[cap_in])
#print(7)
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 7, rand_seq, i)#(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
mmdvm_send_seq.append(ahex(the_mmdvm_pkt))
cap_in = cap_in + 1
if use_csbk == False:
if cap_in == 0:
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 6, rand_seq, i) #(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
else:
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 7, rand_seq, i)#(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
## mmdvm_send_seq.append(ahex(the_mmdvm_pkt))
mmdvm_send_seq.append(the_mmdvm_pkt)
cap_in = cap_in + 1
if cap_in == 0:
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 6, rand_seq, i) #(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
else:
the_mmdvm_pkt = mmdvm_encapsulate(dst_id, src_id, peer_id, cap_in, _slot, _call_type, 7, rand_seq, i)#(bytes.fromhex(re.sub("b'|'", '', str(orig_cap[cap_in][20:-4])))))
mmdvm_send_seq.append(the_mmdvm_pkt)
cap_in = cap_in + 1
## if CONFIG['WEB_SERVICE']['REMOTE_CONFIG_ENABLED'] == False:
## with open('/tmp/.hblink_data_que_' + str(CONFIG['DATA_CONFIG']['APRS_LOGIN_CALL']).upper() + '/' + str(random.randint(1000, 9999)) + '.mmdvm_seq', "w") as packet_write_file:
## packet_write_file.write(str(mmdvm_send_seq))
return mmdvm_send_seq
return csbk_seq + mmdvm_send_seq
def format_sms(msg, to_id, from_id, call_type, use_header = True):
@ -1294,21 +1277,14 @@ def format_sms(msg, to_id, from_id, call_type, use_header = True):
header_bits = btf_poc(str(ahex(raw(ip_udp)))[2:-1])
sms_complete_header = create_crc16(gen_header2(to_id, from_id, call_type, header_bits[1], header_bits[0]))
sms_csbk = (csbk_gen(to_id, from_id, header_bits[0]))
sms_csbk = (csbk_gen2(to_id, from_id, header_bits[0]))
logger.debug(sms_complete_header)
# Return corrected fragment with BTF and generate CRC16 with header
return header_bits[2], sms_complete_header #str(ahex(raw(ip_udp)))[2:-1]
return header_bits[2], sms_complete_header, sms_csbk #str(ahex(raw(ip_udp)))[2:-1]
##def gen_header(to_id, from_id, call_type):
## print(call_type)
## if call_type == 1:
## seq_header = '024A' + to_id + from_id + '9550'
## if call_type == 0:
## seq_header = '824A' + to_id + from_id + '9550'
## return seq_header
def gen_header2(to_id, from_id, call_type, poc, btf):
s_poc = hex(poc)[2:]
@ -1349,59 +1325,78 @@ def send_sms(csbk, to_id, from_id, peer_id, call_type, msg, snd_slot = 1):
call_type = 0
# Send all Group data to TS 2, need to fix later.
slot = snd_slot
if csbk == True:
use_csbk = True
# Split message in to multiple if msg characters > 100. 95 is for message count
text_list = []
text_block_count = 0
## sms_count = 1
## tot_blocks = len(msgprint(tot_blocks) / 100
sms_count = 1
tot_blocks = int(math.ceil(len(msg) / 100.0))
while text_block_count < len(msg):
## text_list.append(str(sms_count) + '/' + str(tot_blocks) + ': ' + (msg[text_block_count:text_block_count + 95]))
text_list.append((msg[text_block_count:text_block_count + 95]))
text_block_count = text_block_count + 95
## sms_count = sms_count + 1
if tot_blocks == 1:
text_list.append((msg[text_block_count:text_block_count + 95]))
text_block_count = text_block_count + 95
else:
text_list.append(str(sms_count) + '/' + str(tot_blocks) + ': ' + (msg[text_block_count:text_block_count + 95]))
text_block_count = text_block_count + 95
sms_count = sms_count + 1
print(text_list)
for m in text_list:
print(m)
complete_sms = format_sms(str(m), to_id, from_id, call_type)
csbk_blocks = str(complete_sms[2])
headers = str(complete_sms[1])
sms_data = str(create_crc32(complete_sms[0]))
# Using CSBK for now, will deal with later.
## if csbk == True:
snd_sms = csbk_blocks + headers + sms_data
## elif csbk == False:
## snd_sms = headers + sms_data
if ascii_call_type == 'unit':
# We know where the user is
if bytes.fromhex(to_id) in UNIT_MAP:
## print(CONFIG['SYSTEMS']) #[UNIT_MAP[bytes.fromhex(to_id)][2]]['MODE'])
if CONFIG['SYSTEMS'][UNIT_MAP[bytes.fromhex(to_id)][0]]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][UNIT_MAP[bytes.fromhex(to_id)][0]]['BOTH_SLOTS'] == False and CONFIG['SYSTEMS'][UNIT_MAP[bytes.fromhex(to_id)][0]]['ENABLED'] == True:
slot = 0
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, str(format_sms(str(m), to_id, from_id, call_type)[1]) + str(create_crc32(format_sms(str(m), to_id, from_id, call_type)[0])))
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, snd_sms)
for d in snd_seq_lst:
print(ahex(d))
systems[UNIT_MAP[bytes.fromhex(to_id)][0]].send_system(d)
# Sleep to prevent overflowing of Pi-Star buffer
sleep(0.1)
logger.info('Sending on TS: ' + str(slot))
elif CONFIG['SYSTEMS'][UNIT_MAP[bytes.fromhex(to_id)][0]]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][UNIT_MAP[bytes.fromhex(to_id)][0]]['BOTH_SLOTS'] == True or CONFIG['SYSTEMS'][UNIT_MAP[bytes.fromhex(to_id)][0]]['MODE'] != 'OPENBRIDGE' and CONFIG['SYSTEMS'][UNIT_MAP[bytes.fromhex(to_id)][0]]['ENABLED'] == True:
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, str(format_sms(str(m), to_id, from_id, call_type)[1]) + str(create_crc32(format_sms(str(m), to_id, from_id, call_type)[0])))
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, snd_sms)
for d in snd_seq_lst:
print(ahex((d)))
systems[UNIT_MAP[bytes.fromhex(to_id)][0]].send_system(d)
# Sleep to prevent overflowing of Pi-Star buffer
sleep(0.1)
logger.info('Sending on TS: ' + str(slot))
# We don't know where the user is
elif bytes.fromhex(to_id) not in UNIT_MAP:
for s in CONFIG['SYSTEMS']:
if CONFIG['SYSTEMS'][s]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][s]['BOTH_SLOTS'] == False and CONFIG['SYSTEMS'][s]['ENABLED'] == True:
slot = 0
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, str(format_sms(str(m), to_id, from_id, call_type)[1]) + str(create_crc32(format_sms(str(m), to_id, from_id, call_type)[0])))
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, snd_sms)
for d in snd_seq_lst:
systems[s].send_system(d)
# Sleep to prevent overflowing of Pi-Star buffer
sleep(0.1)
logger.info('User not in map. Sending on TS: ' + str(slot))
elif CONFIG['SYSTEMS'][s]['MODE'] == 'OPENBRIDGE' and CONFIG['SYSTEMS'][s]['BOTH_SLOTS'] == True and CONFIG['SYSTEMS'][s]['ENABLED'] == True or CONFIG['SYSTEMS'][s]['MODE'] != 'OPENBRIDGE' and CONFIG['SYSTEMS'][s]['ENABLED'] == True:
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, str(format_sms(str(m), to_id, from_id, call_type)[1]) + str(format_sms(str(m), to_id, from_id, call_type)[0]))
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), call_type, snd_sms)
for d in snd_seq_lst:
systems[s].send_system(d)
# Sleep to prevent overflowing of Pi-Star buffer
sleep(0.1)
logger.info('User not in map. Sending on TS: ' + str(slot))
if ascii_call_type == 'group':
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), 0, format_sms(str(m), to_id, from_id, call_type)[1] + create_crc32(format_sms(str(m), to_id, from_id, call_type)[0]))
snd_seq_lst = create_sms_seq(to_id, from_id, peer_id, int(slot), 0, snd_sms)
for s in CONFIG['SYSTEMS']:
for d in snd_seq_lst:
systems[s].send_system(d)
# Sleep to prevent overflowing of Pi-Star buffer
sleep(0.1)
## if ascii_call_type == 'unit':
@ -1501,12 +1496,15 @@ def ars_resp(msg, to_id, from_id, call_type, use_header = True):
sms_complete_header = create_crc16(gen_header2(to_id, from_id, call_type, header_bits[1], header_bits[0]))
sms_csbk = (csbk_gen2(to_id, from_id, header_bits[0]))
logger.debug(sms_complete_header)
## csbk_blocks = str(complete_sms[2])
## logger.debug(sms_complete_header)
# Return corrected fragment with BTF and generate CRC16 with header
return create_crc32(header_bits[2]), sms_complete_header #str(ahex(raw(ip_udp)))[2:-1]
return create_crc32(header_bits[2]), sms_complete_header, sms_csbk #str(ahex(raw(ip_udp)))[2:-1]
@ -1770,9 +1768,11 @@ def data_received(self, _peer_id, _rf_src, _dst_id, _seq, _slot, _call_type, _fr
logger.info('Received ARS "hello"')
logger.debug('Attempt response')
## ars_response = (ars_resp('0002BF01', str(ahex(_rf_src))[2:-1], str(ahex(_dst_id))[2:-1], 1))
ars_response = (ars_resp('0002B700', str(ahex(_rf_src))[2:-1], str(ahex(_dst_id))[2:-1], 1))
ars_snd = create_sms_seq(int_id(_rf_src), int_id(_dst_id), int_id(_dst_id), _slot, 1, ars_response[1] + ars_response[0])
ars_response = (ars_resp('0002BF01', str(ahex(_rf_src))[2:-1], str(ahex(_dst_id))[2:-1], 1))
ars_snd = create_sms_seq(int_id(_rf_src), int_id(_dst_id), int_id(_dst_id), _slot, 1, ars_response[2] + ars_response[1] + ars_response[0])
for i in ars_snd:
# Sleep to prevent overflowing of Pi-Star buffer
sleep(0.1)
systems[UNIT_MAP[_rf_src][0]].send_system(i)
## systems['LOCAL'].send_system(i)