# HG changeset patch # User Angus Ainslie angus@akkea.ca # Date 1592188302 25200 # Sun Jun 14 19:31:42 2020 -0700 # Branch py3 # Node ID eb35a46c5f7e43518f6adc88768107efc0a3d402 # Parent b1e01577ff2505de7fcca665bf3d6af296f0f7ca [thd74] Add a Kenwood d74 driver
Based on code from
Tom Hayward, Eric Wolak, William McKeehan
https://chirp.danplanet.com/issues/4129
This works with Python 3; I have no idea whether it works with Python 2.
"""Kenwood TH-D74 clone-mode driver (chirp/drivers/thd74.py)."""

import binascii
import logging
import struct
import time

from chirp import directory, bitwise, errors, chirp_common, memmap
from chirp.settings import RadioSettingGroup, RadioSetting, RadioSettings, \
    RadioSettingValueInteger, RadioSettingValueString, \
    RadioSettingValueList, RadioSettingValueBoolean, \
    InvalidValueError

from . import thd72
from chirp.util import hexprint

LOG = logging.getLogger(__name__)

# Save files from MCP-D74 have a 256-byte header, and possibly some oddness
# TH-D74 memory map

# 0x02000: memory flags, 4 bytes per memory
# 0x04000: memories, each 40 bytes long
# 0x10000: names, each 16 bytes long, null padded, ascii encoded

# memory channel
# 0 1 2 3 4 5 6 7 8 9 a b c d e f
# [freq ] ? mode tmode/duplex rtone ctone dtcs cross_mode [offset] ?
# NOTE(review): the sketch above does not match the struct in mem_format
# (which places offset directly after freq) -- confirm which one is right.

# frequency is 32-bit unsigned little-endian Hz

DEFAULT_PROG_VFO = (
    (136000000, 174000000),
    (410000000, 470000000),
    (118000000, 136000000),
    (136000000, 174000000),
    (320000000, 400000000),
    (400000000, 524000000),
)

# Some of the blocks written by MCP have a length set of less than 0x00/256
BLOCK_SIZES = {
    0x0003: 0x00B4,
    0x0007: 0x0068,
}

# Single-character acknowledgement exchanged after each block.
_ACK = chr(0x06)
# Size in bytes of one clone block.
_BLOCK_LEN = 256
# Value of flag.disabled that get_memory() treats as "empty channel".
_FLAG_EMPTY = 0xFF
# Marker the previous revision of set_memory() wrote when deleting a channel.
_FLAG_LEGACY_DELETED = 0x0F

mem_format = """
// TODO: find lockout

#seekto 0x10c0;
struct {
  char power_on_msg[16];
  char modem_name[16];
} onmsg_name;

#seekto 0x1200;
struct {
  char callsign[8];
} callsign;

#seekto 0x02000;
struct {
// 4 bytes long
  u8 disabled;
  u8 unk;
  u8 group;
  u8 unk2;
} flag[1032];

#seekto 0x04000;
// TODO: deal with the 16-byte trailers of every block
struct {
    struct {
      ul32 freq;
      ul32 offset;

      u8 tuning_step:4,
         unk:4;
      u8 mode:4,
         unk1:4;
      u8 tone_mode:4,
         duplex:4;
      u8 rtone;

      u8 ctone;
      u8 dtcs;
      u8 cross_mode:4,
         digital_squelch:4;
      char urcall[8];
      char rpt1[8];
      char rpt2[8];

      u8 digital_squelch_code;

    } mem[6];

    u8 pad[16];
} memory[1167]; // TODO: correct number of memories

#seekto 0x10000;
struct {
  char name[16];
} channel_name[1000];

#seekto 0x14700;
struct {
  char name[16];
} wx_name[10];

#seekto 0x144d0;
struct {
  char name[16];
} call_name[6];

#seekto 0x14800;
struct {
  char name[16];
} group_name[31];
"""

