Description
Firstly, thanks for the work on this package. It's been ideal for my use case.
The only point where it seemed to struggle is when Unicode characters were part of a text field. I trawled through the Jackcess source code and found that a buffer starting with b'\xff\xfe' means the field uses "unicode compression", which seems to strip the b'\x00' padding byte from characters in the 0-255 range of UTF-16-LE. Characters it can't compress are left in their original encoding, so the padding needs to be pushed back in where necessary before decoding as UTF-16-LE.
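To illustrate the padding described above, here is a minimal sketch (not code from access_parser or Jackcess). `expand_compressed_run` is a hypothetical helper name and the sample bytes are made up; it assumes everything after the header is a single compressed run.

```python
# Marker that, per the description above, flags a "unicode compressed" text value.
HEADER = b"\xff\xfe"


def expand_compressed_run(run: bytes) -> bytes:
    """Re-insert the 0x00 high byte after every byte of a compressed run."""
    out = bytearray(len(run) * 2)  # zero-filled, twice as long
    out[0::2] = run                # low bytes go in the even positions
    return bytes(out)


# Hypothetical sample: "abc" stored compressed, i.e. one byte per character.
raw = HEADER + b"abc"
restored = expand_compressed_run(raw[len(HEADER):])
decoded = restored.decode("utf-16-le")
```

Here `restored` has the same bytes that `"abc".encode("utf-16-le")` produces, which is why the plain UTF-16-LE decode works once the padding is back.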
So tweaking the below function in utils
def parse_type(data_type, buffer, length=None, version=3, props=None):
    parsed = ""
    # Bool or int8
    if data_type == TYPE_INT8:
        parsed = struct.unpack_from("b", buffer)[0]
    elif data_type == TYPE_INT16:
        parsed = struct.unpack_from("h", buffer)[0]
    elif data_type == TYPE_INT32 or data_type == TYPE_COMPLEX:
        parsed = struct.unpack_from("i", buffer)[0]
    elif data_type == TYPE_MONEY:
        parsed = struct.unpack_from("q", buffer)[0]
        if props and "Format" in props:
            prop_format = props['Format']
            if parsed == 0:
                parsed = [y for x, y in FORMAT_TO_DEFAULT_VALUE.items() if prop_format.startswith(x)]
                if not parsed:
                    logging.warning(f"parse_type got unknown format while parsing money field {prop_format}")
                else:
                    parsed = parsed[0]
            else:
                parsed = parse_money_type(parsed, prop_format)
    elif data_type == TYPE_FLOAT32:
        parsed = struct.unpack_from("f", buffer)[0]
    elif data_type == TYPE_FLOAT64:
        parsed = struct.unpack_from("d", buffer)[0]
    elif data_type == TYPE_DATETIME:
        double_datetime = struct.unpack_from("q", buffer)[0]
        parsed = mdb_date_to_readable(double_datetime)
    elif data_type == TYPE_BINARY:
        parsed = buffer[:length]
        offset = length
    elif data_type == TYPE_OLE:
        parsed = buffer
    elif data_type == TYPE_GUID:
        parsed = buffer[:16]
        guid = uuid.UUID(parsed.hex())
        parsed = str(guid)
    elif data_type == TYPE_96_bit_17_BYTES:
        parsed = buffer[:17]
    elif data_type == TYPE_TEXT:
        # if version > 3:
        #     # Looks like if BOM is present text is already decoded
        #     if buffer.startswith(b"\xfe\xff") or buffer.startswith(b"\xff\xfe"):
        #         buff = buffer[2:]
        #         parsed = get_decoded_text(buff)
        #     else:
        #         parsed = buffer.decode("utf-16", errors='ignore')
        # else:
        #     parsed = get_decoded_text(buffer)
        # Use custom parsing functions derived from the Jackcess approach
        # (likely still needs work to support all JET versions, but works well for .accdb 2007)
        parsed = decodeTextValue(buffer)
    else:
        logging.debug(f"parse_type - unsupported data type: {data_type}")
    return parsed

and adding the following functions that shamelessly attempt to mimic the approach taken in Jackcess (Java):
# Text type decoding functions
def decodeTextValue(data: bytes):
    '''Decodes a compressed or uncompressed text value'''
    # See if data is compressed. The 0xFF, 0xFE sequence indicates that
    # compression is used (sort of, see algorithm below)
    TEXT_COMPRESSION_HEADER = b'\xff\xfe'
    isCompressed = (len(data) > 1 and
                    data[0] == TEXT_COMPRESSION_HEADER[0] and
                    data[1] == TEXT_COMPRESSION_HEADER[1])
    if isCompressed:
        textBuf = ''
        dataStart = len(TEXT_COMPRESSION_HEADER)
        dataEnd = dataStart
        inCompressedMode = True
        while dataEnd < len(data):
            endByte = data[dataEnd:dataEnd+1]
            if endByte == b'\x00':
                # A 0x00 byte ends the current segment and toggles the mode
                textBuf = textBuf + decodeTextSegment(data, dataStart, dataEnd, inCompressedMode)
                inCompressedMode = not inCompressedMode
                dataEnd += 1
                dataStart = dataEnd
            else:
                dataEnd += 1
        # handle last segment
        textBuf = textBuf + decodeTextSegment(data, dataStart, dataEnd, inCompressedMode)
        return textBuf
    return decodeUncompressedText(data, 0, len(data))  # should pass charset from jet version

def decodeTextSegment(data: bytes, dataStart: int, dataEnd: int, inCompressedMode: bool):
    '''
    Decodes a segment of a text value according to the given status of the
    segment (compressed/uncompressed) and returns the decoded string.
    '''
    if dataEnd <= dataStart:
        # No data: return an empty string (a bare return would give None
        # and break the string concatenation in decodeTextValue)
        return ''
    dataLength = dataEnd - dataStart
    if inCompressedMode:
        # Re-insert the 0x00 padding byte after each compressed byte
        tmpData = bytearray(dataLength * 2)
        tmpIdx = 0
        for i in range(dataStart, dataEnd):
            tmpData[tmpIdx:tmpIdx+1] = data[i:i+1]
            tmpIdx += 2
        data = bytes(tmpData)
        dataStart = 0
        dataEnd = len(data)
    return decodeUncompressedText(data, dataStart, dataEnd)

def decodeUncompressedText(textBytes: bytes, dataStart: int, dataEnd: int):  # should include charset variable
    bytesToDecode = textBytes[dataStart:dataEnd]
    try:
        # Appears to be the default for the jet versions I've tested;
        # may need to be variable based on db charset
        decoded = bytesToDecode.decode('utf_16_le')
    except UnicodeDecodeError:
        # latin1 maps every byte value, so this fallback cannot fail
        # and no further fallback is needed
        decoded = bytesToDecode.decode('latin1')
    return decoded

This seems to successfully parse whatever Unicode characters I add to my sample .accdb files.
I've no doubt these could be written more succinctly, but I just tried to replicate what I found here: https://github.com/jahlborn/jackcess/blob/7b338ea7bfd9fb9677c7c1634d760f41b4a611ab/src/main/java/com/healthmarketscience/jackcess/impl/ColumnImpl.java#L1584C21-L1584C22
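For what it's worth, a more compact take on the same toggling logic might look like the sketch below. It is only an illustration under a few assumptions: `decode_compressed_text` is a hypothetical name, UTF-16-LE is hard-coded rather than taken from the database charset, and undecodable bytes are replaced instead of falling back to latin1. It relies on two observations: every 0x00 byte both ends a segment and flips the mode, so splitting on 0x00 yields alternating compressed/uncompressed segments; and a compressed byte padded with 0x00 is a code point in the 0-255 range, which is exactly what a latin-1 decode gives.

```python
HEADER = b"\xff\xfe"  # marker for "unicode compressed" text values


def decode_compressed_text(data: bytes) -> str:
    """Sketch of the compressed-text decoding using bytes.split (assumes UTF-16-LE)."""
    if not data.startswith(HEADER):
        # No header: treat the whole buffer as plain UTF-16-LE.
        return data.decode("utf-16-le", errors="replace")

    parts = []
    # Segments alternate, starting in compressed mode right after the header.
    for index, segment in enumerate(data[len(HEADER):].split(b"\x00")):
        if index % 2 == 0:
            # Compressed: one byte per character, same result as padding
            # each byte with 0x00 and decoding as UTF-16-LE.
            parts.append(segment.decode("latin-1"))
        else:
            # Uncompressed: bytes are already UTF-16-LE.
            parts.append(segment.decode("utf-16-le", errors="replace"))
    return "".join(parts)


# Made-up sample: "a" compressed, then U+03A9 uncompressed, then "b" compressed.
sample = HEADER + b"a" + b"\x00" + b"\xa9\x03" + b"\x00" + b"b"
result = decode_compressed_text(sample)
```

With that sample, `result` is the three-character string "a", U+03A9 (Greek capital omega), "b". A header-only buffer decodes to an empty string, since the split yields a single empty segment.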
Hope it helps
Thanks again!