File: thumbspage/piexif/_webp.py
import struct def split(data): if data[0:4] != b"RIFF" or data[8:12] != b"WEBP": raise ValueError("Not WebP") webp_length_bytes = data[4:8] webp_length = struct.unpack("<L", webp_length_bytes)[0] RIFF_HEADER_SIZE = 8 file_size = RIFF_HEADER_SIZE + webp_length start = 12 pointer = start CHUNK_FOURCC_LENGTH = 4 LENGTH_BYTES_LENGTH = 4 chunks = [] while pointer + CHUNK_FOURCC_LENGTH + LENGTH_BYTES_LENGTH < file_size: fourcc = data[pointer:pointer + CHUNK_FOURCC_LENGTH] pointer += CHUNK_FOURCC_LENGTH chunk_length_bytes = data[pointer:pointer + LENGTH_BYTES_LENGTH] chunk_length = struct.unpack("<L", chunk_length_bytes)[0] pointer += LENGTH_BYTES_LENGTH chunk_data = data[pointer:pointer + chunk_length] chunks.append({"fourcc":fourcc, "length_bytes":chunk_length_bytes, "data":chunk_data}) padding = 1 if chunk_length % 2 else 0 pointer += chunk_length + padding return chunks def merge_chunks(chunks): merged = b"".join([chunk["fourcc"] + chunk["length_bytes"] + chunk["data"] + (len(chunk["data"]) % 2) * b"\x00" for chunk in chunks]) return merged def _get_size_from_vp8x(chunk): width_minus_one_bytes = chunk["data"][-6:-3] + b"\x00" width_minus_one = struct.unpack("<L", width_minus_one_bytes)[0] width = width_minus_one + 1 height_minus_one_bytes = chunk["data"][-3:] + b"\x00" height_minus_one = struct.unpack("<L", height_minus_one_bytes)[0] height = height_minus_one + 1 return (width, height) def _get_size_from_vp8(chunk): BEGIN_CODE = b"\x9d\x01\x2a" begin_index = chunk["data"].find(BEGIN_CODE) if begin_index == -1: ValueError("wrong VP8") else: BEGIN_CODE_LENGTH = len(BEGIN_CODE) LENGTH_BYTES_LENGTH = 2 length_start = begin_index + BEGIN_CODE_LENGTH width_bytes = chunk["data"][length_start:length_start + LENGTH_BYTES_LENGTH] width = struct.unpack("<H", width_bytes)[0] height_bytes = chunk["data"][length_start + LENGTH_BYTES_LENGTH:length_start + 2 * LENGTH_BYTES_LENGTH] height = struct.unpack("<H", height_bytes)[0] return (width, height) def _vp8L_contains_alpha(chunk_data): flag = ord(chunk_data[4:5]) >> 5-1 & ord(b"\x01") contains = 1 * flag return contains def _get_size_from_vp8L(chunk): b1 = chunk["data"][1:2] b2 = chunk["data"][2:3] b3 = chunk["data"][3:4] b4 = chunk["data"][4:5] width_minus_one = (ord(b2) & ord(b"\x3F")) << 8 | ord(b1) width = width_minus_one + 1 height_minus_one = (ord(b4) & ord(b"\x0F")) << 10 | ord(b3) << 2 | (ord(b2) & ord(b"\xC0")) >> 6 height = height_minus_one + 1 return (width, height) def _get_size_from_anmf(chunk): width_minus_one_bytes = chunk["data"][6:9] + b"\x00" width_minus_one = struct.unpack("<L", width_minus_one_bytes)[0] width = width_minus_one + 1 height_minus_one_bytes = chunk["data"][9:12] + b"\x00" height_minus_one = struct.unpack("<L", height_minus_one_bytes)[0] height = height_minus_one + 1 return (width, height) def set_vp8x(chunks): width = None height = None flags = [b"0", b"0", b"0", b"0", b"0", b"0", b"0", b"0"] # [0, 0, ICC, Alpha, EXIF, XMP, Anim, 0] for chunk in chunks: if chunk["fourcc"] == b"VP8X": width, height = _get_size_from_vp8x(chunk) elif chunk["fourcc"] == b"VP8 ": width, height = _get_size_from_vp8(chunk) elif chunk["fourcc"] == b"VP8L": is_rgba = _vp8L_contains_alpha(chunk["data"]) if is_rgba: flags[3] = b"1" width, height = _get_size_from_vp8L(chunk) elif chunk["fourcc"] == b"ANMF": width, height = _get_size_from_anmf(chunk) elif chunk["fourcc"] == b"ICCP": flags[2] = b"1" elif chunk["fourcc"] == b"ALPH": flags[3] = b"1" elif chunk["fourcc"] == b"EXIF": flags[4] = b"1" elif chunk["fourcc"] == b"XMP ": flags[5] = b"1" elif chunk["fourcc"] == b"ANIM": flags[6] = b"1" width_minus_one = width - 1 height_minus_one = height - 1 if chunks[0]["fourcc"] == b"VP8X": chunks.pop(0) header_bytes = b"VP8X" length_bytes = b"\x0a\x00\x00\x00" flags_bytes = struct.pack("B", int(b"".join(flags), 2)) padding_bytes = b"\x00\x00\x00" width_bytes = struct.pack("<L", width_minus_one)[:3] height_bytes = struct.pack("<L", height_minus_one)[:3] data_bytes = flags_bytes + padding_bytes + width_bytes + height_bytes vp8x_chunk = {"fourcc":header_bytes, "length_bytes":length_bytes, "data":data_bytes} chunks.insert(0, vp8x_chunk) return chunks def get_file_header(chunks): WEBP_HEADER_LENGTH = 4 FOURCC_LENGTH = 4 LENGTH_BYTES_LENGTH = 4 length = WEBP_HEADER_LENGTH for chunk in chunks: data_length = struct.unpack("<L", chunk["length_bytes"])[0] data_length += 1 if data_length % 2 else 0 length += FOURCC_LENGTH + LENGTH_BYTES_LENGTH + data_length length_bytes = struct.pack("<L", length) riff = b"RIFF" webp_header = b"WEBP" file_header = riff + length_bytes + webp_header return file_header def get_exif(data): if data[0:4] != b"RIFF" or data[8:12] != b"WEBP": raise ValueError("Not WebP") if data[12:16] != b"VP8X": raise ValueError("doesnot have exif") webp_length_bytes = data[4:8] webp_length = struct.unpack("<L", webp_length_bytes)[0] RIFF_HEADER_SIZE = 8 file_size = RIFF_HEADER_SIZE + webp_length start = 12 pointer = start CHUNK_FOURCC_LENGTH = 4 LENGTH_BYTES_LENGTH = 4 chunks = [] exif = b"" while pointer < file_size: fourcc = data[pointer:pointer + CHUNK_FOURCC_LENGTH] pointer += CHUNK_FOURCC_LENGTH chunk_length_bytes = data[pointer:pointer + LENGTH_BYTES_LENGTH] chunk_length = struct.unpack("<L", chunk_length_bytes)[0] if chunk_length % 2: chunk_length += 1 pointer += LENGTH_BYTES_LENGTH if fourcc == b"EXIF": return data[pointer:pointer + chunk_length] pointer += chunk_length return None # if there isn't exif, return None. def insert_exif_into_chunks(chunks, exif_bytes): EXIF_HEADER = b"EXIF" exif_length_bytes = struct.pack("<L", len(exif_bytes)) exif_chunk = {"fourcc":EXIF_HEADER, "length_bytes":exif_length_bytes, "data":exif_bytes} xmp_index = None animation_index = None for index, chunk in enumerate(chunks): if chunk["fourcc"] == b"EXIF": chunks.pop(index) for index, chunk in enumerate(chunks): if chunk["fourcc"] == b"XMP ": xmp_index = index elif chunk["fourcc"] == b"ANIM": animation_index = index if xmp_index is not None: chunks.insert(xmp_index, exif_chunk) elif animation_index is not None: chunks.insert(animation_index, exif_chunk) else: chunks.append(exif_chunk) return chunks def insert(webp_bytes, exif_bytes): chunks = split(webp_bytes) chunks = insert_exif_into_chunks(chunks, exif_bytes) chunks = set_vp8x(chunks) file_header = get_file_header(chunks) merged = merge_chunks(chunks) new_webp_bytes = file_header + merged return new_webp_bytes def remove(webp_bytes): chunks = split(webp_bytes) for index, chunk in enumerate(chunks): if chunk["fourcc"] == b"EXIF": chunks.pop(index) chunks = set_vp8x(chunks) file_header = get_file_header(chunks) merged = merge_chunks(chunks) new_webp_bytes = file_header + merged return new_webp_bytes