# Tuning steps indexed by the 4-bit tuning_step field; None marks indexes
# with no known step.
STEPS = [5.0, 6.25, None, None, 10.0, 12.5, 15.0, 20.0, 25.0, 50.0, 100.0, 9.0]
# Modes indexed by the 4-bit mode field.
MODES = [
    "FM",
    "DV",
    "AM",
    "LSB",
    "USB",
    "CW",
    "NFM",
    "DV"
]


def hex(data):
    """Return *data* as an upper-case hex dump, 16 bytes per line.

    *data* may be bytes/bytearray or a string; string characters are
    encoded as latin-1 so that each character maps to one byte.
    """
    if not isinstance(data, (bytes, bytearray)):
        data = str(data).encode("latin-1", "replace")
    rows = []
    for start in range(0, len(data), 16):
        digits = binascii.hexlify(bytes(data[start:start + 16]))
        digits = digits.decode("ascii").upper()
        rows.append(" ".join(digits[pos:pos + 2]
                             for pos in range(0, len(digits), 2)))
    return "\n".join(rows)


class SProxy(object):
    """Serial-port wrapper that logs everything read and written."""

    def __init__(self, delegate):
        self.delegate = delegate

    def read(self, len):
        r = self.delegate.read(len)
        LOG.debug("READ\n" + hex(r))
        return r

    def write(self, data):
        LOG.debug("WRITE\n" + hex(data))
        return self.delegate.write(str(data))

    @property
    def timeout(self):
        return self.delegate.timeout

    @timeout.setter
    def timeout(self, timeout):
        self.delegate.timeout = timeout


