2
0

main.py 21 KB


  1. import argparse, os, sys, datetime, glob, importlib
  2. from omegaconf import OmegaConf
  3. import numpy as np
  4. from PIL import Image
  5. import torch
  6. import torchvision
  7. from torch.utils.data import random_split, DataLoader, Dataset
  8. import pytorch_lightning as pl
  9. from pytorch_lightning import seed_everything
  10. from pytorch_lightning.trainer import Trainer
  11. from pytorch_lightning.callbacks import ModelCheckpoint, Callback, LearningRateMonitor
  12. from pytorch_lightning.utilities import rank_zero_only
  13. from taming.data.utils import custom_collate
  14. def get_obj_from_str(string, reload=False):
  15. module, cls = string.rsplit(".", 1)
  16. if reload:
  17. module_imp = importlib.import_module(module)
  18. importlib.reload(module_imp)
  19. return getattr(importlib.import_module(module, package=None), cls)
  20. def get_parser(**parser_kwargs):
  21. def str2bool(v):
  22. if isinstance(v, bool):
  23. return v
  24. if v.lower() in ("yes", "true", "t", "y", "1"):
  25. return True
  26. elif v.lower() in ("no", "false", "f", "n", "0"):
  27. return False
  28. else:
  29. raise argparse.ArgumentTypeError("Boolean value expected.")
  30. parser = argparse.ArgumentParser(**parser_kwargs)
  31. parser.add_argument(
  32. "-n",
  33. "--name",
  34. type=str,
  35. const=True,
  36. default="",
  37. nargs="?",
  38. help="postfix for logdir",
  39. )
  40. parser.add_argument(
  41. "-r",
  42. "--resume",
  43. type=str,
  44. const=True,
  45. default="",
  46. nargs="?",
  47. help="resume from logdir or checkpoint in logdir",
  48. )
  49. parser.add_argument(
  50. "-b",
  51. "--base",
  52. nargs="*",
  53. metavar="base_config.yaml",
  54. help="paths to base configs. Loaded from left-to-right. "
  55. "Parameters can be overwritten or added with command-line options of the form `--key value`.",
  56. default=list(),
  57. )
  58. parser.add_argument(
  59. "-t",
  60. "--train",
  61. type=str2bool,
  62. const=True,
  63. default=False,
  64. nargs="?",
  65. help="train",
  66. )
  67. parser.add_argument(
  68. "--no-test",
  69. type=str2bool,
  70. const=True,
  71. default=False,
  72. nargs="?",
  73. help="disable test",
  74. )
  75. parser.add_argument("-p", "--project", help="name of new or path to existing project")
  76. parser.add_argument(
  77. "-d",
  78. "--debug",
  79. type=str2bool,
  80. nargs="?",
  81. const=True,
  82. default=False,
  83. help="enable post-mortem debugging",
  84. )
  85. parser.add_argument(
  86. "-s",
  87. "--seed",
  88. type=int,
  89. default=23,
  90. help="seed for seed_everything",
  91. )
  92. parser.add_argument(
  93. "-f",
  94. "--postfix",
  95. type=str,
  96. default="",
  97. help="post-postfix for default name",
  98. )
  99. return parser
  100. def nondefault_trainer_args(opt):
  101. parser = argparse.ArgumentParser()
  102. parser = Trainer.add_argparse_args(parser)
  103. args = parser.parse_args([])
  104. return sorted(k for k in vars(args) if getattr(opt, k) != getattr(args, k))
  105. def instantiate_from_config(config):
  106. if not "target" in config:
  107. raise KeyError("Expected key `target` to instantiate.")
  108. return get_obj_from_str(config["target"])(**config.get("params", dict()))
  109. class WrappedDataset(Dataset):
  110. """Wraps an arbitrary object with __len__ and __getitem__ into a pytorch dataset"""
  111. def __init__(self, dataset):
  112. self.data = dataset
  113. def __len__(self):
  114. return len(self.data)
  115. def __getitem__(self, idx):
  116. return self.data[idx]
  117. class DataModuleFromConfig(pl.LightningDataModule):
  118. def __init__(self, batch_size, train=None, validation=None, test=None,
  119. wrap=False, num_workers=None):
  120. super().__init__()
  121. self.batch_size = batch_size
  122. self.dataset_configs = dict()
  123. self.num_workers = num_workers if num_workers is not None else batch_size*2
  124. if train is not None:
  125. self.dataset_configs["train"] = train
  126. self.train_dataloader = self._train_dataloader
  127. if validation is not None:
  128. self.dataset_configs["validation"] = validation
  129. self.val_dataloader = self._val_dataloader
  130. if test is not None:
  131. self.dataset_configs["test"] = test
  132. self.test_dataloader = self._test_dataloader
  133. self.wrap = wrap
  134. def prepare_data(self):
  135. for data_cfg in self.dataset_configs.values():
  136. instantiate_from_config(data_cfg)
  137. def setup(self, stage=None):
  138. self.datasets = dict(
  139. (k, instantiate_from_config(self.dataset_configs[k]))
  140. for k in self.dataset_configs)
  141. if self.wrap:
  142. for k in self.datasets:
  143. self.datasets[k] = WrappedDataset(self.datasets[k])
  144. def _train_dataloader(self):
  145. return DataLoader(self.datasets["train"], batch_size=self.batch_size,
  146. num_workers=self.num_workers, shuffle=True, collate_fn=custom_collate)
  147. def _val_dataloader(self):
  148. return DataLoader(self.datasets["validation"],
  149. batch_size=self.batch_size,
  150. num_workers=self.num_workers, collate_fn=custom_collate)
  151. def _test_dataloader(self):
  152. return DataLoader(self.datasets["test"], batch_size=self.batch_size,
  153. num_workers=self.num_workers, collate_fn=custom_collate)
  154. class SetupCallback(Callback):
  155. def __init__(self, resume, now, logdir, ckptdir, cfgdir, config, lightning_config):
  156. super().__init__()
  157. self.resume = resume
  158. self.now = now
  159. self.logdir = logdir
  160. self.ckptdir = ckptdir
  161. self.cfgdir = cfgdir
  162. self.config = config
  163. self.lightning_config = lightning_config
  164. def on_pretrain_routine_start(self, trainer, pl_module):
  165. if trainer.global_rank == 0:
  166. # Create logdirs and save configs
  167. os.makedirs(self.logdir, exist_ok=True)
  168. os.makedirs(self.ckptdir, exist_ok=True)
  169. os.makedirs(self.cfgdir, exist_ok=True)
  170. print("Project config")
  171. print(self.config.pretty())
  172. OmegaConf.save(self.config,
  173. os.path.join(self.cfgdir, "{}-project.yaml".format(self.now)))
  174. print("Lightning config")
  175. print(self.lightning_config.pretty())
  176. OmegaConf.save(OmegaConf.create({"lightning": self.lightning_config}),
  177. os.path.join(self.cfgdir, "{}-lightning.yaml".format(self.now)))
  178. else:
  179. # ModelCheckpoint callback created log directory --- remove it
  180. if not self.resume and os.path.exists(self.logdir):
  181. dst, name = os.path.split(self.logdir)
  182. dst = os.path.join(dst, "child_runs", name)
  183. os.makedirs(os.path.split(dst)[0], exist_ok=True)
  184. try:
  185. os.rename(self.logdir, dst)
  186. except FileNotFoundError:
  187. pass
  188. class ImageLogger(Callback):
  189. def __init__(self, batch_frequency, max_images, clamp=True, increase_log_steps=True):
  190. super().__init__()
  191. self.batch_freq = batch_frequency
  192. self.max_images = max_images
  193. self.logger_log_images = {
  194. pl.loggers.WandbLogger: self._wandb,
  195. pl.loggers.TestTubeLogger: self._testtube,
  196. }
  197. self.log_steps = [2 ** n for n in range(int(np.log2(self.batch_freq)) + 1)]
  198. if not increase_log_steps:
  199. self.log_steps = [self.batch_freq]
  200. self.clamp = clamp
  201. @rank_zero_only
  202. def _wandb(self, pl_module, images, batch_idx, split):
  203. raise ValueError("No way wandb")
  204. grids = dict()
  205. for k in images:
  206. grid = torchvision.utils.make_grid(images[k])
  207. grids[f"{split}/{k}"] = wandb.Image(grid)
  208. pl_module.logger.experiment.log(grids)
  209. @rank_zero_only
  210. def _testtube(self, pl_module, images, batch_idx, split):
  211. for k in images:
  212. grid = torchvision.utils.make_grid(images[k])
  213. grid = (grid+1.0)/2.0 # -1,1 -> 0,1; c,h,w
  214. tag = f"{split}/{k}"
  215. pl_module.logger.experiment.add_image(
  216. tag, grid,
  217. global_step=pl_module.global_step)
  218. @rank_zero_only
  219. def log_local(self, save_dir, split, images,
  220. global_step, current_epoch, batch_idx):
  221. root = os.path.join(save_dir, "images", split)
  222. for k in images:
  223. grid = torchvision.utils.make_grid(images[k], nrow=4)
  224. grid = (grid+1.0)/2.0 # -1,1 -> 0,1; c,h,w
  225. grid = grid.transpose(0,1).transpose(1,2).squeeze(-1)
  226. grid = grid.numpy()
  227. grid = (grid*255).astype(np.uint8)
  228. filename = "{}_gs-{:06}_e-{:06}_b-{:06}.png".format(
  229. k,
  230. global_step,
  231. current_epoch,
  232. batch_idx)
  233. path = os.path.join(root, filename)
  234. os.makedirs(os.path.split(path)[0], exist_ok=True)
  235. Image.fromarray(grid).save(path)
  236. def log_img(self, pl_module, batch, batch_idx, split="train"):
  237. if (self.check_frequency(batch_idx) and # batch_idx % self.batch_freq == 0
  238. hasattr(pl_module, "log_images") and
  239. callable(pl_module.log_images) and
  240. self.max_images > 0):
  241. logger = type(pl_module.logger)
  242. is_train = pl_module.training
  243. if is_train:
  244. pl_module.eval()
  245. with torch.no_grad():
  246. images = pl_module.log_images(batch, split=split, pl_module=pl_module)
  247. for k in images:
  248. N = min(images[k].shape[0], self.max_images)
  249. images[k] = images[k][:N]
  250. if isinstance(images[k], torch.Tensor):
  251. images[k] = images[k].detach().cpu()
  252. if self.clamp:
  253. images[k] = torch.clamp(images[k], -1., 1.)
  254. self.log_local(pl_module.logger.save_dir, split, images,
  255. pl_module.global_step, pl_module.current_epoch, batch_idx)
  256. logger_log_images = self.logger_log_images.get(logger, lambda *args, **kwargs: None)
  257. logger_log_images(pl_module, images, pl_module.global_step, split)
  258. if is_train:
  259. pl_module.train()
  260. def check_frequency(self, batch_idx):
  261. if (batch_idx % self.batch_freq) == 0 or (batch_idx in self.log_steps):
  262. try:
  263. self.log_steps.pop(0)
  264. except IndexError:
  265. pass
  266. return True
  267. return False
  268. def on_train_batch_end(self, trainer, pl_module, outputs, batch, batch_idx, dataloader_idx):
  269. self.log_img(pl_module, batch, batch_idx, split="train")
  270. def on_validation_batch_end(self, trainer, pl_module, outputs, batch, batch_idx, dataloader_idx):
  271. self.log_img(pl_module, batch, batch_idx, split="val")
  272. if __name__ == "__main__":
  273. # custom parser to specify config files, train, test and debug mode,
  274. # postfix, resume.
  275. # `--key value` arguments are interpreted as arguments to the trainer.
  276. # `nested.key=value` arguments are interpreted as config parameters.
  277. # configs are merged from left-to-right followed by command line parameters.
  278. # model:
  279. # base_learning_rate: float
  280. # target: path to lightning module
  281. # params:
  282. # key: value
  283. # data:
  284. # target: main.DataModuleFromConfig
  285. # params:
  286. # batch_size: int
  287. # wrap: bool
  288. # train:
  289. # target: path to train dataset
  290. # params:
  291. # key: value
  292. # validation:
  293. # target: path to validation dataset
  294. # params:
  295. # key: value
  296. # test:
  297. # target: path to test dataset
  298. # params:
  299. # key: value
  300. # lightning: (optional, has sane defaults and can be specified on cmdline)
  301. # trainer:
  302. # additional arguments to trainer
  303. # logger:
  304. # logger to instantiate
  305. # modelcheckpoint:
  306. # modelcheckpoint to instantiate
  307. # callbacks:
  308. # callback1:
  309. # target: importpath
  310. # params:
  311. # key: value
  312. now = datetime.datetime.now().strftime("%Y-%m-%dT%H-%M-%S")
  313. # add cwd for convenience and to make classes in this file available when
  314. # running as `python main.py`
  315. # (in particular `main.DataModuleFromConfig`)
  316. sys.path.append(os.getcwd())
  317. parser = get_parser()
  318. parser = Trainer.add_argparse_args(parser)
  319. opt, unknown = parser.parse_known_args()
  320. if opt.name and opt.resume:
  321. raise ValueError(
  322. "-n/--name and -r/--resume cannot be specified both."
  323. "If you want to resume training in a new log folder, "
  324. "use -n/--name in combination with --resume_from_checkpoint"
  325. )
  326. if opt.resume:
  327. if not os.path.exists(opt.resume):
  328. raise ValueError("Cannot find {}".format(opt.resume))
  329. if os.path.isfile(opt.resume):
  330. paths = opt.resume.split("/")
  331. idx = len(paths)-paths[::-1].index("logs")+1
  332. logdir = "/".join(paths[:idx])
  333. ckpt = opt.resume
  334. else:
  335. assert os.path.isdir(opt.resume), opt.resume
  336. logdir = opt.resume.rstrip("/")
  337. ckpt = os.path.join(logdir, "checkpoints", "last.ckpt")
  338. opt.resume_from_checkpoint = ckpt
  339. base_configs = sorted(glob.glob(os.path.join(logdir, "configs/*.yaml")))
  340. opt.base = base_configs+opt.base
  341. _tmp = logdir.split("/")
  342. nowname = _tmp[_tmp.index("logs")+1]
  343. else:
  344. if opt.name:
  345. name = "_"+opt.name
  346. elif opt.base:
  347. cfg_fname = os.path.split(opt.base[0])[-1]
  348. cfg_name = os.path.splitext(cfg_fname)[0]
  349. name = "_"+cfg_name
  350. else:
  351. name = ""
  352. nowname = now+name+opt.postfix
  353. logdir = os.path.join("logs", nowname)
  354. ckptdir = os.path.join(logdir, "checkpoints")
  355. cfgdir = os.path.join(logdir, "configs")
  356. seed_everything(opt.seed)
  357. try:
  358. # init and save configs
  359. configs = [OmegaConf.load(cfg) for cfg in opt.base]
  360. cli = OmegaConf.from_dotlist(unknown)
  361. config = OmegaConf.merge(*configs, cli)
  362. lightning_config = config.pop("lightning", OmegaConf.create())
  363. # merge trainer cli with config
  364. trainer_config = lightning_config.get("trainer", OmegaConf.create())
  365. # default to ddp
  366. trainer_config["distributed_backend"] = "ddp"
  367. for k in nondefault_trainer_args(opt):
  368. trainer_config[k] = getattr(opt, k)
  369. if not "gpus" in trainer_config:
  370. del trainer_config["distributed_backend"]
  371. cpu = True
  372. else:
  373. gpuinfo = trainer_config["gpus"]
  374. print(f"Running on GPUs {gpuinfo}")
  375. cpu = False
  376. trainer_opt = argparse.Namespace(**trainer_config)
  377. lightning_config.trainer = trainer_config
  378. # model
  379. model = instantiate_from_config(config.model)
  380. # trainer and callbacks
  381. trainer_kwargs = dict()
  382. # default logger configs
  383. # NOTE wandb < 0.10.0 interferes with shutdown
  384. # wandb >= 0.10.0 seems to fix it but still interferes with pudb
  385. # debugging (wrongly sized pudb ui)
  386. # thus prefer testtube for now
  387. default_logger_cfgs = {
  388. "wandb": {
  389. "target": "pytorch_lightning.loggers.WandbLogger",
  390. "params": {
  391. "name": nowname,
  392. "save_dir": logdir,
  393. "offline": opt.debug,
  394. "id": nowname,
  395. }
  396. },
  397. "testtube": {
  398. "target": "pytorch_lightning.loggers.TestTubeLogger",
  399. "params": {
  400. "name": "testtube",
  401. "save_dir": logdir,
  402. }
  403. },
  404. }
  405. default_logger_cfg = default_logger_cfgs["testtube"]
  406. logger_cfg = lightning_config.logger or OmegaConf.create()
  407. logger_cfg = OmegaConf.merge(default_logger_cfg, logger_cfg)
  408. trainer_kwargs["logger"] = instantiate_from_config(logger_cfg)
  409. # modelcheckpoint - use TrainResult/EvalResult(checkpoint_on=metric) to
  410. # specify which metric is used to determine best models
  411. default_modelckpt_cfg = {
  412. "target": "pytorch_lightning.callbacks.ModelCheckpoint",
  413. "params": {
  414. "dirpath": ckptdir,
  415. "filename": "{epoch:06}",
  416. "verbose": True,
  417. "save_last": True,
  418. }
  419. }
  420. if hasattr(model, "monitor"):
  421. print(f"Monitoring {model.monitor} as checkpoint metric.")
  422. default_modelckpt_cfg["params"]["monitor"] = model.monitor
  423. default_modelckpt_cfg["params"]["save_top_k"] = 3
  424. modelckpt_cfg = lightning_config.modelcheckpoint or OmegaConf.create()
  425. modelckpt_cfg = OmegaConf.merge(default_modelckpt_cfg, modelckpt_cfg)
  426. trainer_kwargs["checkpoint_callback"] = instantiate_from_config(modelckpt_cfg)
  427. # add callback which sets up log directory
  428. default_callbacks_cfg = {
  429. "setup_callback": {
  430. "target": "main.SetupCallback",
  431. "params": {
  432. "resume": opt.resume,
  433. "now": now,
  434. "logdir": logdir,
  435. "ckptdir": ckptdir,
  436. "cfgdir": cfgdir,
  437. "config": config,
  438. "lightning_config": lightning_config,
  439. }
  440. },
  441. "image_logger": {
  442. "target": "main.ImageLogger",
  443. "params": {
  444. "batch_frequency": 750,
  445. "max_images": 4,
  446. "clamp": True
  447. }
  448. },
  449. "learning_rate_logger": {
  450. "target": "main.LearningRateMonitor",
  451. "params": {
  452. "logging_interval": "step",
  453. #"log_momentum": True
  454. }
  455. },
  456. }
  457. callbacks_cfg = lightning_config.callbacks or OmegaConf.create()
  458. callbacks_cfg = OmegaConf.merge(default_callbacks_cfg, callbacks_cfg)
  459. trainer_kwargs["callbacks"] = [instantiate_from_config(callbacks_cfg[k]) for k in callbacks_cfg]
  460. trainer = Trainer.from_argparse_args(trainer_opt, **trainer_kwargs)
  461. # data
  462. data = instantiate_from_config(config.data)
  463. # NOTE according to https://pytorch-lightning.readthedocs.io/en/latest/datamodules.html
  464. # calling these ourselves should not be necessary but it is.
  465. # lightning still takes care of proper multiprocessing though
  466. data.prepare_data()
  467. data.setup()
  468. # configure learning rate
  469. bs, base_lr = config.data.params.batch_size, config.model.base_learning_rate
  470. if not cpu:
  471. ngpu = len(lightning_config.trainer.gpus.strip(",").split(','))
  472. else:
  473. ngpu = 1
  474. accumulate_grad_batches = lightning_config.trainer.accumulate_grad_batches or 1
  475. print(f"accumulate_grad_batches = {accumulate_grad_batches}")
  476. lightning_config.trainer.accumulate_grad_batches = accumulate_grad_batches
  477. model.learning_rate = accumulate_grad_batches * ngpu * bs * base_lr
  478. print("Setting learning rate to {:.2e} = {} (accumulate_grad_batches) * {} (num_gpus) * {} (batchsize) * {:.2e} (base_lr)".format(
  479. model.learning_rate, accumulate_grad_batches, ngpu, bs, base_lr))
  480. # allow checkpointing via USR1
  481. def melk(*args, **kwargs):
  482. # run all checkpoint hooks
  483. if trainer.global_rank == 0:
  484. print("Summoning checkpoint.")
  485. ckpt_path = os.path.join(ckptdir, "last.ckpt")
  486. trainer.save_checkpoint(ckpt_path)
  487. def divein(*args, **kwargs):
  488. if trainer.global_rank == 0:
  489. import pudb; pudb.set_trace()
  490. import signal
  491. signal.signal(signal.SIGUSR1, melk)
  492. signal.signal(signal.SIGUSR2, divein)
  493. # run
  494. if opt.train:
  495. try:
  496. trainer.fit(model, data)
  497. except Exception:
  498. melk()
  499. raise
  500. if not opt.no_test and not trainer.interrupted:
  501. trainer.test(model, data)
  502. except Exception:
  503. if opt.debug and trainer.global_rank==0:
  504. try:
  505. import pudb as debugger
  506. except ImportError:
  507. import pdb as debugger
  508. debugger.post_mortem()
  509. raise
  510. finally:
  511. # move newly created debug project to debug_runs
  512. if opt.debug and not opt.resume and trainer.global_rank==0:
  513. dst, name = os.path.split(logdir)
  514. dst = os.path.join(dst, "debug_runs", name)
  515. os.makedirs(os.path.split(dst)[0], exist_ok=True)
  516. os.rename(logdir, dst)