Coverage for evaluation / tools / video_editor.py: 99.13%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-22 10:24 +0000

1# Copyright 2025 THU-BPM MarkDiffusion. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16from PIL import Image 

17from typing import List 

18import cv2 

19import numpy as np 

20import tempfile 

21import os 

22import random 

23import subprocess 

24import shutil 

25 

class VideoEditor:
    """Common interface shared by the video editors in this module.

    Subclasses override :meth:`edit` to transform a list of frames.
    """

    def __init__(self):
        """Create the editor; the base class stores no state."""

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Placeholder edit hook; the base implementation performs no work.

        Args:
            frames (List[Image.Image]): The frames to edit.
            prompt (str, optional): The prompt for video editing. Defaults to None.

        Returns:
            None: The base class does not produce any frames.
        """
        return None

34 

class MPEG4Compression(VideoEditor):
    """MPEG-4 compression video editor.

    Frames are written to a temporary ``.mp4`` file with OpenCV using the
    ``mp4v`` FourCC and then read back, so the returned frames are the
    decoded result of that encode/decode round trip.
    """

    def __init__(self, fps: float = 24.0):
        """Initialize the MPEG-4 compression video editor.

        Args:
            fps (float, optional): The frames per second of the compressed video. Defaults to 24.0.
        """
        self.fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        self.fps = fps

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Compress the video using MPEG-4 compression.

        Args:
            frames (List[Image.Image]): The frames to compress. The video size is
                taken from the first frame.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: The frames decoded from the compressed video. An
                empty input yields an empty list.
        """
        # Nothing to encode; also avoids indexing into an empty list below.
        if not frames:
            return []

        # Transform PIL images to numpy arrays and convert to BGR format
        frame_arrays = [cv2.cvtColor(np.array(f), cv2.COLOR_RGB2BGR) for f in frames]

        # Get frame size
        height, width, _ = frame_arrays[0].shape

        # Only the path is needed: the handle is closed on leaving the ``with``
        # block, and delete=False keeps the file around for OpenCV to reopen.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            video_path = tmp.name

        compressed_frames = []
        try:
            # Write mp4 video (MPEG-4 encoding)
            out = cv2.VideoWriter(video_path, self.fourcc, self.fps, (width, height))
            try:
                for frame in frame_arrays:
                    out.write(frame)
            finally:
                # Release even if a write fails so the file handle is not leaked.
                out.release()

            # Read mp4 video and decode back to frames
            cap = cv2.VideoCapture(video_path)
            try:
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break
                    # Transform back to PIL.Image
                    pil_img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                    compressed_frames.append(pil_img)
            finally:
                cap.release()
        finally:
            # Clean up the temporary file on both the success and error paths.
            if os.path.exists(video_path):
                os.remove(video_path)

        return compressed_frames

90 

91 

