vr.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. import os,sys
  2. parent_directory = os.path.dirname(os.path.abspath(__file__))
  3. import logging,pdb
  4. logger = logging.getLogger(__name__)
  5. import librosa
  6. import numpy as np
  7. import soundfile as sf
  8. import torch
  9. from lib.lib_v5 import nets_61968KB as Nets
  10. from lib.lib_v5 import spec_utils
  11. from lib.lib_v5.model_param_init import ModelParameters
  12. from lib.lib_v5.nets_new import CascadedNet
  13. from lib.utils import inference
  14. class AudioPre:
  15. def __init__(self, agg, model_path, device, is_half, tta=False):
  16. self.model_path = model_path
  17. self.device = device
  18. self.data = {
  19. # Processing Options
  20. "postprocess": False,
  21. "tta": tta,
  22. # Constants
  23. "window_size": 512,
  24. "agg": agg,
  25. "high_end_process": "mirroring",
  26. }
  27. mp = ModelParameters("%s/lib/lib_v5/modelparams/4band_v2.json"%parent_directory)
  28. model = Nets.CascadedASPPNet(mp.param["bins"] * 2)
  29. cpk = torch.load(model_path, map_location="cpu")
  30. model.load_state_dict(cpk)
  31. model.eval()
  32. if is_half:
  33. model = model.half().to(device)
  34. else:
  35. model = model.to(device)
  36. self.mp = mp
  37. self.model = model
  38. def _path_audio_(
  39. self, music_file, ins_root=None, vocal_root=None, format="flac", is_hp3=False
  40. ):
  41. if ins_root is None and vocal_root is None:
  42. return "No save root."
  43. name = os.path.basename(music_file)
  44. if ins_root is not None:
  45. os.makedirs(ins_root, exist_ok=True)
  46. if vocal_root is not None:
  47. os.makedirs(vocal_root, exist_ok=True)
  48. X_wave, y_wave, X_spec_s, y_spec_s = {}, {}, {}, {}
  49. bands_n = len(self.mp.param["band"])
  50. # print(bands_n)
  51. for d in range(bands_n, 0, -1):
  52. bp = self.mp.param["band"][d]
  53. if d == bands_n: # high-end band
  54. (
  55. X_wave[d],
  56. _,
  57. ) = librosa.core.load( # 理论上librosa读取可能对某些音频有bug,应该上ffmpeg读取,但是太麻烦了弃坑
  58. music_file,
  59. sr = bp["sr"],
  60. mono = False,
  61. dtype = np.float32,
  62. res_type = bp["res_type"],
  63. )
  64. if X_wave[d].ndim == 1:
  65. X_wave[d] = np.asfortranarray([X_wave[d], X_wave[d]])
  66. else: # lower bands
  67. X_wave[d] = librosa.core.resample(
  68. X_wave[d + 1],
  69. orig_sr = self.mp.param["band"][d + 1]["sr"],
  70. target_sr = bp["sr"],
  71. res_type = bp["res_type"],
  72. )
  73. # Stft of wave source
  74. X_spec_s[d] = spec_utils.wave_to_spectrogram_mt(
  75. X_wave[d],
  76. bp["hl"],
  77. bp["n_fft"],
  78. self.mp.param["mid_side"],
  79. self.mp.param["mid_side_b2"],
  80. self.mp.param["reverse"],
  81. )
  82. # pdb.set_trace()
  83. if d == bands_n and self.data["high_end_process"] != "none":
  84. input_high_end_h = (bp["n_fft"] // 2 - bp["crop_stop"]) + (
  85. self.mp.param["pre_filter_stop"] - self.mp.param["pre_filter_start"]
  86. )
  87. input_high_end = X_spec_s[d][
  88. :, bp["n_fft"] // 2 - input_high_end_h : bp["n_fft"] // 2, :
  89. ]
  90. X_spec_m = spec_utils.combine_spectrograms(X_spec_s, self.mp)
  91. aggresive_set = float(self.data["agg"] / 100)
  92. aggressiveness = {
  93. "value": aggresive_set,
  94. "split_bin": self.mp.param["band"][1]["crop_stop"],
  95. }
  96. with torch.no_grad():
  97. pred, X_mag, X_phase = inference(
  98. X_spec_m, self.device, self.model, aggressiveness, self.data
  99. )
  100. # Postprocess
  101. if self.data["postprocess"]:
  102. pred_inv = np.clip(X_mag - pred, 0, np.inf)
  103. pred = spec_utils.mask_silence(pred, pred_inv)
  104. y_spec_m = pred * X_phase
  105. v_spec_m = X_spec_m - y_spec_m
  106. if is_hp3 == True:
  107. ins_root,vocal_root = vocal_root,ins_root
  108. if ins_root is not None:
  109. if self.data["high_end_process"].startswith("mirroring"):
  110. input_high_end_ = spec_utils.mirroring(
  111. self.data["high_end_process"], y_spec_m, input_high_end, self.mp
  112. )
  113. wav_instrument = spec_utils.cmb_spectrogram_to_wave(
  114. y_spec_m, self.mp, input_high_end_h, input_high_end_
  115. )
  116. else:
  117. wav_instrument = spec_utils.cmb_spectrogram_to_wave(y_spec_m, self.mp)
  118. logger.info("%s instruments done" % name)
  119. if is_hp3 == True:
  120. head = "vocal_"
  121. else:
  122. head = "instrument_"
  123. if format in ["wav", "flac"]:
  124. sf.write(
  125. os.path.join(
  126. ins_root,
  127. head + "{}_{}.{}".format(name, self.data["agg"], format),
  128. ),
  129. (np.array(wav_instrument) * 32768).astype("int16"),
  130. self.mp.param["sr"],
  131. ) #
  132. else:
  133. path = os.path.join(
  134. ins_root, head + "{}_{}.wav".format(name, self.data["agg"])
  135. )
  136. sf.write(
  137. path,
  138. (np.array(wav_instrument) * 32768).astype("int16"),
  139. self.mp.param["sr"],
  140. )
  141. if os.path.exists(path):
  142. opt_format_path = path[:-4] + ".%s" % format
  143. os.system("ffmpeg -i %s -vn %s -q:a 2 -y" % (path, opt_format_path))
  144. if os.path.exists(opt_format_path):
  145. try:
  146. os.remove(path)
  147. except:
  148. pass
  149. if vocal_root is not None:
  150. if is_hp3 == True:
  151. head = "instrument_"
  152. else:
  153. head = "vocal_"
  154. if self.data["high_end_process"].startswith("mirroring"):
  155. input_high_end_ = spec_utils.mirroring(
  156. self.data["high_end_process"], v_spec_m, input_high_end, self.mp
  157. )
  158. wav_vocals = spec_utils.cmb_spectrogram_to_wave(
  159. v_spec_m, self.mp, input_high_end_h, input_high_end_
  160. )
  161. else:
  162. wav_vocals = spec_utils.cmb_spectrogram_to_wave(v_spec_m, self.mp)
  163. logger.info("%s vocals done" % name)
  164. if format in ["wav", "flac"]:
  165. sf.write(
  166. os.path.join(
  167. vocal_root,
  168. head + "{}_{}.{}".format(name, self.data["agg"], format),
  169. ),
  170. (np.array(wav_vocals) * 32768).astype("int16"),
  171. self.mp.param["sr"],
  172. )
  173. else:
  174. path = os.path.join(
  175. vocal_root, head + "{}_{}.wav".format(name, self.data["agg"])
  176. )
  177. sf.write(
  178. path,
  179. (np.array(wav_vocals) * 32768).astype("int16"),
  180. self.mp.param["sr"],
  181. )
  182. if os.path.exists(path):
  183. opt_format_path = path[:-4] + ".%s" % format
  184. os.system("ffmpeg -i %s -vn %s -q:a 2 -y" % (path, opt_format_path))
  185. if os.path.exists(opt_format_path):
  186. try:
  187. os.remove(path)
  188. except:
  189. pass
  190. class AudioPreDeEcho:
  191. def __init__(self, agg, model_path, device, is_half, tta=False):
  192. self.model_path = model_path
  193. self.device = device
  194. self.data = {
  195. # Processing Options
  196. "postprocess": False,
  197. "tta": tta,
  198. # Constants
  199. "window_size": 512,
  200. "agg": agg,
  201. "high_end_process": "mirroring",
  202. }
  203. mp = ModelParameters("%s/lib/lib_v5/modelparams/4band_v3.json"%parent_directory)
  204. nout = 64 if "DeReverb" in model_path else 48
  205. model = CascadedNet(mp.param["bins"] * 2, nout)
  206. cpk = torch.load(model_path, map_location="cpu")
  207. model.load_state_dict(cpk)
  208. model.eval()
  209. if is_half:
  210. model = model.half().to(device)
  211. else:
  212. model = model.to(device)
  213. self.mp = mp
  214. self.model = model
  215. def _path_audio_(
  216. self, music_file, vocal_root=None, ins_root=None, format="flac", is_hp3=False
  217. ): # 3个VR模型vocal和ins是反的
  218. if ins_root is None and vocal_root is None:
  219. return "No save root."
  220. name = os.path.basename(music_file)
  221. if ins_root is not None:
  222. os.makedirs(ins_root, exist_ok=True)
  223. if vocal_root is not None:
  224. os.makedirs(vocal_root, exist_ok=True)
  225. X_wave, y_wave, X_spec_s, y_spec_s = {}, {}, {}, {}
  226. bands_n = len(self.mp.param["band"])
  227. # print(bands_n)
  228. for d in range(bands_n, 0, -1):
  229. bp = self.mp.param["band"][d]
  230. if d == bands_n: # high-end band
  231. (
  232. X_wave[d],
  233. _,
  234. ) = librosa.core.load( # 理论上librosa读取可能对某些音频有bug,应该上ffmpeg读取,但是太麻烦了弃坑
  235. music_file,
  236. sr = bp["sr"],
  237. mono = False,
  238. dtype = np.float32,
  239. res_type = bp["res_type"],
  240. )
  241. if X_wave[d].ndim == 1:
  242. X_wave[d] = np.asfortranarray([X_wave[d], X_wave[d]])
  243. else: # lower bands
  244. X_wave[d] = librosa.core.resample(
  245. X_wave[d + 1],
  246. orig_sr = self.mp.param["band"][d + 1]["sr"],
  247. target_sr = bp["sr"],
  248. res_type = bp["res_type"],
  249. )
  250. # Stft of wave source
  251. X_spec_s[d] = spec_utils.wave_to_spectrogram_mt(
  252. X_wave[d],
  253. bp["hl"],
  254. bp["n_fft"],
  255. self.mp.param["mid_side"],
  256. self.mp.param["mid_side_b2"],
  257. self.mp.param["reverse"],
  258. )
  259. # pdb.set_trace()
  260. if d == bands_n and self.data["high_end_process"] != "none":
  261. input_high_end_h = (bp["n_fft"] // 2 - bp["crop_stop"]) + (
  262. self.mp.param["pre_filter_stop"] - self.mp.param["pre_filter_start"]
  263. )
  264. input_high_end = X_spec_s[d][
  265. :, bp["n_fft"] // 2 - input_high_end_h : bp["n_fft"] // 2, :
  266. ]
  267. X_spec_m = spec_utils.combine_spectrograms(X_spec_s, self.mp)
  268. aggresive_set = float(self.data["agg"] / 100)
  269. aggressiveness = {
  270. "value": aggresive_set,
  271. "split_bin": self.mp.param["band"][1]["crop_stop"],
  272. }
  273. with torch.no_grad():
  274. pred, X_mag, X_phase = inference(
  275. X_spec_m, self.device, self.model, aggressiveness, self.data
  276. )
  277. # Postprocess
  278. if self.data["postprocess"]:
  279. pred_inv = np.clip(X_mag - pred, 0, np.inf)
  280. pred = spec_utils.mask_silence(pred, pred_inv)
  281. y_spec_m = pred * X_phase
  282. v_spec_m = X_spec_m - y_spec_m
  283. if ins_root is not None:
  284. if self.data["high_end_process"].startswith("mirroring"):
  285. input_high_end_ = spec_utils.mirroring(
  286. self.data["high_end_process"], y_spec_m, input_high_end, self.mp
  287. )
  288. wav_instrument = spec_utils.cmb_spectrogram_to_wave(
  289. y_spec_m, self.mp, input_high_end_h, input_high_end_
  290. )
  291. else:
  292. wav_instrument = spec_utils.cmb_spectrogram_to_wave(y_spec_m, self.mp)
  293. logger.info("%s instruments done" % name)
  294. if format in ["wav", "flac"]:
  295. sf.write(
  296. os.path.join(
  297. ins_root,
  298. "vocal_{}_{}.{}".format(name, self.data["agg"], format),
  299. ),
  300. (np.array(wav_instrument) * 32768).astype("int16"),
  301. self.mp.param["sr"],
  302. ) #
  303. else:
  304. path = os.path.join(
  305. ins_root, "vocal_{}_{}.wav".format(name, self.data["agg"])
  306. )
  307. sf.write(
  308. path,
  309. (np.array(wav_instrument) * 32768).astype("int16"),
  310. self.mp.param["sr"],
  311. )
  312. if os.path.exists(path):
  313. opt_format_path = path[:-4] + ".%s" % format
  314. os.system("ffmpeg -i %s -vn %s -q:a 2 -y" % (path, opt_format_path))
  315. if os.path.exists(opt_format_path):
  316. try:
  317. os.remove(path)
  318. except:
  319. pass
  320. if vocal_root is not None:
  321. if self.data["high_end_process"].startswith("mirroring"):
  322. input_high_end_ = spec_utils.mirroring(
  323. self.data["high_end_process"], v_spec_m, input_high_end, self.mp
  324. )
  325. wav_vocals = spec_utils.cmb_spectrogram_to_wave(
  326. v_spec_m, self.mp, input_high_end_h, input_high_end_
  327. )
  328. else:
  329. wav_vocals = spec_utils.cmb_spectrogram_to_wave(v_spec_m, self.mp)
  330. logger.info("%s vocals done" % name)
  331. if format in ["wav", "flac"]:
  332. sf.write(
  333. os.path.join(
  334. vocal_root,
  335. "instrument_{}_{}.{}".format(name, self.data["agg"], format),
  336. ),
  337. (np.array(wav_vocals) * 32768).astype("int16"),
  338. self.mp.param["sr"],
  339. )
  340. else:
  341. path = os.path.join(
  342. vocal_root, "instrument_{}_{}.wav".format(name, self.data["agg"])
  343. )
  344. sf.write(
  345. path,
  346. (np.array(wav_vocals) * 32768).astype("int16"),
  347. self.mp.param["sr"],
  348. )
  349. if os.path.exists(path):
  350. opt_format_path = path[:-4] + ".%s" % format
  351. os.system("ffmpeg -i %s -vn %s -q:a 2 -y" % (path, opt_format_path))
  352. if os.path.exists(opt_format_path):
  353. try:
  354. os.remove(path)
  355. except:
  356. pass