123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
"""A layer that computes logits from hidden_states."""
- from typing import Optional
- import torch
- import torch.nn as nn
- from aphrodite.distributed import tensor_model_parallel_gather
- from aphrodite.modeling.sampling_metadata import SamplingMetadata
class LogitsProcessor(nn.Module):
    """Process logits and apply logits processors from sampling metadata.

    This layer does the following:
    1. Gather logits from model hidden_states.
    2. Scale logits if needed.
    3. Apply logits processors (if any).
    """

    def __init__(self,
                 vocab_size: int,
                 org_vocab_size: Optional[int] = None,
                 scale: Optional[float] = 1.0,
                 logits_as_input: bool = False) -> None:
        """
        Args:
            vocab_size: Vocabulary size; stored on the instance.
            org_vocab_size: Original vocabulary size (without LoRA). Logits
                computed by this layer are truncated to this many columns.
                Falls back to ``vocab_size`` when omitted (or falsy).
            scale: A scaling factor to apply to the logits. ``None``
                disables scaling.
            logits_as_input: Whether the tensor passed to ``forward`` as
                ``hidden_states`` already holds logits (default is hidden
                states).
        """
        super().__init__()
        self.scale = scale
        self.vocab_size = vocab_size
        # Whether the input is logits (default is hidden states).
        self.logits_as_input = logits_as_input
        # original vocabulary size (without LoRA).
        self.org_vocab_size = org_vocab_size or vocab_size

    def forward(
        self,
        embedding: torch.Tensor,
        hidden_states: torch.Tensor,
        sampling_metadata: SamplingMetadata,
        embedding_bias: Optional[torch.Tensor] = None,
    ) -> Optional[torch.Tensor]:
        """Turn hidden states (or ready-made logits) into processed logits.

        Args:
            embedding: Matrix whose transpose is multiplied with the hidden
                states to produce logits. Unused when ``logits_as_input``
                is set.
            hidden_states: Hidden states, or the logits themselves when
                ``logits_as_input`` is set.
            sampling_metadata: Supplies the selected token indices and the
                per-sequence-group logits processors.
            embedding_bias: Optional bias added to the computed logits.

        Returns:
            The scaled and processed logits, or ``None`` when no logits are
            available (``_get_logits`` returned ``None``).
        """
        if self.logits_as_input:
            logits = hidden_states
        else:
            hidden_states = _prune_hidden_states(hidden_states,
                                                 sampling_metadata)
            # Get the logits for the next tokens.
            logits = self._get_logits(hidden_states, embedding,
                                      embedding_bias)

        if logits is not None:
            # ``scale`` is Optional; multiplying a tensor by None would
            # raise TypeError, so None means "leave the logits unscaled".
            if self.scale is not None:
                # In-place: with ``logits_as_input`` this modifies the
                # tensor the caller passed in as ``hidden_states``.
                logits *= self.scale

            # Apply logits processors (if any).
            logits = _apply_logits_processors(logits, sampling_metadata)

        return logits

    def _get_logits(
        self,
        hidden_states: torch.Tensor,
        embedding: torch.Tensor,
        embedding_bias: Optional[torch.Tensor],
    ) -> Optional[torch.Tensor]:
        """Compute logits from hidden states and strip vocabulary padding.

        Multiplies ``hidden_states`` by the transposed ``embedding``, adds
        ``embedding_bias`` when given, passes the result through
        ``tensor_model_parallel_gather`` and keeps only the first
        ``org_vocab_size`` columns.

        Returns:
            The logits, or ``None`` if the gather returned ``None``.
        """
        # Get the logits for the next tokens.
        logits = torch.matmul(hidden_states, embedding.t())
        if embedding_bias is not None:
            logits += embedding_bias
        # NOTE(review): presumably returns None on ranks that are not the
        # gather destination -- confirm against the distributed helper.
        logits = tensor_model_parallel_gather(logits)
        # Remove paddings in vocab (if any).
        if logits is not None:
            logits = logits[:, :self.org_vocab_size]
        return logits
def _prune_hidden_states(
    hidden_states: torch.Tensor,
    sampling_metadata: SamplingMetadata,
) -> torch.Tensor:
    """Keep only the rows of ``hidden_states`` selected for sampling.

    Rows are picked along dimension 0 using
    ``sampling_metadata.selected_token_indices``; the result follows the
    order of those indices.
    """
    wanted_rows = sampling_metadata.selected_token_indices
    return hidden_states.index_select(dim=0, index=wanted_rows)
def _apply_logits_processors(
    logits: torch.Tensor,
    sampling_metadata: SamplingMetadata,
) -> torch.Tensor:
    """Run each sequence group's logits processors over its sample rows.

    For every sequence group whose sampling params carry logits processors,
    each (sequence id, sample row index) pair has its row of ``logits``
    passed through the processors in order, together with that sequence's
    output token ids. The result is written back into ``logits`` in place.

    Returns:
        The same ``logits`` tensor that was passed in.

    Raises:
        AssertionError: If at least one group had processors and the sample
            and prompt-logprob indices of all groups do not add up to the
            number of rows in ``logits``.
    """
    any_processor_seen = False
    rows_accounted_for = 0

    for group in sampling_metadata.seq_groups:
        member_ids = group.seq_ids
        processors = group.sampling_params.logits_processors

        if processors:
            any_processor_seen = True
            for member_id, row_idx in zip(member_ids, group.sample_indices):
                row = logits[row_idx]
                generated = group.seq_data[member_id].output_token_ids
                # Processors are chained: each one receives the row
                # returned by the previous one.
                for process in processors:
                    row = process(generated, row)
                logits[row_idx] = row

        # Counted for every group, with or without processors, so the
        # total can be compared against the full row count below.
        sample_rows = len(group.sample_indices)
        prompt_rows = len(group.prompt_logprob_indices)
        rows_accounted_for += sample_rows + prompt_rows

    if any_processor_seen:
        # verifies that no rows in logits were missed unexpectedly
        assert rows_accounted_for == logits.shape[0]
    return logits
|