avatar.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. #!/usr/bin/env python3
  2. # -*- coding: UTF-8 -*-
  3. from PIL import Image
  4. import io
  5. import glob
  6. import os
  7. import numpy as np
  8. import logging
  9. import sqlite3
  10. logger = logging.getLogger(__name__)
  11. from .common.textutil import md5
  12. def _filename_priority(s):
  13. if "_hd" in s and s.endswith(".png"):
  14. return 10
  15. else:
  16. return 1
  17. class AvatarReader(object):
  18. def __init__(self, res_dir, avt_db="avatar.index"):
  19. self.sfs_dir = os.path.join(res_dir, 'sfs')
  20. # new location of avatar, see #50
  21. self.avt_dir = os.path.join(res_dir, 'avatar')
  22. if not os.path.isdir(self.avt_dir) or len(os.listdir(self.avt_dir)) == 0:
  23. self.avt_dir = None
  24. self.avt_db = avt_db
  25. self._use_avt = True
  26. if self.avt_db is not None:
  27. if len(glob.glob(os.path.join(self.sfs_dir, 'avatar*'))) == 0:
  28. # has sfs/avatar*
  29. self.avt_db = None
  30. if self.avt_dir is None and self.avt_db is None:
  31. logger.warn("Cannot find avatar storage. Will not use avatar!")
  32. self._use_avt = False
  33. def get_avatar_from_avtdb(self, avtid):
  34. try:
  35. candidates = self._search_avt_db(avtid)
  36. candidates = sorted(candidates, key=lambda x: _filename_priority(x[0]), reverse=True)
  37. for c in candidates:
  38. path, offset, size = c
  39. return self.read_img_from_block(path, offset, size)
  40. except Exception:
  41. pass
  42. def get_avatar_from_avtdir(self, avtid) -> Image.Image | None:
  43. dir1, dir2 = avtid[:2], avtid[2:4]
  44. candidates = glob.glob(os.path.join(self.avt_dir, dir1, dir2, f"*{avtid}*"))
  45. candidates = sorted(set(candidates), key=_filename_priority, reverse=True)
  46. for cand in candidates:
  47. if os.path.isdir(cand):
  48. candidates.extend(os.path.join(cand, x) for x in os.listdir(cand))
  49. for cand in candidates:
  50. if os.path.isdir(cand):
  51. continue
  52. try:
  53. if cand.endswith(".bm"):
  54. return self.read_bm_file(cand)
  55. else:
  56. return Image.open(cand)
  57. except Exception:
  58. logger.exception("")
  59. pass
  60. def get_avatar(self, username):
  61. """ username: `username` field in db.rcontact"""
  62. if not self._use_avt:
  63. return None
  64. avtid = md5(username.encode('utf-8'))
  65. if self.avt_db is not None:
  66. ret = self.get_avatar_from_avtdb(avtid)
  67. if ret is not None:
  68. return ret
  69. if self.avt_dir is not None:
  70. ret = self.get_avatar_from_avtdir(avtid)
  71. if ret is not None:
  72. return ret
  73. logger.warning("Avatar file for {} not found.".format(username))
  74. def save_avatar_to_avtdir(self, username: str, im: Image.Image):
  75. """Save a downloaded avatar to avtdir so it can be reused next time."""
  76. avtid = md5(username.encode('utf-8'))
  77. dir1, dir2 = avtid[:2], avtid[2:4]
  78. fname = os.path.join(self.avt_dir, dir1, dir2, f"user_{avtid}.png")
  79. os.makedirs(os.path.dirname(fname), exist_ok=True)
  80. logger.info(f"Caching downloaded avatar for {username} to {fname}.")
  81. im.save(fname, 'PNG')
  82. def read_img_from_block(self, filename, pos, size):
  83. file_idx = pos >> 32
  84. fname = os.path.join(self.sfs_dir,
  85. 'avatar.block.' + '{:05d}'.format(file_idx))
  86. # offset of each block file: 17 + len(path)
  87. start_pos = pos - file_idx * (2**32) + 16 + len(filename) + 1
  88. try:
  89. with open(fname, 'rb') as f:
  90. f.seek(start_pos)
  91. data = f.read(size)
  92. im = Image.open(io.BytesIO(data))
  93. return im
  94. except IOError as e:
  95. logger.warn("Cannot read avatar from {}: {}".format(fname, str(e)))
  96. return None
  97. def read_bm_file(self, fname):
  98. # history at https://github.com/ppwwyyxx/wechat-dump/pull/14
  99. with open(fname, 'rb') as f:
  100. # filesize is 36880=96x96x4+16
  101. size = (96, 96, 3)
  102. img = np.zeros(size, dtype='uint8')
  103. for i in range(96):
  104. for j in range(96):
  105. r, g, b, a = f.read(4)
  106. img[i,j] = (r, g, b)
  107. return Image.fromarray(img, mode="RGB")
  108. def _search_avt_db(self, avtid):
  109. conn = sqlite3.connect(self.avt_db)
  110. cursor = conn.execute("select FileName,Offset,Size from Index_avatar")
  111. candidates = []
  112. for path, offset, size in cursor:
  113. if avtid in path:
  114. candidates.append((path, offset, size))
  115. return candidates
  116. if __name__ == '__main__':
  117. import sys
  118. r = AvatarReader(sys.argv[1], sys.argv[2])
  119. print(r.get_avatar(sys.argv[3]))