Coverage for evaluation / tools / video_editor.py: 99.13%
115 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 10:24 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-22 10:24 +0000
1# Copyright 2025 THU-BPM MarkDiffusion.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from PIL import Image
17from typing import List
18import cv2
19import numpy as np
20import tempfile
21import os
22import random
23import subprocess
24import shutil
class VideoEditor:
    """Common interface shared by the video editors defined in this module."""

    def __init__(self):
        # The base editor holds no configuration of its own.
        pass

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Placeholder edit hook; the base implementation performs no work.

        Args:
            frames (List[Image.Image]): The frames to edit.
            prompt (str, optional): The prompt for video editing. Defaults to None.

        Returns:
            None: The base class does not produce frames; subclasses override
            this method and return the edited frame list.
        """
        return None
class MPEG4Compression(VideoEditor):
    """MPEG-4 compression video editor.

    Encodes the frames into a temporary ``.mp4`` file with the ``mp4v`` codec
    and decodes that file again, returning the decoded frames.
    """

    def __init__(self, fps: float = 24.0):
        """Initialize the MPEG-4 compression video editor.

        Args:
            fps (float, optional): The frames per second of the compressed video. Defaults to 24.0.
        """
        self.fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        self.fps = fps

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Compress the video using MPEG-4 compression.

        Args:
            frames (List[Image.Image]): The frames to compress. The shape
                unpacking below requires three-dimensional arrays, and the
                colour conversion treats the channels as RGB.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: The frames decoded from the compressed video.
            An empty input yields an empty list.
        """
        # Nothing to encode; also avoids indexing frame_arrays[0] on an empty list.
        if not frames:
            return []

        # Transform PIL images to numpy arrays and convert to BGR format
        frame_arrays = [cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR) for f in frames]

        # The writer is sized from the first frame
        height, width, _ = frame_arrays[0].shape

        # Reserve a path for the mp4 video; the handle is closed right away so
        # that OpenCV can open the file by name.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            video_path = tmp.name

        try:
            # Write mp4 video (MPEG-4 encoding)
            out = cv2.VideoWriter(video_path, self.fourcc, self.fps, (width, height))
            try:
                for frame in frame_arrays:
                    out.write(frame)
            finally:
                # Always release the writer, even if a write raises.
                out.release()

            # Read mp4 video and decode back to frames
            cap = cv2.VideoCapture(video_path)
            compressed_frames = []
            try:
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    # Transform back to PIL.Image
                    pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    compressed_frames.append(pil_img)
            finally:
                # Release the reader before the file is deleted.
                cap.release()
        finally:
            # Clean up the temporary file on both the success and failure paths
            os.remove(video_path)

        return compressed_frames
