Coverage for markdiffusion / evaluation / tools / image_editor.py: 95.72%

304 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-14 19:25 +0000

1from PIL import Image, ImageFilter, ImageEnhance, ImageOps, ImageDraw 

2import os 

3import argparse 

4import sys 

5import numpy as np 

6import random 

7 

8 

class ImageEditor:
    """Common interface for the image edits defined in this module.

    Subclasses override :meth:`edit`; the base implementation performs no
    work and returns ``None``.
    """

    def __init__(self):
        """Create an editor; the base class keeps no state."""

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Apply the edit to `image`.

        Args:
            image: Input image.
            prompt: Optional text; ignored by the base class.

        Returns:
            ``None`` in the base class (nothing is done to `image`).
        """
        return None

15 

class JPEGCompression(ImageEditor):
    """JPEG re-compression edit.

    Args:
        quality: Value forwarded as the `quality` option of the JPEG save.
    """

    def __init__(self, quality: int = 95):
        super().__init__()
        self.quality = quality

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return `image` after one JPEG encode/decode round trip.

        The round trip happens in an in-memory buffer rather than through a
        fixed `temp.jpg` in the working directory, so concurrent callers do
        not overwrite each other's file, no writable cwd is required, and
        the result never refers to a file that has already been deleted.

        Args:
            image: Input image. It is not modified.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            The decoded JPEG image, fully loaded into memory.
        """
        import io  # local import: only this method needs it

        # JPEG cannot store an alpha channel or a palette; flatten those
        # modes to RGB so the save does not fail on them.
        if image.mode in ("RGBA", "LA", "P", "PA"):
            image = image.convert("RGB")

        buffer = io.BytesIO()
        image.save(buffer, format="JPEG", quality=self.quality)
        buffer.seek(0)

        compressed_image = Image.open(buffer)
        # Image.open is lazy; decode now so the pixels are materialised
        # before the caller touches the result.
        compressed_image.load()
        return compressed_image

26 