@directory.register
class THD74Radio(thd72.THD72Radio):
    """Kenwood TH-D74, programmed through its block-based clone protocol."""

    MODEL = "TH-D74 (clone mode)"
    # MODEL = "TH-D74"
    _memsize = 500480
    # I think baud rate might be ignored by USB-Serial stack of the D74's
    # on-board FTDI chip, but it doesn't seem to hurt.
    BAUD_RATE = 115200

    # Consecutive empty reads tolerated inside a block before giving up.
    _MAX_EMPTY_READS = 10

    # To trace serial traffic, wrap the pipe in SProxy:
    # def __init__(self, pipe):
    #     pipe = SProxy(pipe)
    #     super(THD74Radio, self).__init__(pipe)

    def get_features(self):
        """Return the parent's features with tuning-step support enabled."""
        rf = super(THD74Radio, self).get_features()
        rf.has_tuning_step = True
        return rf

    def process_mmap(self):
        """Parse the memory map and reset the dirty-block list."""
        self._memobj = bitwise.parse(mem_format, self._mmap)
        self._dirty_blocks = []

    def sync_in(self):
        """Download the radio image and parse it."""
        # self._detect_baud()
        self._mmap = self.download()
        self.process_mmap()

    def sync_out(self):
        """Upload the dirty blocks, or the whole image if none are dirty."""
        if self._dirty_blocks:
            self.upload(self._dirty_blocks)
        else:
            self.upload()

    def _report_progress(self, msg, cur, total):
        """Send a progress update to status_fn, if one is set."""
        if self.status_fn:
            s = chirp_common.Status()
            s.msg = msg
            s.max = total
            s.cur = cur
            self.status_fn(s)

    def read_block(self, block, count=256):
        """Read *count* bytes of *block* from the radio and ACK them.

        Raises errors.RadioError if the header is missing or wrong, if the
        radio stops sending data, or if the final ACK is not received.
        """
        # A count of 256 is sent as 0 on the wire.
        cmd = struct.pack(">cHH", b"R", block, count % 256)
        LOG.debug("Read cmd %s", cmd)
        self.pipe.write(''.join(chr(b) for b in cmd))

        r = self.pipe.read(5)
        if len(r) != 5:
            raise errors.RadioError("Did not receive block response")
        LOG.debug("Read input %r", r)

        cmd = r[0]
        _block = (ord(r[1]) << 8) + ord(r[2])
        if cmd != 'W' or _block != block:
            raise errors.RadioError(
                "Invalid response: %s %i %i" % (cmd, block, _block))

        chunks = []
        received = 0
        empty_reads = 0
        while received < count:
            chunk = self.pipe.read(count - received)
            if not chunk:
                # Bail out instead of spinning forever on a silent radio.
                empty_reads += 1
                if empty_reads >= self._MAX_EMPTY_READS:
                    raise errors.RadioError(
                        "Timed out reading block %i" % block)
                continue
            empty_reads = 0
            chunks.append(chunk)
            received += len(chunk)
        data = "".join(chunks)

        self.pipe.write(_ACK)
        if self.pipe.read(1) != _ACK:
            raise errors.RadioError("Did not receive post-block ACK!")

        return data

    def write_block(self, block, map, count=256):
        """Write *count* bytes of *block* from *map*; return True on ACK."""
        c = struct.pack(">cHH", b"W", block, count % 256)
        base = block * _BLOCK_LEN
        data = map[base:base + count]
        # It's crucial that these are written together. Otherwise the radio
        # will fail to ACK under some conditions.
        c_d = ''.join(chr(b) for b in c) + data
        self.pipe.write(c_d)

        ack = self.pipe.read(1)

        if len(ack) == 0:
            LOG.debug("read timed out block %d - trying again", block)
            time.sleep(0.5)
            ack = self.pipe.read(1)

        if ack != _ACK:
            # ack may be empty here, so format it with %r rather than ord().
            LOG.error("Block %d write failed %r", block, ack)

        return ack == _ACK

    def _expect_ack(self):
        """Read one byte and raise errors.RadioError unless it is the ACK."""
        ack = self.pipe.read(1)
        if ack != _ACK:
            raise errors.RadioError(
                "Expected ack but got {!r} ({})".format(ack, type(ack)))

    def _unlock(self):
        """Voodoo sequence of operations to get the radio to accept our
        programming.

        Raises errors.RadioError if the radio does not ACK a step.
        """
        # The result is unused; the read itself is part of the sequence.
        self.read_block(0, 6)

        unlock = ("\x57\x00\x00\x00\x30\xff\x01\xff"
                  "\x00\x00\xff\xff\xff\xff\xff\x01"
                  "\x00\x00\x00\x03\x01\x00\x00\x00"
                  "\x00\x02\x00\x30\x30\x30\x00\xff"
                  "\xff\xff\xff\xff\xff\xff\x00\xff"
                  "\xff\xff\xff\xff\xff\xff\xff\xff"
                  "\xff\xff\xff\xff\xff")
        self.pipe.write(unlock)
        self._expect_ack()

        c = struct.pack(">cHH", b"W", 0, 0x38C8)
        self.pipe.write(''.join(chr(b) for b in c))
        # magic unlock sequence
        unlock = [0xFF] * 8 + [0] * 160 + [0xFF] * 32
        unlock = "".join([chr(x) for x in unlock])
        self.pipe.write(unlock)

        time.sleep(0.01)
        self._expect_ack()

    def download(self, raw=False, blocks=None):
        """Read the radio image.

        Only the block numbers in *blocks* are read (all blocks when None);
        every other block is filled with 0xFF bytes.  Returns the raw data
        when *raw* is true, otherwise a memmap.MemoryMap.
        """
        total_blocks = int(self._memsize / _BLOCK_LEN)
        if blocks is None:
            blocks = list(range(total_blocks))
        else:
            blocks = [b for b in blocks if b < total_blocks]

        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
            raise errors.RadioError("Radio didn't go into PROGRAM mode")

        self.pipe.baudrate = 57600
        try:
            self.pipe.setRTS()
        except AttributeError:
            self.pipe.rts = True
        # One byte is read and discarded before the first block.
        self.pipe.read(1)

        if blocks:
            LOG.debug("reading blocks %d..%d" % (blocks[0], blocks[-1]))
        wanted = set(blocks)
        total = len(blocks)
        filler = _BLOCK_LEN * '\xff'
        chunks = []
        count = 0
        for i in range(total_blocks):
            if i not in wanted:
                chunks.append(filler)
                continue
            chunks.append(self.read_block(i))
            count += 1
            self._report_progress("Cloning from radio", count, total)
        data = "".join(chunks)

        self.pipe.write("E")
        if raw:
            return data
        return memmap.MemoryMap(data)

    def upload(self, blocks=None):
        """Write *blocks* (by default nearly the whole image) to the radio.

        Raises errors.RadioError if the radio refuses PROGRAM mode, the
        unlock sequence fails, or a block is not acknowledged.
        """
        # MCP-D74 sets DTR, so we should too
        try:
            self.pipe.setDTR()
        except AttributeError:
            self.pipe.dtr = True

        total_blocks = int(self._memsize / _BLOCK_LEN)
        if blocks is None:
            blocks = list(range(total_blocks - 2))
        else:
            blocks = [b for b in blocks if b < total_blocks]

        if self.command("0M PROGRAM", 2, timeout=1.5) != "0M":
            raise errors.RadioError("Radio didn't go into PROGRAM mode")

        # _unlock() raises errors.RadioError itself when a step fails.
        self._unlock()

        # This block definitely isn't written conventionally, so we let
        # _unlock handle it and skip.
        if 0 in blocks:
            blocks.remove(0)

        # For some reason MCP-D74 skips this block. If we don't, we'll get a
        # NACK on the next one. There is also a more than 500 ms delay for
        # the ACK.
        if 1279 in blocks:
            blocks.remove(1279)

        if blocks:
            LOG.debug("writing blocks %d..%d" % (blocks[0], blocks[-1]))
        total = len(blocks)
        for count, i in enumerate(blocks, 1):
            time.sleep(0.001)
            if not self.write_block(i, self._mmap,
                                    BLOCK_SIZES.get(i, _BLOCK_LEN)):
                raise errors.RadioError("write of block %i failed" % i)
            self._report_progress("Cloning to radio", count, total)

        lock = ("\x57\x00\x00\x00\x06\x02\x01\xff\x00\x00\xff")
        self.pipe.write(lock)

        self.pipe.write("F")
        # clear out blocks we uploaded from the dirty blocks list
        written = set(blocks)
        self._dirty_blocks = [b for b in self._dirty_blocks
                              if b not in written]

    def command(self, cmd, response_length, timeout=0.5):
        """Send *cmd* and return the stripped response.

        Reads response_length + 1 bytes using *timeout*, then restores the
        pipe's previous timeout.
        """
        LOG.debug("PC->D72: %s" % cmd)
        default_timeout = self.pipe.timeout
        self.pipe.write(cmd + "\r")
        self.pipe.timeout = timeout
        try:
            data = self.pipe.read(response_length + 1)
            LOG.debug("D72->PC: %s" % data.strip())
        finally:
            self.pipe.timeout = default_timeout
        return data.strip()

    def get_raw_memory(self, number):
        """Return the repr of channel *number*'s record and flag."""
        bank = number // 6
        idx = number % 6

        _mem = self._memobj.memory[bank].mem[idx]
        return repr(_mem) + \
            repr(self._memobj.flag[number])

    def get_id(self):
        """Return the radio's answer to the ID command."""
        r = self.command("ID", 9)
        if r.startswith("ID "):
            return r.split(" ")[1]
        else:
            raise errors.RadioError("No response to ID command")

    def set_channel_name(self, number, name):
        """Store *name*, NUL-padded to 16 characters, for *number*.

        Only numbers below 999 and 1020..1029 (the wx_name table) are
        stored; any other number is ignored.
        """
        name = self._add_00_pad(name, 16)
        if number < 999:
            self._memobj.channel_name[number].name = name
            self.add_dirty_block(self._memobj.channel_name[number])
        elif number >= 1020 and number < 1030:
            number -= 1020
            self._memobj.wx_name[number].name = name
            self.add_dirty_block(self._memobj.wx_name[number])

    def get_memory(self, number):
        """Return the chirp_common memory stored at *number*.

        *number* may also be a special-channel name from
        thd72.THD72_SPECIAL.
        """
        if isinstance(number, str):
            try:
                number = thd72.THD72_SPECIAL[number]
            except KeyError:
                raise errors.InvalidMemoryLocation("Unknown channel %s" %
                                                   number)

        if number < 0 or number > (max(thd72.THD72_SPECIAL.values()) + 1):
            raise errors.InvalidMemoryLocation(
                "Number must be between 0 and 999")

        # Records are stored six to a 256-byte block.
        bank = number // 6
        idx = number % 6

        _mem = self._memobj.memory[bank].mem[idx]
        flag = self._memobj.flag[number]

        if _mem.mode < len(MODES) and MODES[_mem.mode] == "DV":
            mem = chirp_common.DVMemory()
        else:
            mem = chirp_common.Memory()

        mem.number = number

        if number > 999:
            mem.extd_number = thd72.THD72_SPECIAL_REV[number]
        if flag.disabled == _FLAG_EMPTY:
            mem.empty = True
            return mem

        mem.name = self.get_channel_name(number)
        mem.freq = int(_mem.freq)
        mem.tmode = thd72.TMODES[int(_mem.tone_mode)]
        mem.rtone = chirp_common.TONES[_mem.rtone]
        mem.ctone = chirp_common.TONES[_mem.ctone]
        mem.dtcs = chirp_common.DTCS_CODES[_mem.dtcs]
        mem.duplex = thd72.DUPLEX[int(_mem.duplex)]
        mem.offset = int(_mem.offset)
        mem.mode = MODES[int(_mem.mode)]

        # The field is 4 bits wide but STEPS is shorter and has gaps; leave
        # the Memory default in place when the index has no known step.
        step_idx = int(_mem.tuning_step)
        if step_idx < len(STEPS) and STEPS[step_idx] is not None:
            mem.tuning_step = STEPS[step_idx]

        if mem.mode == "DV":
            mem.dv_urcall = str(_mem.urcall)
            mem.dv_rpt1call = str(_mem.rpt1)
            mem.dv_rpt2call = str(_mem.rpt2)
            mem.dv_code = int(_mem.digital_squelch_code)

        # NOTE(review): number == 999 matches neither branch below --
        # confirm whether that is intended.
        if number < 999:
            # mem.skip = chirp_common.SKIP_VALUES[int(flag.skip)]
            mem.cross_mode = chirp_common.CROSS_MODES[_mem.cross_mode]
        if number > 999:
            mem.cross_mode = chirp_common.CROSS_MODES[0]
            mem.immutable = ["number", "bank", "extd_number", "cross_mode"]
            if number >= 1020 and number < 1030:
                mem.immutable += ["freq", "offset", "tone", "mode",
                                  "tmode", "ctone", "skip"]  # FIXME: ALL
            else:
                mem.immutable += ["name"]

        return mem

    def set_memory(self, mem):
        """Store *mem* into the memory map and mark its blocks dirty."""
        LOG.debug("set_memory(%d)" % mem.number)
        if mem.number < 0 or \
                mem.number > (max(thd72.THD72_SPECIAL.values()) + 1):
            raise errors.InvalidMemoryLocation(
                "Number must be between 0 and 999")

        # weather channels can only change name, nothing else
        if mem.number >= 1020 and mem.number < 1030:
            self.set_channel_name(mem.number, mem.name)
            return

        flag = self._memobj.flag[mem.number]
        self.add_dirty_block(self._memobj.flag[mem.number])

        # The previous revision marked deleted channels with 0x0F and only
        # re-initialized slots carrying that marker; keep that condition.
        # NOTE(review): initialize() is inherited and not visible here --
        # confirm it suits this record layout before also calling it for
        # slots marked 0xFF.
        legacy_deleted = flag.disabled == _FLAG_LEGACY_DELETED

        # only delete non-WX channels
        if mem.empty:
            # Must be the value get_memory() tests for, or the channel
            # would still read back as in use.
            flag.disabled = _FLAG_EMPTY
            return
        flag.disabled = 0

        bank = mem.number // 6
        idx = mem.number % 6

        LOG.debug("setting memory #%d bank %d entry %d",
                  mem.number, bank, idx)
        _mem = self._memobj.memory[bank].mem[idx]
        self.add_dirty_block(_mem)
        if legacy_deleted:
            self.initialize(_mem)

        _mem.freq = mem.freq

        if mem.number < 999:
            self.set_channel_name(mem.number, mem.name)

        _mem.tone_mode = thd72.TMODES_REV[mem.tmode]
        _mem.rtone = chirp_common.TONES.index(mem.rtone)
        _mem.ctone = chirp_common.TONES.index(mem.ctone)
        _mem.dtcs = chirp_common.DTCS_CODES.index(mem.dtcs)
        _mem.cross_mode = chirp_common.CROSS_MODES.index(mem.cross_mode)
        _mem.duplex = thd72.DUPLEX_REV[mem.duplex]
        _mem.offset = mem.offset
        # Encode with the same table get_memory() decodes with.
        _mem.mode = MODES.index(mem.mode)
        # Steps that are not in STEPS leave the stored value untouched.
        if mem.tuning_step in STEPS:
            _mem.tuning_step = STEPS.index(mem.tuning_step)

        # flag.prog_vfo = thd72.get_prog_vfo(mem.freq)

        # if mem.number < 999:
        #     flag.skip = chirp_common.SKIP_VALUES.index(mem.skip)

    @staticmethod
    def _add_00_pad(val, length):
        """Return *val* NUL-padded or truncated to exactly *length*."""
        return val.ljust(length, "\x00")[:length]

    @classmethod
    def apply_callsign(cls, setting, obj):
        """Store the upper-cased, NUL-padded callsign setting on *obj*."""
        callsign = setting.value.get_value().upper()
        setattr(obj, "callsign", cls._add_00_pad(callsign, 8))

    @classmethod
    def apply_power_on_msg(cls, setting, obj):
        """Store the NUL-padded power-on message setting on *obj*."""
        msg = setting.value.get_value()
        setattr(obj, "power_on_msg", cls._add_00_pad(msg, 16))

    def _get_general_settings(self):
        """Build the "General" group: callsign and power-on message."""
        menu = RadioSettingGroup("general", "General")
        cs = self._memobj.callsign

        val = RadioSettingValueString(
            0, 6, str(cs.callsign).rstrip("\x00"))
        rs = RadioSetting("cs.callsign", "Callsign", val)
        rs.set_apply_callback(self.apply_callsign, cs)
        menu.append(rs)

        msg = self._memobj.onmsg_name

        val = RadioSettingValueString(
            0, 16, str(msg.power_on_msg).rstrip("\x00"))
        rs = RadioSetting("msg.power_on_msg", "Power on message", val)
        rs.set_apply_callback(self.apply_power_on_msg, msg)
        menu.append(rs)

        return menu

    def _get_settings(self):
        """Return the RadioSettings tree."""
        top = RadioSettings(self._get_general_settings())
        return top

    def get_settings(self):
        """Return the settings tree, or None if it cannot be built."""
        try:
            return self._get_settings()
        except Exception:
            # Best effort: log the traceback and report "no settings".
            LOG.exception("Failed to parse settings")
            return None