# HG changeset patch # User Robert C Jennings rcj4747@gmail.com # Date 1535818178 18000 # Sat Sep 01 11:09:38 2018 -0500 # Node ID 0a65ceee60f5ff019c954536d2fb1162428945e0 # Parent 4873d5437a583c3a1b169808c3d29a53524bc5b2 [boblov_x3plus] Add Boblov X3Plus radio Fixes #6073
Add support for this motorcycle/bicycle helmet radio. It operates in the EU PMR446 and FRS/GMRS frequencies for certain, but also appears to support 70cm band work (untested).
diff --git a/chirp/drivers/boblov_x3plus.py b/chirp/drivers/boblov_x3plus.py new file mode 100644 --- /dev/null +++ b/chirp/drivers/boblov_x3plus.py @@ -0,0 +1,572 @@ +""" +Radio driver for the Boblov X3 Plus Motorcycle Helmet Radio +""" +# Copyright 2018 Robert C Jennings rcj4747@gmail.com +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see http://www.gnu.org/licenses/. + +import logging +import struct +import time + +from datetime import datetime +from textwrap import dedent + +from chirp import ( + bitwise, + chirp_common, + directory, + errors, + memmap, + util, +) +from chirp.settings import ( + RadioSetting, + RadioSettingGroup, + RadioSettings, + RadioSettingValueBoolean, + RadioSettingValueInteger, + RadioSettingValueList, +) + +LOG = logging.getLogger(__name__) + + +@directory.register +class BoblovX3Plus(chirp_common.CloneModeRadio, + chirp_common.ExperimentalRadio): + """Boblov X3 Plus motorcycle/cycling helmet radio""" + + VENDOR = 'Boblov' + MODEL = 'X3Plus' + BAUD_RATE = 9600 + CHANNELS = 16 + + MEM_FORMAT = """ + #seekto 0x0010; + struct { + lbcd rxfreq[4]; + lbcd txfreq[4]; + lbcd rxtone[2]; + lbcd txtone[2]; + u8 unknown1:1, + compander:1, + scramble:1, + skip:1, + highpower:1, + narrow:1, + unknown2:1, + bcl:1; + u8 unknown3[3]; + } memory[16]; + #seekto 0x03C0; + struct { + u8 unknown1:4, + voiceprompt:2, + batterysaver:1, + beep:1; + u8 squelchlevel; + u8 unknown2; + u8 timeouttimer; + u8 voxlevel; + u8 unknown3; + u8 unknown4; + u8 voxdelay; + } settings; + """ + + # Radio command data + CMD_ACK = '\x06' + CMD_IDENTIFY = '\x02' + CMD_PROGRAM_ENTER = '.VKOGRAM' + CMD_PROGRAM_EXIT = '\x62' # 'b' + CMD_READ = 'R' + CMD_WRITE = 'W' + + BLOCK_SIZE = 0x08 + + VOICE_LIST = ['Off', 'Chinese', 'English'] + TIMEOUTTIMER_LIST = ['Off', '30 seconds', '60 seconds', '90 seconds', + '120 seconds', '150 seconds', '180 seconds', + '210 seconds', '240 seconds', '270 seconds', + '300 seconds'] + VOXLEVEL_LIST = ['Off', '1', '2', '3', '4', '5', '6', '7', '8', '9'] + VOXDELAY_LIST = ['1 seconds', '2 seconds', + '3 seconds', '4 seconds', '5 seconds'] + X3P_POWER_LEVELS = [chirp_common.PowerLevel('Low', watts=0.5), + chirp_common.PowerLevel('High', watts=2.00)] + + _memsize = 0x03F0 + _ranges = [ + (0x0000, 0x03F0), + ] + + @classmethod + def get_prompts(cls): + rp = chirp_common.RadioPrompts() + rp.experimental = _(dedent("""\ + The X3Plus driver is currently experimental. + + There are no known issues but you should proceed with caution. + + Please save an unedited copy of your first successful + download to a CHIRP Radio Images (*.img) file. + """)) + return rp + + @classmethod + def match_model(cls, filedata, filename): + """Given contents of a stored file (@filedata), return True if + this radio driver handles the represented model""" + + if len(filedata) != cls._memsize: + LOG.debug('Boblov_x3plus: match_model: size mismatch') + return False + + LOG.debug('Boblov_x3plus: match_model: size matches') + + if 'P310' in filedata[0x03D0:0x03D8]: + LOG.debug('Boblov_x3plus: match_model: radio ID matches') + return True + + LOG.debug('Boblov_x3plus: match_model: no radio ID match') + return False + + def get_features(self): + """Return a RadioFeatures object for this radio""" + + rf = chirp_common.RadioFeatures() + rf.has_settings = True + rf.valid_modes = ['NFM', 'FM'] # 12.5 KHz, 25 kHz. + rf.valid_power_levels = self.X3P_POWER_LEVELS + rf.valid_skips = ['', 'S'] + rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross'] + rf.valid_duplexes = ['', '-', '+', 'split', 'off'] + rf.can_odd_split = True + rf.has_rx_dtcs = True + rf.has_ctone = True + rf.has_cross = True + rf.valid_cross_modes = [ + 'Tone->Tone', + 'DTCS->', + '->DTCS', + 'Tone->DTCS', + 'DTCS->Tone', + '->Tone', + 'DTCS->DTCS'] + rf.has_tuning_step = False + rf.has_bank = False + rf.has_name = False + rf.memory_bounds = (1, self.CHANNELS) + rf.valid_bands = [(400000000, 470000000)] + return rf + + def process_mmap(self): + """Process a newly-loaded or downloaded memory map""" + self._memobj = bitwise.parse(self.MEM_FORMAT, self._mmap) + + def sync_in(self): + "Initiate a radio-to-PC clone operation" + + LOG.debug('Cloning from radio') + status = chirp_common.Status() + status.msg = 'Cloning from radio' + status.cur = 0 + status.max = self._memsize + self.status_fn(status) + + self._enter_programming_mode() + + data = '' + for addr in range(0, self._memsize, self.BLOCK_SIZE): + status.cur = addr + self.BLOCK_SIZE + self.status_fn(status) + + block = self._read_block(addr, self.BLOCK_SIZE) + data += block + + LOG.debug('Address: %04x', addr) + LOG.debug(util.hexprint(block)) + + self._exit_programming_mode() + + self._mmap = memmap.MemoryMap(data) + self.process_mmap() + + def sync_out(self): + "Initiate a PC-to-radio clone operation" + + LOG.debug('Upload to radio') + status = chirp_common.Status() + status.msg = 'Uploading to radio' + status.cur = 0 + status.max = self._memsize + self.status_fn(status) + + self._enter_programming_mode() + + for start_addr, end_addr in self._ranges: + for addr in range(start_addr, end_addr, self.BLOCK_SIZE): + status.cur = addr + self.BLOCK_SIZE + self.status_fn(status) + + self._write_block(addr, self.BLOCK_SIZE) + + self._exit_programming_mode() + + def get_raw_memory(self, number): + """Return a raw string describing the memory at @number""" + return repr(self._memobj.memory[number - 1]) + + @staticmethod + def _decode_tone(val): + val = int(val) + if val == 16665: + return '', None, None + elif val >= 12000: + return 'DTCS', val - 12000, 'R' + elif val >= 8000: + return 'DTCS', val - 8000, 'N' + + return 'Tone', val / 10.0, None + + @staticmethod + def _encode_tone(memval, mode, value, pol): + if mode == '': + memval[0].set_raw(0xFF) + memval[1].set_raw(0xFF) + elif mode == 'Tone': + memval.set_value(int(value * 10)) + elif mode == 'DTCS': + flag = 0x80 if pol == 'N' else 0xC0 + memval.set_value(value) + memval[1].set_bits(flag) + else: + raise Exception('Internal error: invalid mode `%s`' % mode) + + def get_memory(self, number): + """Return a Memory object for the memory at location @number""" + try: + rmem = self._memobj.memory[number - 1] + except KeyError: + raise errors.InvalidMemoryLocation('Unknown channel %s' % number) + + if number < 1 or number > self.CHANNELS: + raise errors.InvalidMemoryLocation( + 'Channel number must be 1 and %s' % self.CHANNELS) + + mem = chirp_common.Memory() + mem.number = number + mem.freq = int(rmem.rxfreq) * 10 + + # A blank (0MHz) or 0xFFFFFFFF frequency is considered empty + if mem.freq == 0 and rmem.rxfreq.get_raw() == '\xFF\xFF\xFF\xFF': + LOG.debug('empty channel %d', number) + mem.freq = 0 + mem.empty = True + return mem + + if rmem.txfreq.get_raw() == '\xFF\xFF\xFF\xFF': + mem.duplex = 'off' + mem.offset = 0 + elif int(rmem.rxfreq) == int(rmem.txfreq): + mem.duplex = '' + mem.offset = 0 + else: + mem.duplex = '-' if int(rmem.rxfreq) > int(rmem.txfreq) else '+' + mem.offset = abs(int(rmem.rxfreq) - int(rmem.txfreq)) * 10 + + mem.mode = 'NFM' if rmem.narrow else 'FM' + mem.skip = 'S' if rmem.skip else '' + mem.power = self.X3P_POWER_LEVELS[rmem.highpower] + + txtone = self._decode_tone(rmem.txtone) + rxtone = self._decode_tone(rmem.rxtone) + chirp_common.split_tone_decode(mem, txtone, rxtone) + + mem.extra = RadioSettingGroup('Extra', 'extra') + mem.extra.append(RadioSetting('bcl', 'Busy Channel Lockout', + RadioSettingValueBoolean( + current=(not rmem.bcl)))) + mem.extra.append(RadioSetting('scramble', 'Scramble', + RadioSettingValueBoolean( + current=(not rmem.scramble)))) + mem.extra.append(RadioSetting('compander', 'Compander', + RadioSettingValueBoolean( + current=(not rmem.compander)))) + + return mem + + def set_memory(self, memory): + """Set the memory object @memory""" + rmem = self._memobj.memory[memory.number - 1] + + if memory.empty: + rmem.set_raw('\xFF' * (rmem.size() / 8)) + return + + rmem.rxfreq = memory.freq / 10 + + set_txtone = True + if memory.duplex == 'off': + for i in range(0, 4): + rmem.txfreq[i].set_raw('\xFF') + # If recieve only then txtone value should be none + self._encode_tone(rmem.txtone, mode='', value=None, pol=None) + set_txtone = False + elif memory.duplex == 'split': + rmem.txfreq = memory.offset / 10 + elif memory.duplex == '+': + rmem.txfreq = (memory.freq + memory.offset) / 10 + elif memory.duplex == '-': + rmem.txfreq = (memory.freq - memory.offset) / 10 + else: + rmem.txfreq = memory.freq / 10 + + txtone, rxtone = chirp_common.split_tone_encode(memory) + if set_txtone: + self._encode_tone(rmem.txtone, *txtone) + self._encode_tone(rmem.rxtone, *rxtone) + + rmem.narrow = 'N' in memory.mode + rmem.skip = memory.skip == 'S' + + for setting in memory.extra: + # NOTE: Only three settings right now, all are inverted + setattr(rmem, setting.get_name(), not int(setting.value)) + + def get_settings(self): + """ + Return a RadioSettings list containing one or more RadioSettingGroup + or RadioSetting objects. These represent general settings that can + be adjusted on the radio. + """ + cur = self._memobj.settings + basic = RadioSettingGroup('basic', 'Basic Settings') + rs = RadioSetting('squelchlevel', 'Squelch level', + RadioSettingValueInteger( + minval=0, maxval=9, + current=cur.squelchlevel)) + basic.append(rs) + rs = RadioSetting('timeouttimer', 'Timeout timer', + RadioSettingValueList( + options=self.TIMEOUTTIMER_LIST, + current=self.TIMEOUTTIMER_LIST[cur.timeouttimer])) + basic.append(rs) + rs = RadioSetting('voiceprompt', 'Voice prompt', + RadioSettingValueList( + options=self.VOICE_LIST, + current=self.VOICE_LIST[cur.voiceprompt])) + basic.append(rs) + rs = RadioSetting('voxlevel', 'Vox level', + RadioSettingValueList( + options=self.VOXLEVEL_LIST, + current=self.VOXLEVEL_LIST[cur.voxlevel])) + basic.append(rs) + rs = RadioSetting('voxdelay', 'VOX delay', + RadioSettingValueList( + options=self.VOXDELAY_LIST, + current=self.VOXDELAY_LIST[cur.voxdelay])) + basic.append(rs) + basic.append(RadioSetting('batterysaver', 'Battery saver', + RadioSettingValueBoolean( + current=cur.batterysaver))) + basic.append(RadioSetting('beep', 'Beep', + RadioSettingValueBoolean( + current=cur.beep))) + return RadioSettings(basic) + + def set_settings(self, settings): + """ + Accepts the top-level RadioSettingGroup returned from + get_settings() and adjusts the values in the radio accordingly. + This function expects the entire RadioSettingGroup hierarchy + returned from get_settings(). + """ + for element in settings: + if not isinstance(element, RadioSetting): + self.set_settings(element) + continue + else: + try: + if '.' in element.get_name(): + bits = element.get_name().split('.') + obj = self._memobj + for bit in bits[:-1]: + obj = getattr(obj, bit) + setting = bits[-1] + else: + obj = self._memobj.settings + setting = element.get_name() + + if element.has_apply_callback(): + LOG.debug('Using apply callback') + element.run_apply_callback() + else: + LOG.debug('Setting %s = %s', setting, element.value) + setattr(obj, setting, element.value) + except Exception: + LOG.debug(element.get_name()) + raise + + def _write(self, data, timeout=3): + """ + Write data to the serial port and consume the echoed response + + The radio echos the data it is sent before replying. Send the + data to the radio, consume the reply, and ensure that the reply + is the same as the data sent. + """ + + serial = self.pipe + expected = len(data) + resp = b'' + start = datetime.now() + + # LOG.debug('WRITE(%02d): %s', expected, util.hexprint(data).rstrip()) + serial.write(data) + while True: + if not expected: + break + rbytes = serial.read(expected) + resp += rbytes + expected -= len(rbytes) + if (datetime.now() - start).seconds > timeout: + raise errors.RadioError('Timeout while reading from radio') + if resp != data: + raise errors.RadioError('Echoed response did not match sent data') + + def _read(self, length, timeout=3): + """Read data from the serial port""" + + resp = b'' + serial = self.pipe + remaining = length + start = datetime.now() + + if not remaining: + return resp + + while True: + rbytes = serial.read(remaining) + resp += rbytes + remaining -= len(rbytes) + if not remaining: + break + if (datetime.now() - start).seconds > timeout: + raise errors.RadioError('Timeout while reading from radio') + time.sleep(0.1) + + # LOG.debug('READ(%02d): %s', length, util.hexprint(resp).rstrip()) + return resp + + def _read_block(self, block_addr, block_size): + + LOG.debug('Reading block %04x...', block_addr) + cmd = struct.pack('>cHb', self.CMD_READ, block_addr, + block_size) + resp_prefix = self.CMD_WRITE + cmd[1:] + + try: + msg = ('Failed to write command to radio for block ' + 'read at %04x' % block_addr) + self._write(cmd) + + msg = ('Failed to read response from radio for block ' + 'read at %04x' % block_addr) + response = self._read(len(cmd) + block_size) + + if response[:len(cmd)] != resp_prefix: + raise errors.RadioError('Error reading block %04x, ' + 'Command not returned.' % (block_addr)) + + msg = ('Failed to write ACK to radio after block read at ' + '%04x' % block_addr) + self._write(self.CMD_ACK) + + msg = ('Failed to read ACK from radio after block read at ' + '%04x' % block_addr) + ack = self._read(1) + except Exception: + LOG.debug(msg, exc_info=True) + raise errors.RadioError(msg) + + if ack != self.CMD_ACK: + raise errors.RadioError('No ACK reading block ' + '%04x.' % (block_addr)) + + return response[len(cmd):] + + def _write_block(self, block_addr, block_size): + + cmd = struct.pack('>cHb', self.CMD_WRITE, block_addr, block_size) + data = self.get_mmap()[block_addr:block_addr + 8] + + LOG.debug('Writing Data:\n%s%s', + util.hexprint(cmd), util.hexprint(data)) + + try: + self._write(cmd + data) + if self._read(1) != self.CMD_ACK: + raise Exception('No ACK') + except Exception: + msg = 'Failed to send block to radio at %04x' % block_addr + LOG.debug(msg, exc_info=True) + raise errors.RadioError(msg) + + def _enter_programming_mode(self): + + LOG.debug('Entering programming mode') + try: + msg = 'Error communicating with radio entering programming mode.' + self._write(self.CMD_PROGRAM_ENTER) + time.sleep(0.5) + ack = self._read(1) + + if not ack: + raise errors.RadioError('No response from radio') + elif ack != self.CMD_ACK: + raise errors.RadioError('Radio refused to enter ' + 'programming mode') + + msg = 'Error communicating with radio during identification' + self._write(self.CMD_IDENTIFY) + ident = self._read(8) + + if not ident.startswith('SMP558'): + LOG.debug(util.hexprint(ident)) + raise errors.RadioError('Radio returned unknown ID string') + + msg = ('Error communicating with radio while querying ' + 'model identifier') + self._write(self.CMD_ACK) + + msg = 'Error communicating with radio on final handshake' + ack = self._read(1) + + if ack != self.CMD_ACK: + raise errors.RadioError('Radio refused to enter programming ' + 'mode failed on final handshake.') + except Exception: + LOG.debug(msg, exc_info=True) + raise errors.RadioError(msg) + + def _exit_programming_mode(self): + try: + self._write(self.CMD_PROGRAM_EXIT) + except Exception: + msg = 'Radio refused to exit programming mode' + LOG.debug(msg, exc_info=True) + raise errors.RadioError(msg) + LOG.debug('Exited programming mode')