-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsaamfram_core_utils.py
350 lines (272 loc) · 12.6 KB
/
saamfram_core_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
import asyncio
import ipaddress
import json
import logging
import platform
import random
import sys
import threading
import time
import traceback
import uuid
from collections import OrderedDict
from datetime import datetime, timedelta

from crc import Calculator, Configuration
from getmac import get_mac_address

import constant as cn
import debug as db
class SaamframCoreUtils(object):
    """Helpers for decoding messages, computing checksums and building IDs.

    The methods only read the class-level attributes below; they keep no
    per-instance state.
    """

    debug = db.Debug(cn.DEBUG_INFO)
    # Alphabet used to render a CRC as 5-bit (base 32) digits.
    base32_chars = "0123456789ABCDEFGHIJKLMNOPQRSTUV"
    delimiter_char = cn.DELIMETER_CHAR

    # Two-character escape codes ('/' + key) and the text they stand for.
    _SIMPLE_ESCAPES = {'A': '[', 'B': ']', 'C': '{', 'D': '}', '/': '/'}
    # Longest run-length count accepted in an RLE escape ("/NNN<delimiter>").
    _MAX_RLE_DIGITS = 3
    # Alphabet used when a callsign is packed into the hex prefix of an ID.
    _CALLSIGN_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ/"
    _BASE36_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    # Number of delimiter-separated fields expected in a content block:
    # ID, msgto, priority, fragsize, subject, formname, version.
    _CONTENT_FIELD_COUNT = 7
    # CRC widths (in bits) that calcCRC knows how to render.
    _SUPPORTED_CRC_WIDTHS = (10, 15, 20)

    def getDecodeEscapes(self, message):
        """Return *message* with all escape sequences expanded.

        Recognised sequences:
          /A /B /C /D   ->  [ ] { }
          /N            ->  newline ('\\r\\n' on Windows, '\\n' elsewhere)
          //            ->  /
          /n<delim>, /nn<delim>, /nnn<delim>
                        ->  the delimiter character repeated n / nn / nnn times

        A '/' that does not start one of these sequences (including a
        truncated or four-or-more digit run-length code) is copied through
        unchanged, so decoding always terminates.
        """
        self.debug.info_message("getDecodeEscapes")

        newline = '\r\n' if platform.system() == 'Windows' else '\n'
        pieces = []
        pos = 0
        message_len = len(message)

        while pos < message_len:
            current = message[pos]
            # Ordinary character, or a '/' with nothing after it.
            if current != '/' or pos + 1 >= message_len:
                pieces.append(current)
                pos += 1
                continue

            code = message[pos + 1]
            if code in self._SIMPLE_ESCAPES:
                pieces.append(self._SIMPLE_ESCAPES[code])
                pos += 2
            elif code == 'N':
                pieces.append(newline)
                pos += 2
            else:
                run_length, consumed = self._parseRleEscape(message, pos)
                if consumed:
                    pieces.append(cn.DELIMETER_CHAR * run_length)
                    pos += consumed
                else:
                    # Not a valid escape: keep the '/' literally and move on.
                    pieces.append(current)
                    pos += 1

        message = ''.join(pieces)
        self.debug.info_message("completed getDecodeEscapes. unescaped message: " + str(message))
        return message

    def _parseRleEscape(self, message, slash_pos):
        """Parse a run-length escape starting at the '/' at *slash_pos*.

        Returns (run_length, characters_consumed). characters_consumed is 0
        when the text at *slash_pos* is not a complete "/<1-3 digits><delim>"
        sequence.
        """
        digits_start = slash_pos + 1
        digits_end = digits_start
        limit = min(len(message), digits_start + self._MAX_RLE_DIGITS)
        while digits_end < limit and '0' <= message[digits_end] <= '9':
            digits_end += 1

        has_digits = digits_end > digits_start
        has_terminator = (digits_end < len(message)
                          and message[digits_end] == cn.DELIMETER_CHAR)
        if not (has_digits and has_terminator):
            return 0, 0
        return int(message[digits_start:digits_end]), digits_end - slash_pos + 1

    def getDecodeIntTimeFromUniqueId(self, ID):
        """Decode the creation time from an ID of the form '<hex>_<base36>'.

        The part after the first '_' is a base-36 count of hundredths of a
        second; the value returned is that count divided by 100 (a float).

        Raises IndexError if *ID* has no '_' and ValueError if the timestamp
        part is not valid base 36.
        """
        timestamp_string = ID.split('_', 1)[1]
        inttime = int(timestamp_string, 36) / 100.0
        self.debug.info_message("datetime = " + str(datetime.utcfromtimestamp(inttime)))
        return inttime

    def getDecodeTimestampFromUniqueId(self, ID):
        """Decode the creation time from *ID* as a 'YYYY-MM-DD HH:MM:SS[.ffffff]' string.

        The string is str() of the naive datetime produced by
        datetime.utcfromtimestamp for the decoded time.
        """
        inttime = self.getDecodeIntTimeFromUniqueId(ID)
        timestamp = str(datetime.utcfromtimestamp(inttime))
        self.debug.info_message("reverse encoded timestamp is: " + timestamp)
        return timestamp

    def getDecodeCallsignFromUniqueId(self, ID):
        """Decode the callsign from the ID string.

        The part of *ID* before the first '_' is a hexadecimal number that
        encodes the callsign in base 37 over the alphabet 0-9, A-Z, '/'.
        Only that prefix is needed, so the timestamp part is not examined.

        Raises ValueError if the prefix is not valid hexadecimal.
        """
        alphabet = self._CALLSIGN_CHARS
        base = len(alphabet)
        # The explicit '0x' prefix makes int() reject signed or already
        # prefixed input, which keeps the loop below from seeing a negative.
        intnum = int('0x' + ID.split('_', 1)[0], 16)

        callsign = ""
        while intnum:
            intnum, digit = divmod(intnum, base)
            callsign = alphabet[digit] + callsign
        return callsign

    def getEOMChecksum(self, mystr):
        """Return the end-of-message checksum for *mystr*."""
        return self.calcEOMCRC(mystr)

    def calcEOMCRC(self, string):
        """Always use a 20 bit / 4 digit CRC for the end-of-message checksum."""
        return self.calcFourDigitCRC(string)

    def calcFourDigitCRC(self, string):
        """Return a 20-bit CRC of *string* as four base-32 digits.

        CRC calculation uses 5 bit nibbles in base 32 so four digits is 20 bits.
        The 0xc1acf polynomial protects up to a 524267 bit data word
        (65533 x 8 bit characters) at HD=4. This is used for the end of
        message checksum for messages > 2046 characters.
        """
        return self.calcCRC(20, 0xc1acf, string)

    def calcCRC(self, width, poly, string):
        """Return the CRC of *string* rendered as base-32 digits.

        width  -- CRC width in bits; 10, 15 and 20 are supported and yield
                  2, 3 and 4 digits respectively.
        poly   -- generator polynomial handed to the crc library.
        string -- text to checksum; it is encoded as ASCII, so non-ASCII
                  input raises UnicodeEncodeError.

        The CRC uses a zero initial value, no final XOR and no bit
        reflection. An unsupported width returns ''.
        """
        self.debug.info_message('calcCRC')
        data = bytes(string, "ascii")

        init_value = 0x00
        final_xor_value = 0x00
        reverse_input = False
        reverse_output = False
        configuration = Configuration(width, poly, init_value, final_xor_value,
                                      reverse_input, reverse_output)
        use_table = True
        crc_calculator = Calculator(configuration, use_table)
        checksum = crc_calculator.checksum(data)
        self.debug.info_message(str(checksum))

        if width not in self._SUPPORTED_CRC_WIDTHS:
            return ''

        # Emit the checksum five bits at a time, most significant digit first.
        encoded = ''.join(self.base32_chars[(checksum >> shift) & 0x1F]
                          for shift in range(width - 5, -1, -5))
        self.debug.info_message(str(width) + ' bit checksum: ' + encoded)
        return encoded

    def deconstructFragmentedMessage(self, remainder):
        """Extract the header fields from the content block of a message.

        The content block looks like '{<FORMAT_CONTENT><delim>...}' where the
        body is escape-encoded and holds delimiter-separated fields
        ID, msgto, priority, fragsize, subject, formname, version.

        Returns (ID, msgto, priority, timestamp, msgfrom); timestamp and
        msgfrom are decoded from the ID.

        Raises IndexError if the content block is missing, unterminated or
        has too few fields, and ValueError if the ID cannot be decoded.
        """
        header = '{' + cn.FORMAT_CONTENT + self.delimiter_char
        around_header = remainder.split(header, 1)
        if len(around_header) < 2:
            raise IndexError("content block header not found in message")

        data_and_remainder = around_header[1].split('}', 1)
        if len(data_and_remainder) < 2:
            raise IndexError("content block is missing its closing '}'")

        # Only the content is processed for escapes.
        data = self.getDecodeEscapes(data_and_remainder[0]).split(self.delimiter_char)
        for field in data:
            self.debug.info_message(cn.FORMAT_CONTENT + ": data[x] is: " + field)

        if len(data) < self._CONTENT_FIELD_COUNT:
            raise IndexError("content block has too few fields")

        ID, msgto, priority = data[0], data[1], data[2]
        timestamp = self.getDecodeIntTimeFromUniqueId(ID)
        msgfrom = self.getDecodeCallsignFromUniqueId(ID)
        return ID, msgto, priority, timestamp, msgfrom

    def deconstructFragTagMsg(self, fragtagmsg):
        """Reassemble a message from its tagged fragments and verify it.

        *fragtagmsg* is a sequence of fragments of the form
        '[F<part>,<total>]<text>[<checksum>]'. The fragment texts are joined
        until the fragment whose part number equals the total, or until the
        input is exhausted. The last four characters of the joined text are
        the end-of-message checksum, which is compared case-insensitively
        with the checksum of the rest.

        Per-fragment checksums are logged but not verified here.

        Returns (ok, text): *ok* is True when the end-of-message checksum
        matches, and *text* is the reassembled message without the checksum.
        A malformed fragment yields (False, text assembled so far).
        """
        reconstruct = ''
        remainder = fragtagmsg
        completed = False

        while not completed:
            _, tag_open, after_tag = remainder.partition('[F')
            numbers, tag_close, after_numbers = after_tag.partition(']')
            part_number, comma, parts_total = numbers.partition(',')
            message_text, sum_open, after_text = after_numbers.partition('[')
            message_checksum, sum_close, remainder = after_text.partition(']')

            # Every separator must have been present for the fragment to be valid.
            if not (tag_open and tag_close and comma and sum_open and sum_close):
                self.debug.error_message("Malformed fragment in deconstructFragTagMsg")
                return False, reconstruct

            self.debug.info_message("part number and total: " + part_number + "," + parts_total)
            reconstruct = reconstruct + message_text
            self.debug.info_message("message text: " + message_text)
            self.debug.info_message("message checksum: " + message_checksum)

            completed = (part_number == parts_total) or remainder == ''

        self.debug.info_message("reconstructed string: " + reconstruct)

        # Pull the 4 digit checksum off the end of the completed string.
        EOM_checksum = reconstruct[-4:]
        reconstruct = reconstruct[:-4]

        try:
            expected_checksum = self.getEOMChecksum(reconstruct)
        except Exception:
            # e.g. non-ASCII text cannot be checksummed, so it cannot match.
            self.debug.error_message("Exception in deconstructFragTagMsg: " + str(sys.exc_info()[0]) + str(sys.exc_info()[1]))
            return False, reconstruct

        self.debug.info_message("reconstructed string 2: " + reconstruct)
        self.debug.info_message("EOM checksum 1 is: " + str(EOM_checksum.upper()))
        self.debug.info_message("EOM checksum 2 is: " + str(expected_checksum.upper()))

        return EOM_checksum.upper() == expected_checksum.upper(), reconstruct

    def getMacAddress(self):
        """Return whatever getmac.get_mac_address() reports for this host."""
        return get_mac_address()

    @staticmethod
    def _macStringToInt(mac_address):
        """Convert a ':' or '-' separated hex MAC string to an integer."""
        return int(mac_address.replace(':', '').replace('-', ''), 16)

    def macToUuid(self, mac_address):
        """Return a uuid.UUID whose integer value is the MAC address.

        Returns None (after logging) when *mac_address* cannot be parsed.
        """
        self.debug.info_message("macToUuid")
        try:
            mac_int = self._macStringToInt(mac_address)
            self.debug.info_message("macInt: " + str(mac_int))
            ret_value = uuid.UUID(int=mac_int)
            self.debug.info_message("return value: " + str(ret_value))
            return ret_value
        except Exception:
            self.debug.error_message("Exception in macToUuid: " + str(sys.exc_info()[0]) + str(sys.exc_info()[1]))
            return None

    def macToInt(self, mac_address):
        """Return the MAC address string as an integer.

        Returns None (after logging) when *mac_address* cannot be parsed.
        """
        self.debug.info_message("macToInt")
        try:
            mac_int = self._macStringToInt(mac_address)
            self.debug.info_message("macInt: " + str(mac_int))
            return mac_int
        except Exception:
            self.debug.error_message("Exception in macToInt: " + str(sys.exc_info()[0]) + str(sys.exc_info()[1]))
            return None

    def uuidToMac(self, uuid):
        """Format the low 48 bits of *uuid* as 'xx:xx:xx:xx:xx:xx'.

        *uuid* may be an integer or a uuid.UUID (as returned by macToUuid).
        """
        # The parameter shadows the uuid module here, so detect a UUID object
        # by its 'int' attribute rather than with isinstance().
        value = getattr(uuid, 'int', uuid)
        # Six octets, most significant first: shift by 40, 32, ... 0 bits.
        return ':'.join('{:02x}'.format((value >> shift) & 0xff)
                        for shift in range(40, -1, -8))

    def _encodeBase36(self, number):
        """Return non-negative integer *number* in base 36 ('' for zero).

        Raises ValueError for anything that is not a non-negative int, which
        also prevents the endless loop a negative value would cause.
        """
        if not isinstance(number, int) or number < 0:
            raise ValueError("expected a non-negative integer, got " + repr(number))
        encoded = ''
        while number != 0:
            number, digit = divmod(number, 36)
            encoded = self._BASE36_CHARS[digit] + encoded
        return encoded

    def getEncodeUniqueMacId(self, mac_address):
        """Build an ID from an integer MAC address and the current time.

        The ID is the base-36 MAC followed directly by the base-36 count of
        hundredths of a second since the epoch. There is no separator, so
        the result is not meant to be decoded again.

        The time comes from time.time(): datetime.utcnow().timestamp() would
        treat the naive UTC value as local time and skew the result by the
        local UTC offset.

        Returns None (after logging) when the ID cannot be built, for example
        when *mac_address* is None or negative.
        """
        self.debug.info_message("getEncodeUniqueMacId\n")
        try:
            # New encode for timestamp.
            temp_var = int(round(time.time() * 100))
            self.debug.info_message("datetime = " + str(datetime.utcfromtimestamp(temp_var / 100.0)))
            self.debug.info_message("temp var is " + str(temp_var))

            encoded = self._encodeBase36(temp_var)
            self.debug.info_message("encoded = " + encoded)
            self.debug.info_message("original number = " + str(int(encoded, 36)))
            self.debug.info_message("datetime = " + str(datetime.utcfromtimestamp(int(encoded, 36) / 100.0)))

            # Prepare mac encoding; no need to be decodeable.
            encoded_mac = self._encodeBase36(mac_address)
            return encoded_mac + encoded
        except Exception:
            self.debug.error_message("Exception in getEncodeUniqueMacId: " + str(sys.exc_info()[0]) + str(sys.exc_info()[1]))
            return None

    def isIpRoutable(self, ip):
        """Return True unless *ip* is private, loopback, link-local or multicast.

        Raises ValueError when *ip* is not a valid IPv4 or IPv6 address.
        """
        ip_obj = ipaddress.ip_address(ip)
        return not (ip_obj.is_private or ip_obj.is_loopback
                    or ip_obj.is_link_local or ip_obj.is_multicast)