diff --git a/adafruit_gps.py b/adafruit_gps.py index c16f847..2aca02a 100644 --- a/adafruit_gps.py +++ b/adafruit_gps.py @@ -34,6 +34,40 @@ _GPSI2C_DEFAULT_ADDRESS = const(0x10) +_GLL = 0 +_RMC = 1 +_GGA = 2 +_GSA = 3 +_GSA_4_11 = 4 +_GSV7 = 5 +_GSV11 = 6 +_GSV15 = 7 +_GSV19 = 8 +_ST_MIN = _GLL +_ST_MAX = _GSV19 + +_SENTENCE_PARAMS = ( + # 0 - _GLL + "dcdcfcC", + # 1 - _RMC + "fcdcdcffiDCC", + # 2 - _GGA + "fdcdciiffsfsIS", + # 3 - _GSA + "ciIIIIIIIIIIIIfff", + # 4 - _GSA_4_11 + "ciIIIIIIIIIIIIfffS", + # 5 - _GSV7 + "iiiiiiI", + # 6 - _GSV11 + "iiiiiiIiiiI", + # 7 - _GSV15 + "iiiiiiIiiiIiiiI", + # 8 - _GSV19 + "iiiiiiIiiiIiiiIiiiI", +) + + # Internal helper parsing functions. # These handle input that might be none or null and return none instead of # throwing errors. @@ -66,6 +100,96 @@ def _parse_str(nmea_data): return str(nmea_data) +def _read_degrees(data, index, neg): + x = data[index] + if data[index + 1].lower() == neg: + x *= -1.0 + return x + + +def _parse_talker(data_type): + # Split the data_type into talker and sentence_type + if data_type[0] == b"P": # Proprietary codes + return (data_type[:1], data_type[1:]) + + return (data_type[:2], data_type[2:]) + + +def _parse_data(sentence_type, data): + """Parse sentence data for the specified sentence type and + return a list of parameters in the correct format, or return None. + """ + # pylint: disable=too-many-branches + + if not _ST_MIN <= sentence_type <= _ST_MAX: + # The sentence_type is unknown + return None + + param_types = _SENTENCE_PARAMS[sentence_type] + + if len(param_types) != len(data): + # The expected number does not match the number of data items + return None + + params = [] + try: + for i, dti in enumerate(data): + pti = param_types[i] + len_dti = len(dti) + nothing = dti is None or len_dti == 0 + if pti == "c": + # A single character + if len_dti != 1: + return None + params.append(dti) + elif pti == "C": + # A single character or Nothing + if nothing: + params.append(None) + elif len_dti != 1: + return None + else: + params.append(dti) + elif pti == "d": + # A number parseable as degrees + params.append(_parse_degrees(dti)) + elif pti == "D": + # A number parseable as degrees or Nothing + if nothing: + params.append(None) + else: + params.append(_parse_degrees(dti)) + elif pti == "f": + # A floating point number + params.append(_parse_float(dti)) + elif pti == "i": + # An integer + params.append(_parse_int(dti)) + elif pti == "I": + # An integer or Nothing + if nothing: + params.append(None) + else: + params.append(_parse_int(dti)) + elif pti == "s": + # A string + params.append(dti) + elif pti == "S": + # A string or Nothing + if nothing: + params.append(None) + else: + params.append(dti) + else: + raise TypeError(f"GPS: Unexpected parameter type '{pti}'") + except ValueError: + # Something didn't parse, abort + return None + + # Return the parsed data + return params + + # lint warning about too many attributes disabled # pylint: disable-msg=R0902 @@ -81,8 +205,8 @@ def __init__(self, uart, debug=False): self.timestamp_utc = None self.latitude = None self.longitude = None - self.fix_quality = None - self.fix_quality_3d = None + self.fix_quality = 0 + self.fix_quality_3d = 0 self.satellites = None self.satellites_prev = None self.horizontal_dilution = None @@ -103,6 +227,8 @@ def __init__(self, uart, debug=False): self.total_mess_num = None self.mess_num = None self._raw_sentence = None + self._mode_indicator = None + self._magnetic_variation = None self.debug = debug def update(self): @@ -112,6 +238,7 @@ def update(self): """ # Grab a sentence and check its data type to call the appropriate # parsing function. + try: sentence = self._parse_sentence() except UnicodeError: @@ -122,7 +249,7 @@ def update(self): print(sentence) data_type, args = sentence data_type = bytes(data_type.upper(), "ascii") - (talker, sentence_type) = GPS._parse_talker(data_type) + (talker, sentence_type) = _parse_talker(data_type) # Check for all currently known GNSS talkers # GA - Galileo @@ -134,19 +261,23 @@ def update(self): # GN - GNSS / More than one of the above if talker not in (b"GA", b"GB", b"GI", b"GL", b"GP", b"GQ", b"GN"): # It's not a known GNSS source of data + # Assume it's a valid packet anyway return True + result = True + args = args.split(",") if sentence_type == b"GLL": # Geographic position - Latitude/Longitude - self._parse_gpgll(args) + result = self._parse_gll(args) elif sentence_type == b"RMC": # Minimum location info - self._parse_gprmc(args) + result = self._parse_rmc(args) elif sentence_type == b"GGA": # 3D location fix - self._parse_gpgga(args) + result = self._parse_gga(args) elif sentence_type == b"GSV": # Satellites in view - self._parse_gpgsv(talker, args) + result = self._parse_gsv(talker, args) elif sentence_type == b"GSA": # GPS DOP and active satellites - self._parse_gpgsa(talker, args) - return True + result = self._parse_gsa(talker, args) + + return result def send_command(self, command, add_checksum=True): """Send a command string to the GPS. If add_checksum is True (the @@ -256,239 +387,223 @@ def _parse_sentence(self): data_type = sentence[1:delimiter] return (data_type, sentence[delimiter + 1 :]) - @staticmethod - def _parse_talker(data_type): - # Split the data_type into talker and sentence_type - if data_type[0] == b"P": # Proprietary codes - return (data_type[:1], data_type[1:]) - - return (data_type[:2], data_type[2:]) - - def _parse_gpgll(self, args): - data = args.split(",") - if data is None or data[0] is None or (data[0] == ""): - return # Unexpected number of params. - - # Parse latitude and longitude. - self.latitude = _parse_degrees(data[0]) - if self.latitude is not None and data[1] is not None and data[1].lower() == "s": - self.latitude *= -1.0 - self.longitude = _parse_degrees(data[2]) - if ( - self.longitude is not None - and data[3] is not None - and data[3].lower() == "w" - ): - self.longitude *= -1.0 - time_utc = int(_parse_int(float(data[4]))) - if time_utc is not None: - hours = time_utc // 10000 - mins = (time_utc // 100) % 100 - secs = time_utc % 100 - # Set or update time to a friendly python time struct. - if self.timestamp_utc is not None: - self.timestamp_utc = time.struct_time( - (0, 0, 0, hours, mins, secs, 0, 0, -1) - ) - else: - self.timestamp_utc = time.struct_time( - (0, 0, 0, hours, mins, secs, 0, 0, -1) - ) - # Parse data active or void - self.isactivedata = _parse_str(data[5]) - - def _parse_gprmc(self, args): - # Parse the arguments (everything after data type) for NMEA GPRMC - # minimum location fix sentence. - data = args.split(",") - if data is None or len(data) < 11 or data[0] is None or (data[0] == ""): - return # Unexpected number of params. - # Parse fix time. - time_utc = int(_parse_float(data[0])) - if time_utc is not None: - hours = time_utc // 10000 - mins = (time_utc // 100) % 100 - secs = time_utc % 100 - # Set or update time to a friendly python time struct. - if self.timestamp_utc is not None: - self.timestamp_utc = time.struct_time( - ( - self.timestamp_utc.tm_year, - self.timestamp_utc.tm_mon, - self.timestamp_utc.tm_mday, - hours, - mins, - secs, - 0, - 0, - -1, - ) - ) - else: - self.timestamp_utc = time.struct_time( - (0, 0, 0, hours, mins, secs, 0, 0, -1) - ) - # Parse status (active/fixed or void). - status = data[1] - self.fix_quality = 0 - if status is not None and status.lower() == "a": - self.fix_quality = 1 - # Parse latitude and longitude. - self.latitude = _parse_degrees(data[2]) - if self.latitude is not None and data[3] is not None and data[3].lower() == "s": - self.latitude *= -1.0 - self.longitude = _parse_degrees(data[4]) - if ( - self.longitude is not None - and data[5] is not None - and data[5].lower() == "w" - ): - self.longitude *= -1.0 - # Parse out speed and other simple numeric values. - self.speed_knots = _parse_float(data[6]) - self.track_angle_deg = _parse_float(data[7]) - # Parse date. - if data[8] is not None and len(data[8]) == 6: - day = int(data[8][0:2]) - month = int(data[8][2:4]) - year = 2000 + int(data[8][4:6]) # Y2k bug, 2 digit year assumption. - # This is a problem with the NMEA - # spec and not this code. - if self.timestamp_utc is not None: - # Replace the timestamp with an updated one. - # (struct_time is immutable and can't be changed in place) - self.timestamp_utc = time.struct_time( - ( - year, - month, - day, - self.timestamp_utc.tm_hour, - self.timestamp_utc.tm_min, - self.timestamp_utc.tm_sec, - 0, - 0, - -1, - ) - ) - else: - # Time hasn't been set so create it. - self.timestamp_utc = time.struct_time( - (year, month, day, 0, 0, 0, 0, 0, -1) - ) - - def _parse_gpgga(self, args): - # Parse the arguments (everything after data type) for NMEA GPGGA - # 3D location fix sentence. - data = args.split(",") - if data is None or len(data) != 14 or (data[0] == ""): - return # Unexpected number of params. - # Parse fix time. - time_utc = int(_parse_float(data[0])) - if time_utc is not None: - hours = time_utc // 10000 - mins = (time_utc // 100) % 100 - secs = time_utc % 100 - # Set or update time to a friendly python time struct. - if self.timestamp_utc is not None: - self.timestamp_utc = time.struct_time( - ( - self.timestamp_utc.tm_year, - self.timestamp_utc.tm_mon, - self.timestamp_utc.tm_mday, - hours, - mins, - secs, - 0, - 0, - -1, - ) - ) + def _update_timestamp_utc(self, time_utc, date=None): + hours = time_utc // 10000 + mins = (time_utc // 100) % 100 + secs = time_utc % 100 + if date is None: + if self.timestamp_utc is None: + day, month, year = 0, 0, 0 else: - self.timestamp_utc = time.struct_time( - (0, 0, 0, hours, mins, secs, 0, 0, -1) - ) - # Parse latitude and longitude. - self.latitude = _parse_degrees(data[1]) - if self.latitude is not None and data[2] is not None and data[2].lower() == "s": - self.latitude *= -1.0 - self.longitude = _parse_degrees(data[3]) - if ( - self.longitude is not None - and data[4] is not None - and data[4].lower() == "w" - ): - self.longitude *= -1.0 - # Parse out fix quality and other simple numeric values. - self.fix_quality = _parse_int(data[5]) - self.satellites = _parse_int(data[6]) - self.horizontal_dilution = _parse_float(data[7]) + day = self.timestamp_utc.tm_mday + month = self.timestamp_utc.tm_mon + year = self.timestamp_utc.tm_year + else: + day = date // 10000 + month = (date // 100) % 100 + year = 2000 + date % 100 + + self.timestamp_utc = time.struct_time( + (year, month, day, hours, mins, secs, 0, 0, -1) + ) + + def _parse_gll(self, data): + # GLL - Geographic Position - Latitude/Longitude + + if data is None or len(data) != 7: + return False # Unexpected number of params. + data = _parse_data(_GLL, data) + if data is None: + return False # Params didn't parse + + # Latitude + self.latitude = _read_degrees(data, 0, "s") + + # Longitude + self.longitude = _read_degrees(data, 2, "w") + + # UTC time of position + self._update_timestamp_utc(int(data[4])) + + # Status Valid(A) or Invalid(V) + self.isactivedata = data[5] + + # Parse FAA mode indicator + self._mode_indicator = data[6] + + return True + + def _parse_rmc(self, data): + # RMC - Recommended Minimum Navigation Information + + if data is None or len(data) != 12: + return False # Unexpected number of params. + data = _parse_data(_RMC, data) + if data is None: + return False # Params didn't parse + + # UTC time of position and date + self._update_timestamp_utc(int(data[0]), data[8]) + + # Status Valid(A) or Invalid(V) + self.isactivedata = data[1] + if data[1].lower() == "a": + if self.fix_quality == 0: + self.fix_quality = 1 + else: + self.fix_quality = 0 + + # Latitude + self.latitude = _read_degrees(data, 2, "s") + + # Longitude + self.longitude = _read_degrees(data, 4, "w") + + # Speed over ground, knots + self.speed_knots = data[6] + + # Track made good, degrees true + self.track_angle_deg = data[7] + + # Magnetic variation + if data[9] is None or data[10] is None: + self._magnetic_variation = None + else: + self._magnetic_variation = _read_degrees(data, 9, "w") + + # Parse FAA mode indicator + self._mode_indicator = data[11] + + return True + + def _parse_gga(self, data): + # GGA - Global Positioning System Fix Data + + if data is None or len(data) != 14: + return False # Unexpected number of params. + data = _parse_data(_GGA, data) + if data is None: + return False # Params didn't parse + + # UTC time of position + self._update_timestamp_utc(int(data[0])) + + # Latitude + self.latitude = _read_degrees(data, 1, "s") + + # Longitude + self.longitude = _read_degrees(data, 3, "w") + + # GPS quality indicator + # 0 - fix not available, + # 1 - GPS fix, + # 2 - Differential GPS fix (values above 2 are 2.3 features) + # 3 - PPS fix + # 4 - Real Time Kinematic + # 5 - Float RTK + # 6 - estimated (dead reckoning) + # 7 - Manual input mode + # 8 - Simulation mode + self.fix_quality = data[5] + + # Number of satellites in use, 0 - 12 + self.satellites = data[6] + + # Horizontal dilution of precision + self.horizontal_dilution = data[7] + + # Antenna altitude relative to mean sea level self.altitude_m = _parse_float(data[8]) + # data[9] - antenna altitude unit, always 'M' ??? + + # Geoidal separation relative to WGS 84 self.height_geoid = _parse_float(data[10]) + # data[11] - geoidal separation unit, always 'M' ??? + + # data[12] - Age of differential GPS data, can be null + # data[13] - Differential reference station ID, can be null + + return True + + def _parse_gsa(self, talker, data): + # GSA - GPS DOP and active satellites + + if data is None or len(data) not in (17, 18): + return False # Unexpected number of params. + if len(data) == 17: + data = _parse_data(_GSA, data) + else: + data = _parse_data(_GSA_4_11, data) + if data is None: + return False # Params didn't parse - def _parse_gpgsa(self, talker, args): talker = talker.decode("ascii") - data = args.split(",") - if data is None or (data[0] == ""): - return # Unexpected number of params - - # Parse selection mode - self.sel_mode = _parse_str(data[0]) - # Parse 3d fix - self.fix_quality_3d = _parse_int(data[1]) + + # Selection mode: 'M' - manual, 'A' - automatic + self.sel_mode = data[0] + + # Mode: 1 - no fix, 2 - 2D fix, 3 - 3D fix + self.fix_quality_3d = data[1] + satlist = list(filter(None, data[2:-4])) self.sat_prns = [] for sat in satlist: - self.sat_prns.append("{}{}".format(talker, _parse_int(sat))) + self.sat_prns.append("{}{}".format(talker, sat)) - # Parse PDOP, dilution of precision - self.pdop = _parse_float(data[-3]) - # Parse HDOP, horizontal dilution of precision - self.hdop = _parse_float(data[-2]) - # Parse VDOP, vertical dilution of precision - self.vdop = _parse_float(data[-1]) + # PDOP, dilution of precision + self.pdop = _parse_float(data[14]) - def _parse_gpgsv(self, talker, args): - # Parse the arguments (everything after data type) for NMEA GPGGA + # HDOP, horizontal dilution of precision + self.hdop = _parse_float(data[15]) + + # VDOP, vertical dilution of precision + self.vdop = _parse_float(data[16]) + + # data[17] - System ID + + return True + + def _parse_gsv(self, talker, data): + # GSV - Satellites in view # pylint: disable=too-many-branches - # 3D location fix sentence. - talker = talker.decode("ascii") - data = args.split(",") - if data is None or (data[0] == ""): - return # Unexpected number of params. - # Parse number of messages - self.total_mess_num = _parse_int(data[0]) # Total number of messages - # Parse message number - self.mess_num = _parse_int(data[1]) # Message number - # Parse number of satellites in view - self.satellites = _parse_int(data[2]) # Number of satellites + if data is None or len(data) not in (7, 11, 15, 19): + return False # Unexpected number of params. + data = _parse_data( + {7: _GSV7, 11: _GSV11, 15: _GSV15, 19: _GSV19}[len(data)], + data, + ) + if data is None: + return False # Params didn't parse + + talker = talker.decode("ascii") - if len(data) < 3: - return + # Number of messages + self.total_mess_num = data[0] + # Message number + self.mess_num = data[1] + # Number of satellites in view + self.satellites = data[2] sat_tup = data[3:] satlist = [] timestamp = time.monotonic() for i in range(len(sat_tup) // 4): - try: - j = i * 4 - value = ( - # Satellite number - "{}{}".format(talker, _parse_int(sat_tup[0 + j])), - # Elevation in degrees - _parse_int(sat_tup[1 + j]), - # Azimuth in degrees - _parse_int(sat_tup[2 + j]), - # signal-to-noise ratio in dB - _parse_int(sat_tup[3 + j]), - # Timestamp - timestamp, - ) - satlist.append(value) - except ValueError: - # Something wasn't an int - pass + j = i * 4 + value = ( + # Satellite number + "{}{}".format(talker, sat_tup[0 + j]), + # Elevation in degrees + sat_tup[1 + j], + # Azimuth in degrees + sat_tup[2 + j], + # signal-to-noise ratio in dB + sat_tup[3 + j], + # Timestamp + timestamp, + ) + satlist.append(value) if self._sats is None: self._sats = [] @@ -518,6 +633,8 @@ def _parse_gpgsv(self, talker, args): self.satellites_prev = self.satellites + return True + class GPS_GtopI2C(GPS): """GTop-compatible I2C GPS parsing module. Can parse simple NMEA data @@ -546,7 +663,7 @@ def read(self, num_bytes=1): # 'stuffed' newlines and then append to our result array for byteification i2c.readinto(self._charbuff) char = self._charbuff[0] - if (char == ord("\n")) and (self._lastbyte != ord("\r")): + if (char == 0x0A) and (self._lastbyte != 0x0D): continue # skip duplicate \n's! result.append(char) self._lastbyte = char # keep track of the last character approved @@ -570,14 +687,14 @@ def readline(self): timeout = time.monotonic() + self._timeout while timeout > time.monotonic(): # check if our internal buffer has a '\n' termination already - if self._internalbuffer and (self._internalbuffer[-1] == ord("\n")): + if self._internalbuffer and (self._internalbuffer[-1] == 0x0A): break char = self.read(1) if not char: continue self._internalbuffer.append(char[0]) # print(bytearray(self._internalbuffer)) - if self._internalbuffer and self._internalbuffer[-1] == ord("\n"): + if self._internalbuffer and self._internalbuffer[-1] == 0x0A: ret = bytearray(self._internalbuffer) self._internalbuffer = [] # reset the buffer to empty return ret diff --git a/examples/gps_time_source.py b/examples/gps_time_source.py index ef25214..0572b1a 100644 --- a/examples/gps_time_source.py +++ b/examples/gps_time_source.py @@ -24,6 +24,18 @@ rtc.set_time_source(gps) the_rtc = rtc.RTC() + +def _format_datetime(datetime): + return "{:02}/{:02}/{} {:02}:{:02}:{:02}".format( + datetime.tm_mon, + datetime.tm_mday, + datetime.tm_year, + datetime.tm_hour, + datetime.tm_min, + datetime.tm_sec, + ) + + last_print = time.monotonic() while True: @@ -36,39 +48,12 @@ print("No time data from GPS yet") continue # Time & date from GPS informations - print( - "Fix timestamp: {:02}/{:02}/{} {:02}:{:02}:{:02}".format( - gps.timestamp_utc.tm_mon, # Grab parts of the time from the - gps.timestamp_utc.tm_mday, # struct_time object that holds - gps.timestamp_utc.tm_year, # the fix time. Note you might - gps.timestamp_utc.tm_hour, # not get all data like year, day, - gps.timestamp_utc.tm_min, # month! - gps.timestamp_utc.tm_sec, - ) - ) + print("Fix timestamp: {}".format(_format_datetime(gps.timestamp_utc))) # Time & date from internal RTC - print( - "RTC timestamp: {:02}/{:02}/{} {:02}:{:02}:{:02}".format( - the_rtc.datetime.tm_mon, - the_rtc.datetime.tm_mday, - the_rtc.datetime.tm_year, - the_rtc.datetime.tm_hour, - the_rtc.datetime.tm_min, - the_rtc.datetime.tm_sec, - ) - ) + print("RTC timestamp: {}".format(_format_datetime(the_rtc.datetime))) # Time & date from time.localtime() function local_time = time.localtime() - print( - "Local time: {:02}/{:02}/{} {:02}:{:02}:{:02}".format( - local_time.tm_mon, - local_time.tm_mday, - local_time.tm_year, - local_time.tm_hour, - local_time.tm_min, - local_time.tm_sec, - ) - ) + print("Local time: {}".format(_format_datetime(local_time)))