# HG changeset patch # User Rhett Robinson rrhett@gmail.com # Date 1515970948 28800 # Sun Jan 14 15:02:28 2018 -0800 # Node ID ebcb6aeda1b85ef516247e69b21c24d33cd1d423 # Parent 16dc67ee13d18c5645c0bdbe0d20d021da702492 Adds support for a raw-mode Icom driver. This is required for the IC-2730A (part of #2745).
diff -r 16dc67ee13d1 -r ebcb6aeda1b8 chirp/drivers/icf.py --- a/chirp/drivers/icf.py Wed Jan 03 11:06:05 2018 -0500 +++ b/chirp/drivers/icf.py Sun Jan 14 15:02:28 2018 -0800 @@ -85,6 +85,7 @@
def _process_frames(self): if not self.data.startswith("\xFE\xFE"): + LOG.error("Out of sync with radio:\n%s" % util.hexprint(self.data)) raise errors.InvalidDataError("Out of sync with radio") elif len(self.data) < 5: return [] # Not enough data for a full frame @@ -134,11 +135,11 @@ return self._process_frames()
-def get_model_data(pipe, mdata="\x00\x00\x00\x00"): - """Query the radio connected to @pipe for its model data""" - send_clone_frame(pipe, 0xe0, mdata, raw=True) +def get_model_data(radio, mdata="\x00\x00\x00\x00"): + """Query the @radio for its model data""" + send_clone_frame(radio, 0xe0, mdata)
- stream = RadioStream(pipe) + stream = RadioStream(radio.pipe) frames = stream.get_frames()
if len(frames) != 1: @@ -164,27 +165,11 @@ return resp
-def send_clone_frame(pipe, cmd, data, raw=False, checksum=False): - """Send a clone frame with @cmd and @data to the radio attached - to @pipe""" - cs = 0 +def send_clone_frame(radio, cmd, data, checksum=False): + """Send a clone frame with @cmd and @data to the @radio""" + payload = radio.get_payload(data, checksum)
- if raw: - hed = data - else: - hed = "" - for byte in data: - val = ord(byte) - hed += "%02X" % val - cs += val - - if checksum: - cs = ((cs ^ 0xFFFF) + 1) & 0xFF - cs = "%02X" % cs - else: - cs = "" - - frame = "\xfe\xfe\xee\xef%s%s%s\xfd" % (chr(cmd), hed, cs) + frame = "\xfe\xfe\xee\xef%s%s\xfd" % (chr(cmd), payload)
if SAVE_PIPE: LOG.debug("Saving data...") @@ -197,30 +182,14 @@ # return frame pass
- pipe.write(frame) + radio.pipe.write(frame)
return frame
-def process_bcd(bcddata): - """Convert BCD-encoded data to raw""" - data = "" - i = 0 - while i < range(len(bcddata)) and i+1 < len(bcddata): - try: - val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16) - i += 2 - data += struct.pack("B", val) - except ValueError, e: - LOG.error("Failed to parse byte: %s" % e) - break - - return data - - -def process_data_frame(frame, _mmap): +def process_data_frame(radio, frame, _mmap): """Process a data frame, adding the payload to @_mmap""" - _data = process_bcd(frame.payload) + _data = radio.process_frame_payload(frame.payload) if len(_mmap) >= 0x10000: saddr, = struct.unpack(">I", _data[0:4]) length, = struct.unpack("B", _data[4]) @@ -264,7 +233,7 @@
def _clone_from_radio(radio): - md = get_model_data(radio.pipe) + md = get_model_data(radio)
if md[0:4] != radio.get_model(): LOG.info("This model: %s" % util.hexprint(md[0:4])) @@ -274,8 +243,7 @@ if radio.is_hispeed(): start_hispeed_clone(radio, CMD_CLONE_OUT) else: - send_clone_frame(radio.pipe, CMD_CLONE_OUT, - radio.get_model(), raw=True) + send_clone_frame(radio, CMD_CLONE_OUT, radio.get_model())
LOG.debug("Sent clone frame")
@@ -291,7 +259,7 @@
for frame in frames: if frame.cmd == CMD_CLONE_DAT: - src, dst = process_data_frame(frame, _mmap) + src, dst = process_data_frame(radio, frame, _mmap) if last_size != (dst - src): LOG.debug("ICF Size change from %i to %i at %04x" % (last_size, dst - src, src)) @@ -342,7 +310,7 @@ chunk = struct.pack(">HB", i, size) chunk += _mmap[i:i+size]
- send_clone_frame(radio.pipe, + send_clone_frame(radio, CMD_CLONE_DAT, chunk, checksum=True) @@ -360,22 +328,22 @@ # Uncomment to save out a capture of what we actually write to the radio # SAVE_PIPE = file("pipe_capture.log", "w", 0)
- md = get_model_data(radio.pipe) + md = get_model_data(radio)
if md[0:4] != radio.get_model(): raise errors.RadioError("I can't talk to this model")
# This mimics what the Icom software does, but isn't required and just # takes longer - # md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00") - # md = get_model_data(radio.pipe, model=md[0:2]+"\x00\x00") + # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00") + # md = get_model_data(radio, mdata=md[0:2]+"\x00\x00")
stream = RadioStream(radio.pipe)
if radio.is_hispeed(): start_hispeed_clone(radio, CMD_CLONE_IN) else: - send_clone_frame(radio.pipe, CMD_CLONE_IN, radio.get_model(), raw=True) + send_clone_frame(radio, CMD_CLONE_IN, radio.get_model())
frames = []
@@ -384,7 +352,7 @@ break frames += stream.get_frames()
- send_clone_frame(radio.pipe, CMD_CLONE_END, radio.get_endframe(), raw=True) + send_clone_frame(radio, CMD_CLONE_END, radio.get_endframe())
if SAVE_PIPE: SAVE_PIPE.close() @@ -409,6 +377,7 @@ try: return _clone_to_radio(radio) except Exception, e: + logging.exception("Failed to communicate with the radio") raise errors.RadioError("Failed to communicate with the radio: %s" % e)
@@ -579,6 +548,13 @@ raise errors.RadioError("Out of slots in this bank")
+def compute_checksum(data): + cs = 0 + for byte in data: + cs += ord(byte) + return ((cs ^ 0xFFFF) + 1) & 0xFF + + class IcomCloneModeRadio(chirp_common.CloneModeRadio): """Base class for Icom clone-mode radios""" VENDOR = "Icom" @@ -612,6 +588,31 @@ """Returns the ranges this radio likes to have in a clone""" return cls._ranges
+ def process_frame_payload(self, payload): + """Convert BCD-encoded data to raw""" + bcddata = payload + data = "" + i = 0 + while i+1 < len(bcddata): + try: + val = int("%s%s" % (bcddata[i], bcddata[i+1]), 16) + i += 2 + data += struct.pack("B", val) + except ValueError, e: + LOG.error("Failed to parse byte: %s" % e) + break + + return data + + def get_payload(self, data, checksum): + """Returns the data with optional checksum BCD-encoded for the radio.""" + payload = "" + for byte in data: + payload += "%02X" % ord(byte) + if checksum: + payload += "%02X" % compute_checksum(data) + return payload + def sync_in(self): self._mmap = clone_from_radio(self) self.process_mmap() @@ -646,6 +647,67 @@ return honor_speed_switch_setting(self, settings)
+def flip_high_order_bit(data): + return [chr(ord(d) ^ 0x80) for d in list(data)] + + +def escape_raw_byte(byte): + """Escapes a raw byte for sending to the radio""" + # Certain bytes are used as control characters to the radio, so if one of + # these bytes is present in the stream to the radio, it gets escaped as + # 0xff followed by (byte & 0x0f) + if ord(byte) > 0xf9: + return "\xff%s" % (chr(ord(byte) & 0xf)) + return byte + + +def unescape_raw_bytes(escaped_data): + """Unescapes raw bytes from the radio.""" + data = "" + i = 0 + while i < len(escaped_data): + byte = escaped_data[i] + if byte == '\xff': + if i + 1 >= len(escaped_data): + raise errors.InvalidDataError( + "Unexpected escape character at end of data") + i += 1 + byte = chr(0xf0 | ord(escaped_data[i])) + data += byte + i += 1 + return data + + +class IcomRawCloneModeRadio(IcomCloneModeRadio): + """Subclass for Icom clone-mode radios using the raw data protocol.""" + + def process_frame_payload(self, payload): + """Payloads from a raw-clone-mode radio are already in raw format.""" + return unescape_raw_bytes(payload) + + def get_payload(self, data, checksum): + """Returns the data with optional checksum in raw format.""" + if checksum: + cs = chr(compute_checksum(data)) + else: + cs = "" + payload = "%s%s" % (data, cs) + # Escape control characters. + escaped_payload = [escape_raw_byte(b) for b in payload] + return "".join(escaped_payload) + + def sync_in(self): + # The radio returns all the bytes with the high-order bit flipped. + _mmap = clone_from_radio(self) + _mmap = flip_high_order_bit(_mmap.get_packed()) + self._mmap = memmap.MemoryMap(_mmap) + self.process_mmap() + + def get_mmap(self): + _data = flip_high_order_bit(self._mmap.get_packed()) + return memmap.MemoryMap(_data) + + class IcomLiveRadio(chirp_common.LiveRadio): """Base class for an Icom Live-mode radio""" VENDOR = "Icom"