class FrameAverage(VideoEditor):
    """Frame average video editor."""

    def __init__(self, n_frames: int = 3):
        """Initialize the frame average video editor.

        Args:
            n_frames (int, optional): The number of frames to average. Defaults to 3.

        Raises:
            ValueError: If ``n_frames`` is smaller than 1.
        """
        # A non-positive window would leave nothing to average in edit().
        if n_frames < 1:
            raise ValueError("n_frames must be a positive integer")
        self.n_frames = n_frames

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Average frames in a window of size n_frames.

        Each output frame is the per-pixel mean of a window of up to
        ``n_frames`` input frames around the same position.

        Args:
            frames (List[Image.Image]): The frames to average.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: The averaged frames, one per input frame.
        """
        n = self.n_frames
        num_frames = len(frames)
        # Transform all PIL images to numpy arrays and convert to float32 for averaging
        arrays = [np.asarray(img).astype(np.float32) for img in frames]
        result = []
        for i in range(num_frames):
            # Start with a window roughly centred on frame i
            start = max(0, i - n // 2)
            end = min(num_frames, start + n)
            # If the end was clamped, move the window to the left so it keeps
            # n frames whenever the clip is long enough
            start = max(0, end - n)
            window = arrays[start:end]
            # The uint8 cast drops the fractional part of the mean
            avg = np.mean(window, axis=0).astype(np.uint8)
            result.append(Image.fromarray(avg))
        return result
class FrameRateAdapter(VideoEditor):
    """Frame rate adapter: retimes a clip by linearly blending neighbouring frames."""

    def __init__(self, source_fps: float = 30.0, target_fps: float = 24.0):
        """Store the source and target frame rates.

        Args:
            source_fps (float, optional): Original frames per second. Defaults to 30.0.
            target_fps (float, optional): Desired frames per second. Defaults to 24.0.

        Raises:
            ValueError: If either rate is zero or negative.
        """
        if source_fps <= 0 or target_fps <= 0:
            raise ValueError("source_fps and target_fps must be positive numbers")
        self.source_fps = source_fps
        self.target_fps = target_fps

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Resample the frames to the target frame rate.

        Args:
            frames (List[Image.Image]): The frames to resample.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: Copies of the input when it is empty or the two
            rates are equal; otherwise frames sampled at evenly spaced
            positions between the first and last input frame.
        """
        passthrough = (not frames) or self.source_fps == self.target_fps
        if passthrough:
            return [original.copy() for original in frames]

        stack = [np.asarray(frame).astype(np.float32) for frame in frames]
        last = len(stack) - 1

        # A single frame has no neighbour to blend with.
        if last == 0:
            return [Image.fromarray(stack[0].astype(np.uint8))]

        clip_seconds = last / self.source_fps
        if clip_seconds <= 0:
            return [Image.fromarray(arr.astype(np.uint8)) for arr in stack]

        # One sample per target-rate tick, plus the frame at time zero.
        wanted = max(1, int(round(clip_seconds * self.target_fps)) + 1)

        output: List[Image.Image] = []
        for position in np.linspace(0, last, wanted):
            below = int(np.floor(position))
            above = min(int(np.ceil(position)), last)
            if below != above:
                # Fractional position: weight the two surrounding frames.
                weight = position - below
                blended = (1 - weight) * stack[below] + weight * stack[above]
            else:
                # Position falls exactly on an input frame.
                blended = stack[below]
            output.append(Image.fromarray(np.clip(blended, 0, 255).astype(np.uint8)))
        return output
class FrameSwap(VideoEditor):
    """Frame swap video editor."""

    def __init__(self, p: float = 0.25):
        """Initialize the frame swap video editor.

        Args:
            p (float, optional): The probability of swapping neighbor frames. Defaults to 0.25.
        """
        self.p = p

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Swap adjacent frames with probability p.

        Walks the sequence once from the second frame onward and, at each
        position, exchanges the frame with its predecessor with probability
        ``p``. The input list itself is left untouched.

        Args:
            frames (List[Image.Image]): The frames to swap.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: A new list holding the same frame objects in
            their (possibly) swapped order.
        """
        # Work on a shallow copy so the caller's list is not reordered in place.
        swapped = list(frames)
        for i in range(1, len(swapped)):
            # random.random() is uniform on [0, 1), so "< p" fires with
            # probability p (never for p == 0, always for p == 1).
            if random.random() < self.p:
                swapped[i - 1], swapped[i] = swapped[i], swapped[i - 1]
        return swapped
class FrameInterpolationAttack(VideoEditor):
    """Frame interpolation video editor: adds blended frames between originals."""

    def __init__(self, interpolated_frames: int = 1):
        """Store how many synthetic frames to add per gap.

        Args:
            interpolated_frames (int, optional): Number of synthetic frames added between consecutive original frames. Defaults to 1.

        Raises:
            ValueError: If ``interpolated_frames`` is negative.
        """
        if interpolated_frames < 0:
            raise ValueError("interpolated_frames must be non-negative")
        self.interpolated_frames = interpolated_frames

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Insert linearly blended frames between each pair of neighbours.

        Args:
            frames (List[Image.Image]): The original frames.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: Copies of the original frames with
            ``interpolated_frames`` blended frames inserted into every gap.
            Inputs that are empty, hold a single frame, or are processed with
            ``interpolated_frames == 0`` come back as plain copies.
        """
        if not frames or self.interpolated_frames == 0:
            return [original.copy() for original in frames]
        if len(frames) == 1:
            return [frames[0].copy()]

        arrays = [np.asarray(frame).astype(np.float32) for frame in frames]

        # Blend weights are evenly spaced strictly between 0 and 1.
        steps = self.interpolated_frames + 1
        weights = [k / steps for k in range(1, steps)]

        output: List[Image.Image] = []
        # zip stops at the second-to-last frame; the last one is appended below.
        for original, head, tail in zip(frames, arrays, arrays[1:]):
            output.append(original.copy())
            output.extend(
                Image.fromarray(np.clip((1 - w) * head + w * tail, 0, 255).astype(np.uint8))
                for w in weights
            )
        output.append(frames[-1].copy())
        return output