# Copyright (c) 2022, Tri Dao.

import torch
import torch.nn as nn
from einops import rearrange
from torch import Tensor

from flash_attn.utils.distributed import all_reduce, reduce_scatter

class GPT2Embeddings(nn.Module):
    def __init__(
        self,
        embed_dim,
        vocab_size,
        max_position_embeddings,
        padding_idx=None,
        word_embed_proj_dim=None,
        device=None,
        dtype=None,
    ):
- """
- If max_position_embeddings <= 0, there's no position embeddings
- If word_embe_proj_dim is not None (e.g., OPT-350m), we embed to that dimension
- the project up to embed_dim
- """
        factory_kwargs = {"device": device, "dtype": dtype}
        super().__init__()
        if word_embed_proj_dim is None:
            self.word_embeddings = nn.Embedding(
                vocab_size, embed_dim, padding_idx=padding_idx, **factory_kwargs
            )
            self.project_in = None
        else:
            self.word_embeddings = nn.Embedding(
                vocab_size, word_embed_proj_dim, padding_idx=padding_idx, **factory_kwargs
            )
            self.project_in = nn.Linear(
                word_embed_proj_dim, embed_dim, bias=False, **factory_kwargs
            )
        self.max_position_embeddings = max_position_embeddings
        if self.max_position_embeddings > 0:
            self.position_embeddings = nn.Embedding(
                max_position_embeddings, embed_dim, **factory_kwargs
            )

    def forward(self, input_ids, position_ids=None):
        """
        input_ids: (batch, seqlen)
        position_ids: (batch, seqlen)
        """
        batch_size, seqlen = input_ids.shape
        embeddings = self.word_embeddings(input_ids)
        if self.project_in is not None:
            embeddings = self.project_in(embeddings)
        if self.max_position_embeddings > 0:
            if position_ids is None:
                position_ids = torch.arange(seqlen, dtype=torch.long, device=input_ids.device)
            position_embeddings = self.position_embeddings(position_ids)
            embeddings = embeddings + position_embeddings
        return embeddings
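
# Illustrative usage sketch, not part of the original module: a quick check of the
# shapes GPT2Embeddings produces. The sizes below (GPT-2 small vocab/dims, batch 2,
# seqlen 16) are arbitrary examples.
def _example_gpt2_embeddings():
    emb = GPT2Embeddings(embed_dim=768, vocab_size=50257, max_position_embeddings=1024)
    input_ids = torch.randint(0, 50257, (2, 16))  # (batch, seqlen)
    out = emb(input_ids)  # word embeddings + broadcast position embeddings
    assert out.shape == (2, 16, 768)
    return out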

class BertEmbeddings(nn.Module):
    def __init__(
        self,
        embed_dim,
        vocab_size,
        max_position_embeddings,
        type_vocab_size,
        padding_idx=None,
        device=None,
        dtype=None,
    ):
- """
- If max_position_embeddings <= 0, there's no position embeddings
- If type_vocab_size <= 0, there's no token type embeddings
- """
        factory_kwargs = {"device": device, "dtype": dtype}
        super().__init__()
        self.word_embeddings = nn.Embedding(
            vocab_size, embed_dim, padding_idx=padding_idx, **factory_kwargs
        )
        self.max_position_embeddings = max_position_embeddings
        self.type_vocab_size = type_vocab_size
        if self.max_position_embeddings > 0:
            self.position_embeddings = nn.Embedding(
                max_position_embeddings, embed_dim, **factory_kwargs
            )
        if self.type_vocab_size > 0:
            self.token_type_embeddings = nn.Embedding(type_vocab_size, embed_dim, **factory_kwargs)

    def forward(self, input_ids, position_ids=None, token_type_ids=None):
        """
        input_ids: (batch, seqlen)
        position_ids: (batch, seqlen)
        token_type_ids: (batch, seqlen)
        """
        batch_size, seqlen = input_ids.shape
        embeddings = self.word_embeddings(input_ids)
        if self.max_position_embeddings > 0:
            if position_ids is None:
                position_ids = torch.arange(seqlen, dtype=torch.long, device=input_ids.device)
            position_embeddings = self.position_embeddings(position_ids)
            embeddings = embeddings + position_embeddings
        if self.type_vocab_size > 0:
            if token_type_ids is None:
                token_type_ids = torch.zeros(seqlen, dtype=torch.long, device=input_ids.device)
            token_type_embeddings = self.token_type_embeddings(token_type_ids)
            embeddings = embeddings + token_type_embeddings
        return embeddings
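
# Illustrative usage sketch, not part of the original module: BertEmbeddings adds
# token-type (segment) embeddings on top of word and position embeddings. The sizes
# below are arbitrary examples; token_type_ids defaults to all zeros when omitted.
def _example_bert_embeddings():
    emb = BertEmbeddings(
        embed_dim=768, vocab_size=30522, max_position_embeddings=512, type_vocab_size=2
    )
    input_ids = torch.randint(0, 30522, (2, 16))  # (batch, seqlen)
    token_type_ids = torch.zeros(2, 16, dtype=torch.long)  # all segment-A tokens
    out = emb(input_ids, token_type_ids=token_type_ids)
    assert out.shape == (2, 16, 768)
    return out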

class VocabParallelEmbedding(nn.Embedding):
    def __init__(self, num_embeddings, *args, process_group=None, padding_idx=None, **kwargs):
        self.process_group = process_group
        if process_group is not None:
            world_size = torch.distributed.get_world_size(process_group)
            if num_embeddings % world_size != 0:
                raise ValueError(
                    f"num_embeddings ({num_embeddings}) must be divisible by "
                    f"world_size ({world_size})"
                )
            if world_size > 1 and padding_idx is not None:
                raise RuntimeError("ParallelEmbedding does not support padding_idx")
        else:
            world_size = 1
        super().__init__(num_embeddings // world_size, *args, padding_idx=padding_idx, **kwargs)

    def forward(self, input: Tensor) -> Tensor:
        if self.process_group is None:
            return super().forward(input)
        else:
            rank = torch.distributed.get_rank(self.process_group)
            vocab_size = self.num_embeddings
            vocab_start_index, vocab_end_index = rank * vocab_size, (rank + 1) * vocab_size
            # Mask token ids that fall outside this rank's vocab shard (True means masked out).
            input_ids_mask = (input < vocab_start_index) | (input >= vocab_end_index)
            input = input - vocab_start_index
            input[input_ids_mask] = 0
            embeddings = super().forward(input)
            embeddings[input_ids_mask] = 0.0
            return embeddings
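
# Illustrative sketch, not part of the original module: a single-process simulation of
# the vocab-parallel lookup above. Each rank owns a contiguous shard of the vocabulary
# rows, shifts global token ids into its local range, and zeroes the rows for ids owned
# by other ranks; the per-rank partial results sum to the full embedding (that sum is
# what all_reduce / reduce_scatter performs in ParallelGPT2Embeddings). All sizes here
# are arbitrary examples.
def _example_vocab_sharding(vocab_size=8, embed_dim=4, world_size=2):
    torch.manual_seed(0)
    full_weight = torch.randn(vocab_size, embed_dim)
    input_ids = torch.tensor([[1, 5, 7, 2]])  # (batch=1, seqlen=4)
    shard = vocab_size // world_size
    partial_sum = torch.zeros(*input_ids.shape, embed_dim)
    for rank in range(world_size):
        start, end = rank * shard, (rank + 1) * shard
        mask = (input_ids < start) | (input_ids >= end)  # ids outside this rank's shard
        local_ids = (input_ids - start).masked_fill(mask, 0)
        local_emb = full_weight[start:end][local_ids]
        partial_sum += local_emb.masked_fill(mask.unsqueeze(-1), 0.0)
    assert torch.allclose(partial_sum, full_weight[input_ids])
    return partial_sum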

class ColumnParallelEmbedding(nn.Embedding):
    def __init__(self, num_embeddings, embedding_dim, *args, process_group=None, **kwargs):
        self.process_group = process_group
        if process_group is not None:
            world_size = torch.distributed.get_world_size(process_group)
            if embedding_dim % world_size != 0:
                raise ValueError(
                    f"embedding_dim ({embedding_dim}) must be divisible by "
                    f"world_size ({world_size})"
                )
        else:
            world_size = 1
        super().__init__(num_embeddings, embedding_dim // world_size, *args, **kwargs)

class ParallelGPT2Embeddings(nn.Module):
    def __init__(
        self,
        embed_dim,
        vocab_size,
        max_position_embeddings,
        process_group,
        padding_idx=None,
        sequence_parallel=True,
        device=None,
        dtype=None,
    ):
- """
- If max_position_embeddings <= 0, there's no position embeddings
- """
        factory_kwargs = {"device": device, "dtype": dtype}
        super().__init__()
        self.process_group = process_group
        self.sequence_parallel = sequence_parallel
        self.word_embeddings = VocabParallelEmbedding(
            vocab_size,
            embed_dim,
            padding_idx=padding_idx,
            process_group=process_group,
            **factory_kwargs,
        )
        self.max_position_embeddings = max_position_embeddings
        if self.max_position_embeddings > 0:
            self.position_embeddings = ColumnParallelEmbedding(
                max_position_embeddings, embed_dim, process_group=process_group, **factory_kwargs
            )

    def forward(self, input_ids, position_ids=None, combine_batch_seqlen_dim=False):
        """
        input_ids: (batch, seqlen)
        position_ids: (batch, seqlen)
        """
        batch_size, seqlen = input_ids.shape
        world_size = torch.distributed.get_world_size(self.process_group)
        embeddings = self.word_embeddings(input_ids)
        if self.max_position_embeddings > 0:
            if position_ids is None:
                position_ids = torch.arange(seqlen, dtype=torch.long, device=input_ids.device)
            position_embeddings = self.position_embeddings(position_ids)
            if world_size <= 1:
                embeddings = embeddings + position_embeddings
            else:
                partition_dim = self.position_embeddings.embedding_dim
                rank = torch.distributed.get_rank(self.process_group)
                embeddings[
                    ..., rank * partition_dim : (rank + 1) * partition_dim
                ] += position_embeddings
        if combine_batch_seqlen_dim:
            embeddings = rearrange(embeddings, "b s d -> (b s) d")
        reduce_fn = reduce_scatter if self.sequence_parallel else all_reduce
        return embeddings if world_size <= 1 else reduce_fn(embeddings, self.process_group)
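
# Illustrative sketch, not part of the original module, of how ParallelGPT2Embeddings
# would be used under tensor parallelism. It assumes torch.distributed is already
# initialized (e.g. via torchrun) and that `group` is the tensor-parallel process group;
# every size below is an arbitrary example, with vocab_size padded so it divides evenly
# across ranks. With sequence_parallel=False the output is all-reduced, so every rank
# keeps the full (batch, seqlen, embed_dim) activations; with sequence_parallel=True the
# output is reduce-scattered across the group instead (typically together with
# combine_batch_seqlen_dim=True).
def _example_parallel_gpt2_embeddings(group):
    emb = ParallelGPT2Embeddings(
        embed_dim=768,
        vocab_size=50304,  # 50257 padded up to a multiple of common world sizes
        max_position_embeddings=1024,
        process_group=group,
        sequence_parallel=False,
        device="cuda",
    )
    input_ids = torch.randint(0, 50304, (2, 16), device="cuda")
    out = emb(input_ids)  # all-reduced over the tensor-parallel group
    assert out.shape == (2, 16, 768)
    return out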