class FrameAverage(VideoEditor):
    """Frame average video editor.

    Each output frame is the mean of a window of up to ``n_frames``
    neighbouring input frames.
    """

    def __init__(self, n_frames: int = 3):
        """Initialize the frame average video editor.

        Args:
            n_frames (int, optional): The number of frames to average. Defaults to 3.

        Raises:
            ValueError: If ``n_frames`` is smaller than 1.
        """
        # A non-positive window would select no frames and make the mean undefined.
        if n_frames < 1:
            raise ValueError("n_frames must be a positive integer")
        self.n_frames = n_frames

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Average frames in a window of size n_frames.

        Args:
            frames (List[Image.Image]): The frames to average.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: The averaged frames, one per input frame.
        """
        n = self.n_frames
        num_frames = len(frames)
        # Transform all PIL images to numpy arrays and convert to float32 for averaging
        arrays = [np.asarray(img).astype(np.float32) for img in frames]
        result = []
        for i in range(num_frames):
            # Determine current window, roughly centred on frame i
            start = max(0, i - n // 2)
            end = min(num_frames, start + n)
            # If the end exceeds, move the window to the left
            start = max(0, end - n)
            window = arrays[start:end]
            # The cast back to uint8 truncates the fractional part of the mean.
            avg = np.mean(window, axis=0).astype(np.uint8)
            result.append(Image.fromarray(avg))
        return result

128 

129 

class FrameRateAdapter(VideoEditor):
    """Resample videos to a target frame rate using linear interpolation."""

    def __init__(self, source_fps: float = 30.0, target_fps: float = 24.0):
        """Store the source and target frame rates after validating them.

        Args:
            source_fps (float, optional): Original frames per second. Defaults to 30.0.
            target_fps (float, optional): Desired frames per second. Defaults to 24.0.

        Raises:
            ValueError: If either frame rate is zero or negative.
        """
        if source_fps <= 0 or target_fps <= 0:
            raise ValueError("source_fps and target_fps must be positive numbers")
        self.source_fps, self.target_fps = source_fps, target_fps

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Resample frames to match the target frame rate while preserving duration.

        Args:
            frames (List[Image.Image]): The frames to resample.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: The resampled frames. Copies of the input are
                returned when the input is empty or both rates are equal.
        """
        rates_match = self.source_fps == self.target_fps
        if not frames or rates_match:
            return [frame.copy() for frame in frames]

        float_frames = [np.asarray(frame).astype(np.float32) for frame in frames]
        last = len(float_frames) - 1
        if last == 0:
            # A single frame has nothing to interpolate with.
            return [Image.fromarray(float_frames[0].astype(np.uint8))]

        # Clip duration spanned between the first and last frame.
        duration = last / self.source_fps
        if duration <= 0:
            return [Image.fromarray(arr.astype(np.uint8)) for arr in float_frames]

        target_count = max(1, int(round(duration * self.target_fps)) + 1)
        # Fractional source positions, evenly spread over the whole clip.
        positions = np.linspace(0, last, target_count)

        def blend(pos):
            """Linearly mix the two source frames surrounding ``pos``."""
            lo = int(np.floor(pos))
            hi = min(int(np.ceil(pos)), last)
            if lo == hi:
                mixed = float_frames[lo]
            else:
                weight = pos - lo
                mixed = (1 - weight) * float_frames[lo] + weight * float_frames[hi]
            return Image.fromarray(np.clip(mixed, 0, 255).astype(np.uint8))

        return [blend(pos) for pos in positions]

173 

174 

class FrameSwap(VideoEditor):
    """Frame swap video editor."""

    def __init__(self, p: float = 0.25):
        """Initialize the frame swap video editor.

        Args:
            p (float, optional): The probability of swapping neighbor frames. Defaults to 0.25.
        """
        self.p = p

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Swap adjacent frames with probability p.

        The frame list is walked from the second position onward; at each
        position the frame is exchanged with its predecessor with probability
        ``p``.

        Args:
            frames (List[Image.Image]): The frames to swap. The list itself is not modified.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: A new list holding the same frame objects in the
                swapped order.
        """
        # Work on a shallow copy so the caller's list keeps its original order.
        swapped = list(frames)
        for i in range(1, len(swapped)):
            # random.random() is uniform on [0, 1), so this fires with probability p.
            if random.random() < self.p:
                swapped[i - 1], swapped[i] = swapped[i], swapped[i - 1]
        return swapped

202 

203 

class FrameInterpolationAttack(VideoEditor):
    """Insert interpolated frames to alter temporal sampling density."""

    def __init__(self, interpolated_frames: int = 1):
        """Store how many synthetic frames go between each pair of originals.

        Args:
            interpolated_frames (int, optional): Number of synthetic frames added between consecutive original frames. Defaults to 1.

        Raises:
            ValueError: If ``interpolated_frames`` is negative.
        """
        if interpolated_frames < 0:
            raise ValueError("interpolated_frames must be non-negative")
        self.interpolated_frames = interpolated_frames

    def edit(self, frames: List[Image.Image], prompt: str = None) -> List[Image.Image]:
        """Insert interpolated frames between originals using linear blending.

        Args:
            frames (List[Image.Image]): The original frames.
            prompt (str, optional): The prompt for video editing. Unused. Defaults to None.

        Returns:
            List[Image.Image]: Copies of the originals with the blended frames
                placed between each consecutive pair.
        """
        extra = self.interpolated_frames
        if not frames or extra == 0:
            return [frame.copy() for frame in frames]
        if len(frames) == 1:
            return [frames[0].copy()]

        arrays = [np.asarray(frame).astype(np.float32) for frame in frames]
        # Blend weights are the same for every pair, so compute them once.
        weights = [step / (extra + 1) for step in range(1, extra + 1)]

        result: List[Image.Image] = []
        for idx, (start, end) in enumerate(zip(arrays, arrays[1:])):
            result.append(frames[idx].copy())
            result.extend(
                Image.fromarray(
                    np.clip((1 - alpha) * start + alpha * end, 0, 255).astype(np.uint8)
                )
                for alpha in weights
            )
        # The loop stops one short of the end, so add the final original frame.
        result.append(frames[-1].copy())
        return result

237