Attached is the radio image for testing.
Regards, Robert Jennings
On 09/05/2018 03:16 PM, Craigslist Reply wrote:
# HG changeset patch # User Robert C Jennings rcj4747@gmail.com # Date 1535818178 18000 # Sat Sep 01 11:09:38 2018 -0500 # Node ID 0a65ceee60f5ff019c954536d2fb1162428945e0 # Parent 4873d5437a583c3a1b169808c3d29a53524bc5b2 [boblov_x3plus] Add Boblov X3Plus radio Fixes #6073
Add support for this motorcycle/bicycle helmet radio. It is confirmed to operate on the EU PMR446 and FRS/GMRS frequencies, and it also appears to support the 70cm band (untested).
diff --git a/chirp/drivers/boblov_x3plus.py b/chirp/drivers/boblov_x3plus.py new file mode 100644 --- /dev/null +++ b/chirp/drivers/boblov_x3plus.py @@ -0,0 +1,572 @@ +""" +Radio driver for the Boblov X3 Plus Motorcycle Helmet Radio +""" +# Copyright 2018 Robert C Jennings rcj4747@gmail.com +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see http://www.gnu.org/licenses/.
+import logging +import struct +import time
+from datetime import datetime +from textwrap import dedent
+from chirp import (
- bitwise,
- chirp_common,
- directory,
- errors,
- memmap,
- util,
+) +from chirp.settings import (
- RadioSetting,
- RadioSettingGroup,
- RadioSettings,
- RadioSettingValueBoolean,
- RadioSettingValueInteger,
- RadioSettingValueList,
+)
+LOG = logging.getLogger(__name__)
@directory.register
class BoblovX3Plus(chirp_common.CloneModeRadio,
                   chirp_common.ExperimentalRadio):
    """Boblov X3 Plus motorcycle/cycling helmet radio

    Clone-mode driver: the whole 0x03F0-byte image is read from / written
    to the radio in 8-byte blocks over a 9600 baud serial link.
    """

    VENDOR = 'Boblov'
    MODEL = 'X3Plus'
    BAUD_RATE = 9600
    CHANNELS = 16

    # Layout of the radio image.  Each channel record is 16 bytes
    # (4 + 4 + 2 + 2 + 1 + 3) and the 16 records start at offset 0x0010.
    # The global settings live in an 8-byte record at 0x03C0.
    MEM_FORMAT = """
        #seekto 0x0010;
        struct {
            lbcd rxfreq[4];
            lbcd txfreq[4];
            lbcd rxtone[2];
            lbcd txtone[2];
            u8 unknown1:1,
               compander:1,
               scramble:1,
               skip:1,
               highpower:1,
               narrow:1,
               unknown2:1,
               bcl:1;
            u8 unknown3[3];
        } memory[16];
        #seekto 0x03C0;
        struct {
            u8 unknown1:4,
               voiceprompt:2,
               batterysaver:1,
               beep:1;
            u8 squelchlevel;
            u8 unknown2;
            u8 timeouttimer;
            u8 voxlevel;
            u8 unknown3;
            u8 unknown4;
            u8 voxdelay;
        } settings;
    """

    # Radio command data
    CMD_ACK = '\x06'
    CMD_IDENTIFY = '\x02'
    CMD_PROGRAM_ENTER = '.VKOGRAM'
    CMD_PROGRAM_EXIT = '\x62'  # 'b'
    CMD_READ = 'R'
    CMD_WRITE = 'W'
    BLOCK_SIZE = 0x08

    # Option lists; the value stored in the radio is the list index.
    VOICE_LIST = ['Off', 'Chinese', 'English']
    TIMEOUTTIMER_LIST = ['Off', '30 seconds', '60 seconds', '90 seconds',
                         '120 seconds', '150 seconds', '180 seconds',
                         '210 seconds', '240 seconds', '270 seconds',
                         '300 seconds']
    VOXLEVEL_LIST = ['Off', '1', '2', '3', '4', '5', '6', '7', '8', '9']
    VOXDELAY_LIST = ['1 seconds', '2 seconds',
                     '3 seconds', '4 seconds', '5 seconds']

    # Index 0 is selected when the highpower bit is clear, index 1 when set.
    X3P_POWER_LEVELS = [chirp_common.PowerLevel('Low', watts=0.5),
                        chirp_common.PowerLevel('High', watts=2.00)]

    # Size of the image and the address ranges uploaded by sync_out().
    _memsize = 0x03F0
    _ranges = [
        (0x0000, 0x03F0),
    ]

    @classmethod
    def get_prompts(cls):
        """Return the RadioPrompts carrying the experimental-driver notice"""
        rp = chirp_common.RadioPrompts()
        rp.experimental = _(dedent("""\
            The X3Plus driver is currently experimental.

            There are no known issues but you should proceed with caution.

            Please save an unedited copy of your first successful
            download to a CHIRP Radio Images (*.img) file.
            """))
        return rp

    @classmethod
    def match_model(cls, filedata, filename):
        """Given contents of a stored file (@filedata), return True if
        this radio driver handles the represented model

        The image must be exactly _memsize bytes long and contain the
        string 'P310' in the eight bytes starting at 0x03D0.
        """
        if len(filedata) != cls._memsize:
            LOG.debug('Boblov_x3plus: match_model: size mismatch')
            return False
        LOG.debug('Boblov_x3plus: match_model: size matches')

        if 'P310' in filedata[0x03D0:0x03D8]:
            LOG.debug('Boblov_x3plus: match_model: radio ID matches')
            return True

        LOG.debug('Boblov_x3plus: match_model: no radio ID match')
        return False

    def get_features(self):
        """Return a RadioFeatures object for this radio"""
        rf = chirp_common.RadioFeatures()
        rf.has_settings = True
        rf.valid_modes = ['NFM', 'FM']  # 12.5 KHz, 25 kHz.
        rf.valid_power_levels = self.X3P_POWER_LEVELS
        rf.valid_skips = ['', 'S']
        rf.valid_tmodes = ['', 'Tone', 'TSQL', 'DTCS', 'Cross']
        rf.valid_duplexes = ['', '-', '+', 'split', 'off']
        rf.can_odd_split = True
        rf.has_rx_dtcs = True
        rf.has_ctone = True
        rf.has_cross = True
        rf.valid_cross_modes = [
            'Tone->Tone',
            'DTCS->',
            '->DTCS',
            'Tone->DTCS',
            'DTCS->Tone',
            '->Tone',
            'DTCS->DTCS']
        rf.has_tuning_step = False
        rf.has_bank = False
        rf.has_name = False
        rf.memory_bounds = (1, self.CHANNELS)
        rf.valid_bands = [(400000000, 470000000)]
        return rf

    def process_mmap(self):
        """Process a newly-loaded or downloaded memory map"""
        self._memobj = bitwise.parse(self.MEM_FORMAT, self._mmap)

    def sync_in(self):
        """Initiate a radio-to-PC clone operation

        Reads the whole image block by block, reporting progress through
        status_fn, then parses it with process_mmap().
        """
        LOG.debug('Cloning from radio')

        status = chirp_common.Status()
        status.msg = 'Cloning from radio'
        status.cur = 0
        status.max = self._memsize
        self.status_fn(status)

        self._enter_programming_mode()

        # Collect the blocks and join once instead of repeated '+='.
        blocks = []
        for addr in range(0, self._memsize, self.BLOCK_SIZE):
            status.cur = addr + self.BLOCK_SIZE
            self.status_fn(status)

            block = self._read_block(addr, self.BLOCK_SIZE)
            blocks.append(block)

            LOG.debug('Address: %04x', addr)
            LOG.debug(util.hexprint(block))

        self._exit_programming_mode()

        self._mmap = memmap.MemoryMap(b''.join(blocks))
        self.process_mmap()

    def sync_out(self):
        """Initiate a PC-to-radio clone operation

        Writes every address range listed in _ranges back to the radio,
        one block at a time, reporting progress through status_fn.
        """
        LOG.debug('Upload to radio')

        status = chirp_common.Status()
        status.msg = 'Uploading to radio'
        status.cur = 0
        status.max = self._memsize
        self.status_fn(status)

        self._enter_programming_mode()

        for start_addr, end_addr in self._ranges:
            for addr in range(start_addr, end_addr, self.BLOCK_SIZE):
                status.cur = addr + self.BLOCK_SIZE
                self.status_fn(status)
                self._write_block(addr, self.BLOCK_SIZE)

        self._exit_programming_mode()

    def get_raw_memory(self, number):
        """Return a raw string describing the memory at @number"""
        return repr(self._memobj.memory[number - 1])

    @staticmethod
    def _decode_tone(val):
        """Decode a stored tone value into a (mode, value, polarity) tuple

        16665 is what an erased (0xFFFF) lbcd field decodes to and means
        no tone.  Values of 8000 and above are DTCS codes (normal
        polarity), 12000 and above DTCS with reversed polarity; anything
        else is a CTCSS tone stored in tenths of a Hz.
        """
        val = int(val)
        if val == 16665:
            return '', None, None
        elif val >= 12000:
            return 'DTCS', val - 12000, 'R'
        elif val >= 8000:
            return 'DTCS', val - 8000, 'N'
        return 'Tone', val / 10.0, None

    @staticmethod
    def _encode_tone(memval, mode, value, pol):
        """Store a tone described by (@mode, @value, @pol) into @memval

        @memval is a two-element lbcd field.  An empty mode erases it to
        0xFFFF; DTCS codes get flag bits set in the second byte (0x80 for
        normal polarity, 0xC0 otherwise).

        Raises Exception for an unrecognised mode.
        """
        if mode == '':
            memval[0].set_raw(0xFF)
            memval[1].set_raw(0xFF)
        elif mode == 'Tone':
            memval.set_value(int(value * 10))
        elif mode == 'DTCS':
            flag = 0x80 if pol == 'N' else 0xC0
            memval.set_value(value)
            memval[1].set_bits(flag)
        else:
            raise Exception('Internal error: invalid mode `%s`' % mode)

    def get_memory(self, number):
        """Return a Memory object for the memory at location @number

        Raises errors.InvalidMemoryLocation when @number is outside
        1..CHANNELS.
        """
        # Validate before indexing so an out-of-range number can never
        # reach the memory array.
        if number < 1 or number > self.CHANNELS:
            raise errors.InvalidMemoryLocation(
                'Channel number must be between 1 and %s' % self.CHANNELS)

        try:
            rmem = self._memobj.memory[number - 1]
        except (KeyError, IndexError):
            raise errors.InvalidMemoryLocation('Unknown channel %s' % number)

        mem = chirp_common.Memory()
        mem.number = number
        mem.freq = int(rmem.rxfreq) * 10

        # A blank (0MHz) or 0xFFFFFFFF frequency is considered empty.
        # This must be 'or': an erased record does not decode to 0, so
        # requiring both conditions would never match.
        if mem.freq == 0 or rmem.rxfreq.get_raw() == '\xFF\xFF\xFF\xFF':
            LOG.debug('empty channel %d', number)
            mem.freq = 0
            mem.empty = True
            return mem

        if rmem.txfreq.get_raw() == '\xFF\xFF\xFF\xFF':
            # No transmit frequency stored: receive-only channel
            mem.duplex = 'off'
            mem.offset = 0
        elif int(rmem.rxfreq) == int(rmem.txfreq):
            mem.duplex = ''
            mem.offset = 0
        else:
            mem.duplex = '-' if int(rmem.rxfreq) > int(rmem.txfreq) else '+'
            mem.offset = abs(int(rmem.rxfreq) - int(rmem.txfreq)) * 10

        mem.mode = 'NFM' if rmem.narrow else 'FM'
        mem.skip = 'S' if rmem.skip else ''
        mem.power = self.X3P_POWER_LEVELS[rmem.highpower]

        txtone = self._decode_tone(rmem.txtone)
        rxtone = self._decode_tone(rmem.rxtone)
        chirp_common.split_tone_decode(mem, txtone, rxtone)

        # These three flags are stored inverted (a set bit means "off").
        mem.extra = RadioSettingGroup('Extra', 'extra')
        mem.extra.append(
            RadioSetting('bcl', 'Busy Channel Lockout',
                         RadioSettingValueBoolean(
                             current=(not rmem.bcl))))
        mem.extra.append(
            RadioSetting('scramble', 'Scramble',
                         RadioSettingValueBoolean(
                             current=(not rmem.scramble))))
        mem.extra.append(
            RadioSetting('compander', 'Compander',
                         RadioSettingValueBoolean(
                             current=(not rmem.compander))))

        return mem

    def set_memory(self, memory):
        """Set the memory object @memory

        An empty memory erases the whole channel record to 0xFF bytes.
        """
        rmem = self._memobj.memory[memory.number - 1]

        if memory.empty:
            # rmem.size() is in bits; '//' keeps the count an integer.
            rmem.set_raw('\xFF' * (rmem.size() // 8))
            return

        rmem.rxfreq = memory.freq // 10

        set_txtone = True
        if memory.duplex == 'off':
            for i in range(0, 4):
                rmem.txfreq[i].set_raw('\xFF')
            # If receive only then txtone value should be none
            self._encode_tone(rmem.txtone, mode='', value=None, pol=None)
            set_txtone = False
        elif memory.duplex == 'split':
            rmem.txfreq = memory.offset // 10
        elif memory.duplex == '+':
            rmem.txfreq = (memory.freq + memory.offset) // 10
        elif memory.duplex == '-':
            rmem.txfreq = (memory.freq - memory.offset) // 10
        else:
            rmem.txfreq = memory.freq // 10

        txtone, rxtone = chirp_common.split_tone_encode(memory)
        if set_txtone:
            self._encode_tone(rmem.txtone, *txtone)
        self._encode_tone(rmem.rxtone, *rxtone)

        rmem.narrow = 'N' in memory.mode
        rmem.skip = memory.skip == 'S'

        # Persist the power level that get_memory() reports; without this
        # a power change made in the editor is silently dropped.  When no
        # power is given the stored bit is left untouched.
        if memory.power is not None:
            rmem.highpower = memory.power == self.X3P_POWER_LEVELS[1]

        for setting in memory.extra:
            # NOTE: Only three settings right now, all are inverted
            setattr(rmem, setting.get_name(), not int(setting.value))

    def get_settings(self):
        """Return a RadioSettings list containing one or more
        RadioSettingGroup or RadioSetting objects. These represent general
        settings that can be adjusted on the radio.
        """
        cur = self._memobj.settings

        basic = RadioSettingGroup('basic', 'Basic Settings')

        rs = RadioSetting('squelchlevel', 'Squelch level',
                          RadioSettingValueInteger(
                              minval=0, maxval=9,
                              current=cur.squelchlevel))
        basic.append(rs)

        rs = RadioSetting('timeouttimer', 'Timeout timer',
                          RadioSettingValueList(
                              options=self.TIMEOUTTIMER_LIST,
                              current=self.TIMEOUTTIMER_LIST[
                                  cur.timeouttimer]))
        basic.append(rs)

        rs = RadioSetting('voiceprompt', 'Voice prompt',
                          RadioSettingValueList(
                              options=self.VOICE_LIST,
                              current=self.VOICE_LIST[cur.voiceprompt]))
        basic.append(rs)

        rs = RadioSetting('voxlevel', 'Vox level',
                          RadioSettingValueList(
                              options=self.VOXLEVEL_LIST,
                              current=self.VOXLEVEL_LIST[cur.voxlevel]))
        basic.append(rs)

        rs = RadioSetting('voxdelay', 'VOX delay',
                          RadioSettingValueList(
                              options=self.VOXDELAY_LIST,
                              current=self.VOXDELAY_LIST[cur.voxdelay]))
        basic.append(rs)

        basic.append(RadioSetting('batterysaver', 'Battery saver',
                                  RadioSettingValueBoolean(
                                      current=cur.batterysaver)))
        basic.append(RadioSetting('beep', 'Beep',
                                  RadioSettingValueBoolean(
                                      current=cur.beep)))

        return RadioSettings(basic)

    def set_settings(self, settings):
        """Accepts the top-level RadioSettingGroup returned from
        get_settings() and adjusts the values in the radio accordingly.
        This function expects the entire RadioSettingGroup hierarchy
        returned from get_settings().
        """
        for element in settings:
            if not isinstance(element, RadioSetting):
                # A nested group: recurse into it
                self.set_settings(element)
                continue

            try:
                if '.' in element.get_name():
                    # Dotted names address a nested object below _memobj
                    bits = element.get_name().split('.')
                    obj = self._memobj
                    for bit in bits[:-1]:
                        obj = getattr(obj, bit)
                    setting = bits[-1]
                else:
                    obj = self._memobj.settings
                    setting = element.get_name()

                if element.has_apply_callback():
                    LOG.debug('Using apply callback')
                    element.run_apply_callback()
                else:
                    LOG.debug('Setting %s = %s', setting, element.value)
                    setattr(obj, setting, element.value)
            except Exception:
                # Log which setting failed, then let the error propagate
                LOG.debug(element.get_name())
                raise

    def _write(self, data, timeout=3):
        """Write data to the serial port and consume the echoed response

        The radio echos the data it is sent before replying. Send the
        data to the radio, consume the reply, and ensure that the reply
        is the same as the data sent.

        Raises errors.RadioError if the echo does not arrive within
        @timeout seconds or differs from @data.
        """
        serial = self.pipe
        expected = len(data)
        resp = b''
        start = datetime.now()

        serial.write(data)
        while expected:
            rbytes = serial.read(expected)
            resp += rbytes
            expected -= len(rbytes)
            if (datetime.now() - start).seconds > timeout:
                raise errors.RadioError('Timeout while reading from radio')

        if resp != data:
            raise errors.RadioError('Echoed response did not match sent data')

    def _read(self, length, timeout=3):
        """Read @length bytes from the serial port

        Keeps reading until all bytes have arrived, pausing 0.1 s between
        attempts.  Raises errors.RadioError once @timeout seconds have
        elapsed without the full response.
        """
        resp = b''
        serial = self.pipe
        remaining = length
        start = datetime.now()

        if not remaining:
            return resp

        while True:
            rbytes = serial.read(remaining)
            resp += rbytes
            remaining -= len(rbytes)
            if not remaining:
                break
            if (datetime.now() - start).seconds > timeout:
                raise errors.RadioError('Timeout while reading from radio')
            time.sleep(0.1)

        return resp

    def _read_block(self, block_addr, block_size):
        """Read @block_size bytes at @block_addr from the radio

        Sends the read command, checks that the reply starts with the
        matching write-style header, ACKs it and waits for the radio's
        ACK.  Returns the payload without the header.

        Raises errors.RadioError on any failure.
        """
        LOG.debug('Reading block %04x...', block_addr)
        cmd = struct.pack('>cHb', self.CMD_READ, block_addr, block_size)
        # The reply repeats the address and size behind a 'W' command byte
        resp_prefix = self.CMD_WRITE + cmd[1:]
        try:
            # msg always describes the step in progress so the handler
            # below can report exactly what failed.
            msg = ('Failed to write command to radio for block '
                   'read at %04x' % block_addr)
            self._write(cmd)

            msg = ('Failed to read response from radio for block '
                   'read at %04x' % block_addr)
            response = self._read(len(cmd) + block_size)
            if response[:len(cmd)] != resp_prefix:
                # Set msg first, otherwise the handler would replace this
                # diagnostic with the generic one above.
                msg = ('Error reading block %04x, '
                       'Command not returned.' % (block_addr))
                raise errors.RadioError(msg)

            msg = ('Failed to write ACK to radio after block read at '
                   '%04x' % block_addr)
            self._write(self.CMD_ACK)

            msg = ('Failed to read ACK from radio after block read at '
                   '%04x' % block_addr)
            ack = self._read(1)
        except Exception:
            LOG.debug(msg, exc_info=True)
            raise errors.RadioError(msg)

        if ack != self.CMD_ACK:
            raise errors.RadioError('No ACK reading block '
                                    '%04x.' % (block_addr))

        return response[len(cmd):]

    def _write_block(self, block_addr, block_size):
        """Write @block_size bytes of the image at @block_addr to the radio

        Raises errors.RadioError if the transfer fails or the radio does
        not answer with an ACK.
        """
        cmd = struct.pack('>cHb', self.CMD_WRITE, block_addr, block_size)
        # Honour block_size rather than assuming 8-byte blocks
        data = self.get_mmap()[block_addr:block_addr + block_size]

        LOG.debug('Writing Data:\n%s%s',
                  util.hexprint(cmd), util.hexprint(data))

        try:
            self._write(cmd + data)
            if self._read(1) != self.CMD_ACK:
                raise Exception('No ACK')
        except Exception:
            msg = 'Failed to send block to radio at %04x' % block_addr
            LOG.debug(msg, exc_info=True)
            raise errors.RadioError(msg)

    def _enter_programming_mode(self):
        """Put the radio into programming mode and verify its identity

        Sends the program-enter command, expects an ACK, requests the
        8-byte ident (which must start with 'SMP558'), ACKs it and waits
        for the final ACK.

        Raises errors.RadioError on any failure.
        """
        LOG.debug('Entering programming mode')

        try:
            # msg always describes the step in progress so the handler
            # below can report exactly what failed.
            msg = 'Error communicating with radio entering programming mode.'
            self._write(self.CMD_PROGRAM_ENTER)
            time.sleep(0.5)
            ack = self._read(1)
            if not ack:
                msg = 'No response from radio'
                raise errors.RadioError(msg)
            elif ack != self.CMD_ACK:
                msg = 'Radio refused to enter programming mode'
                raise errors.RadioError(msg)

            msg = 'Error communicating with radio during identification'
            self._write(self.CMD_IDENTIFY)
            ident = self._read(8)
            if not ident.startswith('SMP558'):
                LOG.debug(util.hexprint(ident))
                msg = 'Radio returned unknown ID string'
                raise errors.RadioError(msg)

            msg = ('Error communicating with radio while querying '
                   'model identifier')
            self._write(self.CMD_ACK)

            msg = 'Error communicating with radio on final handshake'
            ack = self._read(1)
            if ack != self.CMD_ACK:
                msg = ('Radio refused to enter programming '
                       'mode failed on final handshake.')
                raise errors.RadioError(msg)
        except Exception:
            LOG.debug(msg, exc_info=True)
            raise errors.RadioError(msg)

    def _exit_programming_mode(self):
        """Tell the radio to leave programming mode

        Raises errors.RadioError if the command cannot be sent.
        """
        try:
            self._write(self.CMD_PROGRAM_EXIT)
        except Exception:
            msg = 'Radio refused to exit programming mode'
            LOG.debug(msg, exc_info=True)
            raise errors.RadioError(msg)
        LOG.debug('Exited programming mode')