| | from typing import Optional |
| |
|
| | import torch |
| |
|
| |
|
| | def _make_causal_mask( |
| | attention_mask: torch.Tensor, dtype: torch.dtype, device: torch.device |
| | ): |
| | """ |
| | Make causal mask used for bi-directional self-attention. |
| | """ |
| | bsz, tgt_len = attention_mask.shape |
| | mask = torch.full((tgt_len, tgt_len), torch.tensor(torch.finfo(dtype).min, device=device), device=device) |
| | mask_cond = torch.arange(mask.size(-1), device=device) |
| | mask.masked_fill_(mask_cond < (mask_cond + 1).view(mask.size(-1), 1), 0) |
| | mask = mask.to(dtype) |
| |
|
| | return mask[None, None, :, :].expand(bsz, 1, tgt_len, tgt_len) |
| |
|
| |
|
| | def _make_2dvison_mask(column_mask, dtype: torch.dtype, device: torch.device): |
| | """ |
| | """ |
| | bsz, seq_length = column_mask.shape |
| | cross_mask = torch.zeros((bsz, 1, seq_length, seq_length), dtype=dtype, device=device) |
| |
|
| | |
| | start = None |
| | for bsz_idx in range(bsz): |
| | for i in range(seq_length): |
| | if column_mask[bsz_idx, i] == 1: |
| | if start is None: |
| | start = i |
| | else: |
| | if start is not None: |
| | |
| | cross_mask[bsz_idx, 0, start:i, start:i] = 1 |
| | start = None |
| |
|
| | |
| | if start is not None: |
| | cross_mask[bsz_idx, 0, start:seq_length, start:seq_length] = 1 |
| |
|
| | return cross_mask |
| |
|
| |
|
| | def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None): |
| | """ |
| | Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`. |
| | """ |
| | bsz, src_len = mask.size() |
| | tgt_len = tgt_len if tgt_len is not None else src_len |
| |
|
| | expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype) |
| |
|
| | inverted_mask = 1.0 - expanded_mask |
| |
|
| | return inverted_mask.masked_fill_(inverted_mask.to(torch.bool), torch.finfo(dtype).min) |
| |
|
| |
|
| | def make_mask(attention_mask: torch.Tensor, dtype: torch.dtype=None, device: torch.device=None, mode: str="default", vision_mask: torch.Tensor=None, ): |
| | if dtype is None: |
| | dtype = attention_mask.dtype |
| | if device is None: |
| | device = attention_mask.device |
| | expanded_attn_mask = _expand_mask(attention_mask, dtype).to(device) |
| | causal_mask = _make_causal_mask(attention_mask, dtype, device).to(device) |
| | if mode == "default": |
| | return attention_mask |
| | else: |
| | assert vision_mask is not None, "vision_mask is None" |
| | vision_mask = vision_mask.to(device) |
| | bsz, seq_length = attention_mask.shape |
| | vision_mask_bg = vision_mask[:, None, :, None] |
| | vision_mask_2d = _make_2dvison_mask(vision_mask, dtype, device) |
| | if mode == "bidirectional": |
| | mask = expanded_attn_mask + causal_mask |
| | mask = mask.clone().masked_fill_(vision_mask_2d.to(torch.bool), 0) |
| | return mask |
| | else: |
| | raise NotImplementedError(f"mode {mode} is not implemented") |
| |
|