class Rotation(ImageEditor):
    """Rotation edit built on `Image.rotate`.

    Args:
        angle: Rotation angle handed to `Image.rotate`.
        expand: Forwarded as the `expand` keyword of `Image.rotate`.
    """

    def __init__(self, angle: int = 30, expand: bool = False):
        super().__init__()
        self.angle, self.expand = angle, expand

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return a rotated version of `image`; `prompt` is ignored."""
        rotated = image.rotate(self.angle, expand=self.expand)
        return rotated

35 

class CrSc(ImageEditor):
    """Crop-and-scale attack.

    A window of size (W * crop_ratio, H * crop_ratio) is cropped out of the
    image and resized back to the original (W, H).

    `position` controls where the crop window is placed:
      - "center" (default): top-left corner at ((W-w)//2, (H-h)//2). Backward-compatible.
      - "random": offsets are sampled uniformly each call from [0, W-w] x [0, H-h].
      - tuple `(x_ratio, y_ratio)`: explicit normalized offsets in [0, 1] of the slack
        (W-w, H-h). e.g. (0.0, 0.0) = top-left, (1.0, 1.0) = bottom-right, (0.5, 0.5) = center.

    Args:
        crop_ratio: Side-length fraction of the crop window; must be > 0.
        position: "center", "random", or an `(x_ratio, y_ratio)` pair.

    Raises:
        ValueError: If `crop_ratio` is not strictly positive, or `position`
            is an unknown string, not a 2-item sequence, or has a ratio
            outside [0, 1].
    """

    def __init__(self, crop_ratio: float = 0.8, position="center"):
        super().__init__()
        # A zero or negative ratio can only yield an empty/invalid crop
        # window at edit time; reject it up front with a clear message.
        # Written as `not (> 0)` so that NaN is rejected as well.
        if not float(crop_ratio) > 0.0:
            raise ValueError(f"crop_ratio must be > 0; got {crop_ratio!r}")
        self.crop_ratio = crop_ratio
        self.position = position
        if isinstance(position, str):
            if position not in {"center", "random"}:
                raise ValueError(f"position must be 'center', 'random', or a (x, y) tuple; got {position!r}")
        else:
            try:
                x_ratio, y_ratio = position
            except (TypeError, ValueError) as e:
                raise ValueError(f"position tuple must be (x_ratio, y_ratio); got {position!r}") from e
            if not (0.0 <= float(x_ratio) <= 1.0 and 0.0 <= float(y_ratio) <= 1.0):
                raise ValueError(f"position ratios must be in [0, 1]; got {position!r}")

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Crop a window out of `image` and scale it back to the input size.

        Args:
            image: Input image. It is not modified.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            A new image with the same size as `image`.
        """
        width, height = image.size
        # int() truncates, so a small image or a small ratio could give a
        # zero-sized window; keep at least one pixel in each direction.
        new_w = max(1, int(width * self.crop_ratio))
        new_h = max(1, int(height * self.crop_ratio))

        # Room left to slide the window. It is 0 when the window is at
        # least as large as the image (crop_ratio >= 1).
        # NOTE(review): for crop_ratio > 1 the window extends past the image
        # border and the result depends on how Image.crop fills that area.
        slack_w = max(0, width - new_w)
        slack_h = max(0, height - new_h)

        if self.position == "center":
            left = slack_w // 2
            top = slack_h // 2
        elif self.position == "random":
            # No draw is made when there is no slack, so the global RNG
            # stream is consumed exactly as before.
            left = random.randint(0, slack_w) if slack_w > 0 else 0
            top = random.randint(0, slack_h) if slack_h > 0 else 0
        else:
            x_ratio, y_ratio = self.position
            left = int(round(slack_w * float(x_ratio)))
            top = int(round(slack_h * float(y_ratio)))

        box = (left, top, left + new_w, top + new_h)
        return image.crop(box).resize((width, height))

84 

class GaussianBlurring(ImageEditor):
    """Gaussian blur edit.

    Args:
        radius: Radius given to `ImageFilter.GaussianBlur`.
    """

    def __init__(self, radius: int = 2):
        super().__init__()
        self.radius = radius

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return `image` filtered with a Gaussian blur; `prompt` is ignored."""
        blur = ImageFilter.GaussianBlur(self.radius)
        return image.filter(blur)

92 

class GaussianNoise(ImageEditor):
    """Additive Gaussian noise edit.

    Args:
        sigma: Standard deviation of the zero-mean noise, in pixel units
            on the 0-255 scale used by this method.
    """

    def __init__(self, sigma: float = 25.0):
        super().__init__()
        self.sigma = sigma

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return an RGB copy of `image` with Gaussian noise added.

        The image is converted to RGB, noise drawn from NumPy's global RNG
        is added per channel value, and the sum is clipped to [0, 255]
        before being cast back to 8-bit. `prompt` is ignored.
        """
        pixels = np.array(image.convert("RGB")).astype(np.float32)
        perturbed = pixels + np.random.normal(0, self.sigma, pixels.shape)
        as_bytes = np.clip(perturbed, 0, 255).astype(np.uint8)
        return Image.fromarray(as_bytes)

106 

class Brightness(ImageEditor):
    """Brightness adjustment edit.

    Args:
        factor: Enhancement factor given to `ImageEnhance.Brightness.enhance`.
    """

    def __init__(self, factor: float = 1.2):
        super().__init__()
        self.factor = factor

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return `image` with its brightness scaled; `prompt` is ignored."""
        return ImageEnhance.Brightness(image).enhance(self.factor)

115 

class Mask(ImageEditor):
    """Random rectangular masking edit.

    Args:
        mask_ratio: Upper bound of each mask's side length, as a fraction
            of the corresponding image dimension.
        num_masks: Number of black rectangles to draw.
    """

    def __init__(self, mask_ratio: float = 0.1, num_masks: int = 5):
        super().__init__()
        self.mask_ratio = mask_ratio
        self.num_masks = num_masks

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return a copy of `image` with `num_masks` black rectangles drawn.

        Each rectangle's width and height are drawn between half of the
        maximum and the maximum; its position is drawn so that the top-left
        corner keeps the nominal rectangle inside the image.

        Args:
            image: Input image. It is copied, not modified.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            The masked copy.
        """
        img = image.copy()
        draw = ImageDraw.Draw(img)
        width, height = img.size

        # The bounds do not change between masks, so compute them once.
        # They are capped at the image size: a mask_ratio above 1 would
        # otherwise make `width - mask_width` negative and randint fail.
        max_mask_width = min(width, int(width * self.mask_ratio))
        max_mask_height = min(height, int(height * self.mask_ratio))

        for _ in range(self.num_masks):
            # Draw order (width, height, x, y) is kept so that seeded runs
            # reproduce the same masks as before.
            mask_width = random.randint(max_mask_width // 2, max_mask_width)
            mask_height = random.randint(max_mask_height // 2, max_mask_height)

            x = random.randint(0, width - mask_width)
            y = random.randint(0, height - mask_height)

            draw.rectangle([x, y, x + mask_width, y + mask_height], fill='black')

        return img

140 

class Overlay(ImageEditor):
    """Random polyline ("scribble") overlay edit.

    Args:
        num_strokes: Number of polylines to draw.
        stroke_width: Line width passed to `ImageDraw.line`.
        stroke_type: 'black' or 'white' for a fixed colour; 'random' (or
            any other value) picks a fresh random RGB colour per stroke.
    """

    def __init__(self, num_strokes: int = 10, stroke_width: int = 5, stroke_type: str = 'random'):
        super().__init__()
        self.num_strokes = num_strokes
        self.stroke_width = stroke_width
        self.stroke_type = stroke_type

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return a copy of `image` with random strokes drawn on top.

        Every stroke is a random walk of 3 to 8 points: it starts at a
        random position and each following point moves by at most a quarter
        of the shorter image side per axis, clamped to the image bounds.
        `prompt` is ignored.
        """
        canvas = image.copy()
        pen = ImageDraw.Draw(canvas)
        width, height = canvas.size
        max_step = min(width, height) // 4

        def clamp(value, upper):
            # Keep a coordinate inside [0, upper].
            return max(0, min(upper, value))

        for _ in range(self.num_strokes):
            # Tuple items are evaluated left to right: x is drawn before y.
            path = [(random.randint(0, width), random.randint(0, height))]
            extra_points = random.randint(3, 8) - 1

            for _ in range(extra_points):
                prev_x, prev_y = path[-1]
                next_x = clamp(prev_x + random.randint(-max_step, max_step), width)
                next_y = clamp(prev_y + random.randint(-max_step, max_step), height)
                path.append((next_x, next_y))

            if self.stroke_type == 'black':
                color = (0, 0, 0)
            elif self.stroke_type == 'white':
                color = (255, 255, 255)
            else:
                # 'random' and every unrecognised value share this branch.
                color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))

            pen.line(path, fill=color, width=self.stroke_width)

        return canvas

178 

class AdaptiveNoiseInjection(ImageEditor):
    """Noise edit that picks (or mixes) a noise model based on the image.

    Args:
        intensity: Scales the strength of every noise model.
        auto_select: If True, one noise model is chosen from simple image
            statistics. If False, all four models (Gaussian, salt-and-pepper,
            Poisson, speckle) are generated and blended together.
    """

    def __init__(self, intensity: float = 0.5, auto_select: bool = True):
        super().__init__()
        self.intensity = intensity
        self.auto_select = auto_select

    def _analyze_image_features(self, img_array):
        """Compute the statistics used to choose a noise model.

        Args:
            img_array: 2-D array, or 3-D array whose last axis is averaged
                away to obtain a grayscale plane.

        Returns:
            Dict with 'brightness_mean', 'brightness_std', 'edge_density'
            (mean absolute horizontal + vertical first difference) and
            'texture_complexity' (mean standard deviation over
            non-overlapping 5x5 tiles; 0.0 if no full tile fits).
        """
        gray = np.mean(img_array, axis=2) if len(img_array.shape) == 3 else img_array

        brightness_mean = np.mean(gray)
        brightness_std = np.std(gray)

        # First differences with the first column/row prepended, so the
        # outputs keep the shape of `gray` and the border difference is 0.
        grad_x = np.abs(np.diff(gray, axis=1, prepend=gray[:, :1]))
        grad_y = np.abs(np.diff(gray, axis=0, prepend=gray[:1, :]))
        edge_density = np.mean(grad_x + grad_y)

        kernel_size = 5
        h, w = gray.shape
        tile_rows, tile_cols = h // kernel_size, w // kernel_size
        if tile_rows and tile_cols:
            # View the tiled region as (tile_rows, k, tile_cols, k) so one
            # std call covers every full tile. This averages over exactly
            # the tiles counted in the divisor; a stepped range ending at
            # `h - kernel_size` would skip the last tile whenever the
            # dimension is a multiple of the tile size.
            tiles = gray[:tile_rows * kernel_size, :tile_cols * kernel_size].reshape(
                tile_rows, kernel_size, tile_cols, kernel_size
            )
            texture_complexity = float(np.mean(np.std(tiles, axis=(1, 3))))
        else:
            # Image smaller than one tile: no texture estimate, and no
            # division by a zero tile count.
            texture_complexity = 0.0

        return {
            'brightness_mean': brightness_mean,
            'brightness_std': brightness_std,
            'edge_density': edge_density,
            'texture_complexity': texture_complexity
        }

    def _select_noise_type(self, features):
        """Map image statistics to a noise model name.

        Rules are checked in order: dark image (mean brightness < 80) ->
        'gaussian'; edge density > 30 -> 'salt_pepper'; texture > 20 ->
        'speckle'; otherwise 'poisson'.
        """
        brightness = features['brightness_mean']
        edge_density = features['edge_density']
        texture = features['texture_complexity']

        if brightness < 80:
            return 'gaussian'
        elif edge_density > 30:
            return 'salt_pepper'
        elif texture > 20:
            return 'speckle'
        else:
            return 'poisson'

    def _add_gaussian_noise(self, img_array, sigma):
        """Add zero-mean Gaussian noise with std `sigma`; return uint8."""
        noise = np.random.normal(0, sigma, img_array.shape)
        noisy = np.clip(img_array + noise, 0, 255)
        return noisy.astype(np.uint8)

    def _add_salt_pepper_noise(self, img_array, amount):
        """Set random pixels to 255 ("salt") and to 0 ("pepper"); return uint8.

        `amount` is the targeted fraction of pixels, split evenly between
        salt and pepper. Coordinates are drawn independently, so repeats
        are possible and pepper drawn later overrides salt at the same
        location.
        """
        noisy = img_array.copy()
        h, w = img_array.shape[:2]
        num_pixels = h * w

        num_salt = int(amount * num_pixels * 0.5)
        salt_coords_y = np.random.randint(0, h, num_salt)
        salt_coords_x = np.random.randint(0, w, num_salt)
        noisy[salt_coords_y, salt_coords_x] = 255

        num_pepper = int(amount * num_pixels * 0.5)
        pepper_coords_y = np.random.randint(0, h, num_pepper)
        pepper_coords_x = np.random.randint(0, w, num_pepper)
        noisy[pepper_coords_y, pepper_coords_x] = 0

        return np.clip(noisy, 0, 255).astype(np.uint8)

    def _add_poisson_noise(self, img_array):
        """Apply Poisson noise; return uint8.

        The number of distinct values is rounded up to a power of two and
        used to scale the Poisson rate up and the sample back down.
        """
        vals = len(np.unique(img_array))
        vals = 2 ** np.ceil(np.log2(vals))
        noisy = np.random.poisson(img_array * vals) / float(vals)
        return np.clip(noisy, 0, 255).astype(np.uint8)

    def _add_speckle_noise(self, img_array, variance):
        """Apply multiplicative noise `x + x * n`; return uint8.

        Despite its name, `variance` multiplies a standard-normal draw, so
        it acts as the standard deviation of `n`.
        """
        noise = np.random.randn(*img_array.shape) * variance
        noisy = img_array + img_array * noise
        return np.clip(noisy, 0, 255).astype(np.uint8)

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return an RGB copy of `image` with noise applied.

        Args:
            image: Input image; converted to RGB before processing.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            The noisy image as 8-bit RGB.
        """
        img = image.convert("RGB")
        img_array = np.array(img).astype(np.float32)

        if self.auto_select:
            # The statistics only matter for choosing a model, so they are
            # computed in this branch only. The analysis draws no random
            # numbers, so the noise produced is unaffected.
            features = self._analyze_image_features(img_array)
            noise_type = self._select_noise_type(features)

            if noise_type == 'gaussian':
                sigma = 40 * self.intensity
                noisy_array = self._add_gaussian_noise(img_array, sigma)
            elif noise_type == 'salt_pepper':
                amount = 0.15 * self.intensity
                noisy_array = self._add_salt_pepper_noise(img_array, amount)
            elif noise_type == 'poisson':
                noisy_array = self._add_poisson_noise(img_array)
                # Poisson noise has no strength knob of its own; intensity
                # is applied by blending with the clean image (capped 0.8).
                blend_factor = min(0.8, self.intensity * 1.5)
                noisy_array = np.clip(
                    img_array * (1 - blend_factor) + noisy_array * blend_factor,
                    0, 255
                ).astype(np.uint8)
            else:
                variance = 0.5 * self.intensity
                noisy_array = self._add_speckle_noise(img_array, variance)
        else:
            # Blend the four models one after another. Each step keeps 75%
            # of the running result, so later layers end up with more
            # weight than earlier ones.
            weight = 0.25
            noisy_array = img_array.copy()

            gaussian = self._add_gaussian_noise(img_array, 30 * self.intensity)
            noisy_array = noisy_array * (1 - weight) + gaussian * weight

            salt_pepper = self._add_salt_pepper_noise(img_array, 0.08 * self.intensity)
            noisy_array = noisy_array * (1 - weight) + salt_pepper * weight

            poisson = self._add_poisson_noise(img_array)
            noisy_array = noisy_array * (1 - weight) + poisson * weight

            speckle = self._add_speckle_noise(img_array, 0.4 * self.intensity)
            noisy_array = noisy_array * (1 - weight) + speckle * weight

            noisy_array = np.clip(noisy_array, 0, 255).astype(np.uint8)

        return Image.fromarray(noisy_array)

305 

306 

class DiffusionPurification(ImageEditor):
    """Diffusion-based purification (regeneration) attack.

    The image is encoded into the VAE latent space, noised up to a timestep
    taken from the tail of the scheduler's timestep list, and denoised back
    with the UNet before being decoded. A watermark survives only if it is
    robust to this partial round trip through the diffusion model.
    Reference: Nie et al., "Diffusion Models for Adversarial Purification", ICML 2022.

    Args:
        diffusion_config: Object providing `pipe`, `device`, `image_size`
            and `num_inference_steps`.
        purification_strength: Fraction in (0, 1] of the inference steps
            that are re-run. Larger values start from a noisier latent.
        prompt: Default text prompt used to condition the UNet. Empty by
            default. Classifier-free guidance is not applied by `edit`.
        purifier_pipe: Optional pipeline to use instead of
            `diffusion_config.pipe`.

    Raises:
        ValueError: If `purification_strength` is outside (0, 1].
    """

    def __init__(self, diffusion_config, purification_strength: float = 0.3,
                 prompt: str = "", purifier_pipe=None):
        super().__init__()
        strength = float(purification_strength)
        # Written as `not (a < x <= b)` so that NaN is rejected as well.
        if not (0.0 < strength <= 1.0):
            raise ValueError(
                f"purification_strength must be in (0, 1]; got {purification_strength!r}"
            )
        self.diffusion_config = diffusion_config
        self.purification_strength = strength
        self.default_prompt = prompt
        if purifier_pipe is not None:
            self.pipe = purifier_pipe
        else:
            self.pipe = diffusion_config.pipe

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Regenerate `image` through a partial noise/denoise cycle.

        Args:
            image: Input image.
            prompt: Text used to condition the UNet; falls back to the
                prompt given at construction when None.

        Returns:
            The regenerated image, resized to `image.size` if the decoded
            output has a different size.
        """
        import torch
        from markdiffusion.utils.media_utils import transform_to_model_format

        cfg = self.diffusion_config
        pipe = self.pipe
        text = self.default_prompt if prompt is None else prompt
        device = cfg.device
        side = cfg.image_size[0]
        latent_scale = 0.18215  # latent scaling applied on encode, undone on decode

        # Image -> batched model-format tensor on the target device.
        pixels = transform_to_model_format(image, target_size=side).unsqueeze(0).to(device)

        # Text conditioning (no classifier-free guidance branch).
        with torch.no_grad():
            prompt_embeds, _ = pipe.encode_prompt(
                prompt=text,
                device=device,
                do_classifier_free_guidance=False,
                num_images_per_prompt=1,
            )
        # Match the image dtype to the embeddings' dtype.
        pixels = pixels.to(prompt_embeds.dtype)

        # Image -> latent.
        with torch.no_grad():
            clean_latent = pipe.vae.encode(pixels).latent_dist.sample() * latent_scale

        # Only the last `tail_len` scheduler timesteps are replayed.
        scheduler = pipe.scheduler
        total_steps = cfg.num_inference_steps
        scheduler.set_timesteps(total_steps, device=device)
        tail_len = max(1, int(round(total_steps * self.purification_strength)))
        schedule_tail = scheduler.timesteps[-tail_len:]
        entry_t = schedule_tail[0]

        # Noise the clean latent up to the first timestep of the tail.
        latent = scheduler.add_noise(
            clean_latent, torch.randn_like(clean_latent), entry_t.unsqueeze(0)
        )

        with torch.no_grad():
            # Reverse denoising over the tail.
            for t in schedule_tail:
                predicted_noise = pipe.unet(latent, t, encoder_hidden_states=prompt_embeds).sample
                latent = scheduler.step(predicted_noise, t, latent).prev_sample

            # Latent -> pixels.
            decoded = pipe.vae.decode(latent / latent_scale, return_dict=False)[0]

        # Map from [-1, 1] to [0, 1], then to 8-bit height x width x channel.
        unit_range = (decoded / 2 + 0.5).clamp(0, 1)
        hwc = unit_range[0].cpu().float().permute(1, 2, 0).numpy()
        result = Image.fromarray((hwc * 255.0).round().astype(np.uint8))

        if result.size != image.size:
            result = result.resize(image.size)
        return result

394 

395 

class NeuralCodecCompression(ImageEditor):
    """Learned-image-codec compression (regeneration) attack.

    Passes the image through a pretrained neural compression model from
    `compressai` (an [optional] dependency) and returns the reconstruction.
    Reference: Cheng et al., "Learned Image Compression with Discretized Gaussian
    Mixture Likelihoods and Attention Modules", CVPR 2020.

    Args:
        quality: Quality level passed to the compressai model factory
            (typically 1-8 for cheng2020-anchor; higher = better fidelity, more bits).
        model_name: Any key from `compressai.zoo.image_models`. Defaults to
            'cheng2020-anchor'. Other useful choices: 'bmshj2018-factorized',
            'bmshj2018-hyperprior', 'mbt2018', 'cheng2020-attn'.
        device: Device for the codec; when not given, CUDA is used if
            available, else CPU.
    """

    # Loaded networks, shared by all instances:
    # (model_name, quality, device) -> nn.Module
    _MODEL_CACHE = {}

    def __init__(self, quality: int = 5, model_name: str = "cheng2020-anchor",
                 device: str = None):
        super().__init__()
        self.quality = int(quality)
        self.model_name = model_name
        self.device = device

    def _get_model(self):
        """Return `(network, device)`, loading the network on first use.

        Raises:
            ImportError: If `compressai` is not installed.
            ValueError: If `model_name` is not a known compressai model.
        """
        import torch
        try:
            from compressai.zoo import image_models
        except ImportError as e:
            raise ImportError(
                "NeuralCodecCompression requires `compressai`. "
                "Install with: pip install -e '.[optional]' (or `pip install compressai`)."
            ) from e

        if self.model_name not in image_models:
            raise ValueError(
                f"Unknown compressai model {self.model_name!r}; "
                f"available: {sorted(image_models)}"
            )

        if self.device:
            device = self.device
        else:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        cache_key = (self.model_name, self.quality, device)
        cached = self._MODEL_CACHE.get(cache_key)
        if cached is not None:
            return cached, device

        factory = image_models[self.model_name]
        net = factory(quality=self.quality, pretrained=True)
        net = net.eval().to(device)
        self._MODEL_CACHE[cache_key] = net
        return net, device

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return the codec's reconstruction of `image` at the original size.

        Args:
            image: Input image; converted to RGB for the codec.
            prompt: Unused; accepted for interface compatibility.
        """
        import torch
        from torchvision import transforms

        net, device = self._get_model()
        original_size = image.size

        # compressai models expect input dimensions divisible by 64, so each
        # side is rounded down to a multiple of 64 (never below 64).
        w, h = original_size
        codec_size = (max(64, w - w % 64), max(64, h - h % 64))
        input_image = image if codec_size == (w, h) else image.resize(codec_size)

        to_tensor = transforms.ToTensor()
        x = to_tensor(input_image.convert("RGB")).unsqueeze(0).to(device)
        with torch.no_grad():
            out = net(x)
        x_hat = out["x_hat"].clamp(0, 1)

        to_pil = transforms.ToPILImage()
        out_pil = to_pil(x_hat[0].cpu())
        # Undo the size rounding so callers get the input dimensions back.
        if out_pil.size != original_size:
            out_pil = out_pil.resize(original_size)
        return out_pil