Coverage for markdiffusion / evaluation / tools / image_editor.py: 95.72%
304 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 19:25 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-14 19:25 +0000
1from PIL import Image, ImageFilter, ImageEnhance, ImageOps, ImageDraw
2import os
3import argparse
4import sys
5import numpy as np
6import random
class ImageEditor:
    """Common base for image edits (attacks) applied to a PIL image.

    Every editor in this module subclasses this type and overrides `edit`.
    The base implementation performs no work.
    """

    def __init__(self):
        """Create the editor; the base class keeps no state."""

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Placeholder hook that subclasses override.

        Args:
            image: The image to edit (ignored here).
            prompt: Optional text prompt (ignored here).

        Returns:
            None in the base class; subclasses return the edited image.
        """
        return None
class JPEGCompression(ImageEditor):
    """JPEG compression attack: round-trip the image through a JPEG encoder.

    Args:
        quality: JPEG quality setting forwarded to Pillow's encoder.
    """

    # Modes Pillow's JPEG encoder can write directly; anything else
    # (e.g. RGBA, P, LA) is converted to RGB before encoding.
    _JPEG_MODES = frozenset({"1", "L", "RGB", "RGBX", "CMYK", "YCbCr"})

    def __init__(self, quality: int = 95):
        super().__init__()
        self.quality = quality

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return a JPEG-compressed copy of `image`.

        The round trip happens in memory, so nothing is written to the working
        directory and concurrent calls cannot clobber each other's data.

        Args:
            image: Source image. Modes the JPEG encoder cannot write are
                converted to RGB first.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            The decoded JPEG image, fully loaded into memory.
        """
        import io

        source = image if image.mode in self._JPEG_MODES else image.convert("RGB")

        buffer = io.BytesIO()
        source.save(buffer, format="JPEG", quality=self.quality)
        buffer.seek(0)

        compressed_image = Image.open(buffer)
        # Image.open is lazy: force the decode now so the result does not
        # depend on the buffer staying readable later.
        compressed_image.load()
        return compressed_image
class Rotation(ImageEditor):
    """Rotation attack.

    Args:
        angle: Rotation angle passed to `Image.rotate`.
        expand: Forwarded to `Image.rotate`'s `expand` flag.
    """

    def __init__(self, angle: int = 30, expand: bool = False):
        super().__init__()
        self.angle, self.expand = angle, expand

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return `image` rotated by the configured angle; `prompt` is unused."""
        rotated = image.rotate(self.angle, expand=self.expand)
        return rotated
class CrSc(ImageEditor):
    """Crop-and-scale attack.

    A window of `crop_ratio` times the image width/height is cropped and then
    resized back to the original dimensions.

    `position` controls where the crop window is placed:
      - "center" (default): top-left corner at ((W-w)//2, (H-h)//2). Backward-compatible.
      - "random": offsets are sampled uniformly each call from [0, W-w] x [0, H-h].
      - tuple `(x_ratio, y_ratio)`: explicit normalized offsets in [0, 1] of the slack
        (W-w, H-h). e.g. (0.0, 0.0) = top-left, (1.0, 1.0) = bottom-right, (0.5, 0.5) = center.

    Raises:
        ValueError: If `position` is an unknown string, is not a pair of
            numbers, or contains a ratio outside [0, 1].
    """

    def __init__(self, crop_ratio: float = 0.8, position="center"):
        super().__init__()
        self.crop_ratio = crop_ratio
        self.position = position

        if isinstance(position, str):
            if position not in {"center", "random"}:
                raise ValueError(f"position must be 'center', 'random', or a (x, y) tuple; got {position!r}")
            return

        try:
            x_ratio, y_ratio = position
            # Convert inside the guarded block so non-numeric entries
            # (None, "a", ...) surface as the documented ValueError instead of
            # a bare TypeError/ValueError from float().
            x_ratio, y_ratio = float(x_ratio), float(y_ratio)
        except (TypeError, ValueError) as e:
            raise ValueError(f"position tuple must be (x_ratio, y_ratio); got {position!r}") from e
        if not (0.0 <= x_ratio <= 1.0 and 0.0 <= y_ratio <= 1.0):
            raise ValueError(f"position ratios must be in [0, 1]; got {position!r}")

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Crop a window from `image` and scale it back to the original size.

        Args:
            image: Source image.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            The cropped region resized to `image.size`.
        """
        width, height = image.size
        new_w = int(width * self.crop_ratio)
        new_h = int(height * self.crop_ratio)

        # Free space available for shifting the window; never negative.
        slack_w = max(0, width - new_w)
        slack_h = max(0, height - new_h)

        if self.position == "center":
            left = slack_w // 2
            top = slack_h // 2
        elif self.position == "random":
            left = random.randint(0, slack_w) if slack_w > 0 else 0
            top = random.randint(0, slack_h) if slack_h > 0 else 0
        else:
            x_ratio, y_ratio = self.position
            left = int(round(slack_w * float(x_ratio)))
            top = int(round(slack_h * float(y_ratio)))

        right = left + new_w
        bottom = top + new_h

        return image.crop((left, top, right, bottom)).resize((width, height))
class GaussianBlurring(ImageEditor):
    """Gaussian blur attack.

    Args:
        radius: Blur radius handed to `ImageFilter.GaussianBlur`.
    """

    def __init__(self, radius: int = 2):
        super().__init__()
        self.radius = radius

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return a blurred copy of `image`; `prompt` is unused."""
        blur = ImageFilter.GaussianBlur(self.radius)
        return image.filter(blur)
class GaussianNoise(ImageEditor):
    """Additive Gaussian noise attack.

    Args:
        sigma: Standard deviation of the zero-mean noise, in 0-255 pixel units.
    """

    def __init__(self, sigma: float = 25.0):
        super().__init__()
        self.sigma = sigma

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return an RGB copy of `image` with Gaussian noise added.

        The image is converted to RGB, perturbed in float32, clipped to
        [0, 255] and cast back to uint8. `prompt` is unused.
        """
        pixels = np.array(image.convert("RGB")).astype(np.float32)
        perturbed = pixels + np.random.normal(0, self.sigma, pixels.shape)
        return Image.fromarray(np.clip(perturbed, 0, 255).astype(np.uint8))
class Brightness(ImageEditor):
    """Brightness adjustment attack.

    Args:
        factor: Enhancement factor given to `ImageEnhance.Brightness.enhance`.
    """

    def __init__(self, factor: float = 1.2):
        super().__init__()
        self.factor = factor

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return `image` with its brightness scaled by `factor`; `prompt` is unused."""
        return ImageEnhance.Brightness(image).enhance(self.factor)
class Mask(ImageEditor):
    """Random rectangular masking attack.

    Args:
        mask_ratio: Upper bound on each mask's side length, as a fraction of
            the corresponding image dimension.
        num_masks: Number of black rectangles to draw.
    """

    def __init__(self, mask_ratio: float = 0.1, num_masks: int = 5):
        super().__init__()
        self.mask_ratio = mask_ratio
        self.num_masks = num_masks

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return a copy of `image` with `num_masks` black rectangles drawn on it.

        Each rectangle's width/height is drawn uniformly between half the cap
        and the cap, and its top-left corner is placed so the nominal box
        stays inside the image. `prompt` is unused.
        """
        canvas = image.copy()
        pen = ImageDraw.Draw(canvas)
        img_w, img_h = canvas.size

        # Size caps do not change between iterations.
        cap_w = int(img_w * self.mask_ratio)
        cap_h = int(img_h * self.mask_ratio)

        for _ in range(self.num_masks):
            box_w = random.randint(cap_w // 2, cap_w)
            box_h = random.randint(cap_h // 2, cap_h)

            left = random.randint(0, img_w - box_w)
            top = random.randint(0, img_h - box_h)

            pen.rectangle([left, top, left + box_w, top + box_h], fill='black')

        return canvas
class Overlay(ImageEditor):
    """Random stroke overlay attack.

    Args:
        num_strokes: Number of polylines to draw.
        stroke_width: Line width in pixels.
        stroke_type: 'black' or 'white' for a fixed colour; 'random' (or any
            other value) picks a fresh random RGB colour per stroke.
    """

    def __init__(self, num_strokes: int = 10, stroke_width: int = 5, stroke_type: str = 'random'):
        super().__init__()
        self.num_strokes = num_strokes
        self.stroke_width = stroke_width
        self.stroke_type = stroke_type

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return a copy of `image` with random polylines drawn over it.

        Each stroke is a random walk of 3-8 points whose steps are bounded by a
        quarter of the shorter image side and clamped to the image rectangle.
        `prompt` is unused.
        """
        canvas = image.copy()
        pen = ImageDraw.Draw(canvas)
        width, height = canvas.size

        # Largest per-axis move between consecutive points of a stroke.
        reach = min(width, height) // 4

        for _ in range(self.num_strokes):
            origin_x = random.randint(0, width)
            origin_y = random.randint(0, height)
            point_count = random.randint(3, 8)
            path = [(origin_x, origin_y)]

            for _step in range(point_count - 1):
                prev_x, prev_y = path[-1]
                # Draw the x offset before the y offset to keep the RNG sequence stable.
                next_x = max(0, min(width, prev_x + random.randint(-reach, reach)))
                next_y = max(0, min(height, prev_y + random.randint(-reach, reach)))
                path.append((next_x, next_y))

            if self.stroke_type == 'black':
                color = (0, 0, 0)
            elif self.stroke_type == 'white':
                color = (255, 255, 255)
            else:
                # 'random' and unrecognised types both get a random colour.
                color = tuple(random.randint(0, 255) for _channel in range(3))

            pen.line(path, fill=color, width=self.stroke_width)

        return canvas
class AdaptiveNoiseInjection(ImageEditor):
    """Noise attack that selects or blends noise models based on image statistics.

    Args:
        intensity: Scales the strength of every noise model.
        auto_select: If True, one noise model is chosen from image features
            (brightness, edge density, texture). If False, Gaussian,
            salt-and-pepper, Poisson and speckle noise are blended in turn.
    """

    def __init__(self, intensity: float = 0.5, auto_select: bool = True):
        super().__init__()
        self.intensity = intensity
        self.auto_select = auto_select

    def _analyze_image_features(self, img_array):
        """Compute the statistics used to choose a noise model.

        Args:
            img_array: 2-D (grayscale) or 3-D (channels-last) pixel array.

        Returns:
            Dict with 'brightness_mean', 'brightness_std', 'edge_density' and
            'texture_complexity'.
        """
        if len(img_array.shape) == 3:
            gray = np.mean(img_array, axis=2)
        else:
            gray = img_array

        brightness_mean = np.mean(gray)
        brightness_std = np.std(gray)

        # First-difference gradients (not true Sobel kernels); prepend keeps the shape.
        sobel_x = np.abs(np.diff(gray, axis=1, prepend=gray[:, :1]))
        sobel_y = np.abs(np.diff(gray, axis=0, prepend=gray[:1, :]))
        edge_density = np.mean(sobel_x + sobel_y)

        # Texture: summed per-patch standard deviation over a grid of
        # kernel_size x kernel_size patches, normalised by the grid size.
        kernel_size = 5
        texture_complexity = 0
        h, w = gray.shape
        for i in range(0, h - kernel_size, kernel_size):
            for j in range(0, w - kernel_size, kernel_size):
                patch = gray[i:i+kernel_size, j:j+kernel_size]
                texture_complexity += np.std(patch)
        patch_count = (h // kernel_size) * (w // kernel_size)
        if patch_count:
            texture_complexity /= patch_count
        else:
            # Image smaller than one patch: no texture estimate is possible;
            # report 0 instead of dividing by zero.
            texture_complexity = 0.0

        return {
            'brightness_mean': brightness_mean,
            'brightness_std': brightness_std,
            'edge_density': edge_density,
            'texture_complexity': texture_complexity
        }

    def _select_noise_type(self, features):
        """Map image features to a noise model name.

        Dark images get 'gaussian'; otherwise edge-heavy images get
        'salt_pepper', textured images get 'speckle', and the rest 'poisson'.
        """
        brightness = features['brightness_mean']
        edge_density = features['edge_density']
        texture = features['texture_complexity']

        if brightness < 80:
            return 'gaussian'
        elif edge_density > 30:
            return 'salt_pepper'
        elif texture > 20:
            return 'speckle'
        else:
            return 'poisson'

    def _add_gaussian_noise(self, img_array, sigma):
        """Add zero-mean Gaussian noise with std `sigma`; returns a uint8 array."""
        noise = np.random.normal(0, sigma, img_array.shape)
        noisy = np.clip(img_array + noise, 0, 255)
        return noisy.astype(np.uint8)

    def _add_salt_pepper_noise(self, img_array, amount):
        """Set random pixels to 255 (salt) and 0 (pepper); returns a uint8 array.

        `amount` is the total fraction of pixel positions targeted, split
        evenly between salt and pepper. Positions are sampled with replacement.
        """
        noisy = img_array.copy()
        h, w = img_array.shape[:2]
        num_pixels = h * w

        num_salt = int(amount * num_pixels * 0.5)
        salt_coords_y = np.random.randint(0, h, num_salt)
        salt_coords_x = np.random.randint(0, w, num_salt)
        noisy[salt_coords_y, salt_coords_x] = 255

        num_pepper = int(amount * num_pixels * 0.5)
        pepper_coords_y = np.random.randint(0, h, num_pepper)
        pepper_coords_x = np.random.randint(0, w, num_pepper)
        noisy[pepper_coords_y, pepper_coords_x] = 0

        return np.clip(noisy, 0, 255).astype(np.uint8)

    def _add_poisson_noise(self, img_array):
        """Apply Poisson (shot) noise; returns a uint8 array.

        The Poisson rate is the pixel value scaled by the next power of two at
        or above the number of distinct values in the image.
        """
        vals = len(np.unique(img_array))
        vals = 2 ** np.ceil(np.log2(vals))
        noisy = np.random.poisson(img_array * vals) / float(vals)
        return np.clip(noisy, 0, 255).astype(np.uint8)

    def _add_speckle_noise(self, img_array, variance):
        """Apply multiplicative noise `img + img * n`; returns a uint8 array.

        `n` is standard normal noise multiplied by `variance` (so `variance`
        acts as the noise scale).
        """
        noise = np.random.randn(*img_array.shape) * variance
        noisy = img_array + img_array * noise
        return np.clip(noisy, 0, 255).astype(np.uint8)

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return an RGB copy of `image` with adaptive noise applied.

        Args:
            image: Source image; converted to RGB.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            The noisy image as a uint8 RGB PIL image.
        """
        img = image.convert("RGB")
        img_array = np.array(img).astype(np.float32)

        if self.auto_select:
            # Features are only needed to pick a model, so the per-patch scan
            # is skipped entirely in blend mode.
            features = self._analyze_image_features(img_array)
            noise_type = self._select_noise_type(features)

            if noise_type == 'gaussian':
                sigma = 40 * self.intensity
                noisy_array = self._add_gaussian_noise(img_array, sigma)
            elif noise_type == 'salt_pepper':
                amount = 0.15 * self.intensity
                noisy_array = self._add_salt_pepper_noise(img_array, amount)
            elif noise_type == 'poisson':
                noisy_array = self._add_poisson_noise(img_array)
                # Poisson noise has no strength knob, so blend it with the
                # clean image according to the intensity (capped at 0.8).
                blend_factor = min(0.8, self.intensity * 1.5)
                noisy_array = np.clip(
                    img_array * (1 - blend_factor) + noisy_array * blend_factor,
                    0, 255
                ).astype(np.uint8)
            else:
                variance = 0.5 * self.intensity
                noisy_array = self._add_speckle_noise(img_array, variance)
        else:
            # Fold each noise model into a running blend, one after another.
            weight = 0.25
            noisy_array = img_array.copy()

            gaussian = self._add_gaussian_noise(img_array, 30 * self.intensity)
            noisy_array = noisy_array * (1 - weight) + gaussian * weight

            salt_pepper = self._add_salt_pepper_noise(img_array, 0.08 * self.intensity)
            noisy_array = noisy_array * (1 - weight) + salt_pepper * weight

            poisson = self._add_poisson_noise(img_array)
            noisy_array = noisy_array * (1 - weight) + poisson * weight

            speckle = self._add_speckle_noise(img_array, 0.4 * self.intensity)
            noisy_array = noisy_array * (1 - weight) + speckle * weight

            noisy_array = np.clip(noisy_array, 0, 255).astype(np.uint8)

        return Image.fromarray(noisy_array)
class DiffusionPurification(ImageEditor):
    """Diffusion-based purification (regeneration) attack.

    Encodes the input image to latent space, injects Gaussian noise corresponding to
    a fraction of the diffusion schedule, then runs reverse denoising to obtain a
    regenerated image. Generative-watermark-friendly attack: the watermark survives
    only if it is robust to a partial round-trip through a diffusion model.
    Reference: Nie et al., "Diffusion Models for Adversarial Purification", ICML 2022.

    Args:
        diffusion_config: A `DiffusionConfig` providing `pipe`, `device`,
            `image_size` and `num_inference_steps`. By default the purifier reuses
            `diffusion_config.pipe` (a `StableDiffusionPipeline`-like object).
        purification_strength: Fraction in (0, 1] of the diffusion schedule to use.
            Larger values inject more noise (stronger attack, lower fidelity).
        prompt: Optional text prompt used to condition denoising (classifier-free
            guidance is disabled). Empty string by default (unconditional regeneration).
        purifier_pipe: Optional override for the pipeline used to purify; useful
            when the user wants the purifier to be a different model from the one
            that produced the watermarked image.

    Raises:
        ValueError: If `purification_strength` is outside (0, 1].
    """

    def __init__(self, diffusion_config, purification_strength: float = 0.3,
                 prompt: str = "", purifier_pipe=None):
        super().__init__()
        if not (0.0 < float(purification_strength) <= 1.0):
            raise ValueError(
                f"purification_strength must be in (0, 1]; got {purification_strength!r}"
            )
        self.diffusion_config = diffusion_config
        self.purification_strength = float(purification_strength)
        self.default_prompt = prompt
        self.pipe = purifier_pipe if purifier_pipe is not None else diffusion_config.pipe

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Regenerate `image` through a partial noise/denoise round trip.

        Args:
            image: Source image.
            prompt: Conditioning text; falls back to the prompt given at
                construction when None.

        Returns:
            The regenerated image, resized back to `image.size` if the model
            resolution differs.
        """
        import torch
        from markdiffusion.utils.media_utils import transform_to_model_format

        prompt = prompt if prompt is not None else self.default_prompt
        device = self.diffusion_config.device
        target_size = self.diffusion_config.image_size[0]

        # 1. Image -> tensor in [-1, 1], shape [1, 3, H, W]
        image_tensor = transform_to_model_format(image, target_size=target_size).unsqueeze(0).to(device)

        # 2. Encode prompt (classifier-free guidance disabled by default for purification)
        with torch.no_grad():
            prompt_embeds, _ = self.pipe.encode_prompt(
                prompt=prompt,
                device=device,
                do_classifier_free_guidance=False,
                num_images_per_prompt=1,
            )
        # Match the pipeline's dtype (e.g. fp16) before feeding the VAE.
        image_tensor = image_tensor.to(prompt_embeds.dtype)

        # 3. Encode image to latent (matches utils.media_utils scaling factor)
        with torch.no_grad():
            latent = self.pipe.vae.encode(image_tensor).latent_dist.sample() * 0.18215

        # 4. Pick the timestep range and add noise
        scheduler = self.pipe.scheduler
        num_steps = self.diffusion_config.num_inference_steps
        scheduler.set_timesteps(num_steps, device=device)
        n_denoise = max(1, int(round(num_steps * self.purification_strength)))
        # The tail of the schedule holds the lowest-noise timesteps.
        timesteps_to_use = scheduler.timesteps[-n_denoise:]
        t_start = timesteps_to_use[0]

        noise = torch.randn_like(latent)
        noisy_latent = scheduler.add_noise(latent, noise, t_start.unsqueeze(0))

        # 5. Reverse denoising loop
        # Sigma-scaled schedulers (Euler/LMS family) require the UNet input to be
        # rescaled per timestep; for DDIM-style schedulers this is the identity.
        # Guarded with getattr so scheduler stand-ins without the hook still work.
        scale_model_input = getattr(scheduler, "scale_model_input", None)
        x = noisy_latent
        with torch.no_grad():
            for t in timesteps_to_use:
                model_input = scale_model_input(x, t) if callable(scale_model_input) else x
                noise_pred = self.pipe.unet(model_input, t, encoder_hidden_states=prompt_embeds).sample
                x = scheduler.step(noise_pred, t, x).prev_sample

        # 6. Decode back to image, then to PIL
        with torch.no_grad():
            decoded = self.pipe.vae.decode(x / 0.18215, return_dict=False)[0]
        decoded = (decoded / 2 + 0.5).clamp(0, 1)
        arr = (decoded[0].cpu().float().permute(1, 2, 0).numpy() * 255.0).round().astype(np.uint8)

        out = Image.fromarray(arr)
        if out.size != image.size:
            out = out.resize(image.size)
        return out
class NeuralCodecCompression(ImageEditor):
    """Learned-image-codec compression (regeneration) attack.

    Passes the image through a pretrained neural compression model, simulating
    the distortion a downstream encoder would introduce at a target bitrate.
    Backed by `compressai` (an [optional] dependency).
    Reference: Cheng et al., "Learned Image Compression with Discretized Gaussian
    Mixture Likelihoods and Attention Modules", CVPR 2020.

    Args:
        quality: Quality level passed to compressai's pretrained zoo
            (typically 1-8 for cheng2020-anchor; higher = better fidelity, more bits).
        model_name: Any key from `compressai.zoo.image_models`. Defaults to
            'cheng2020-anchor'. Other useful choices: 'bmshj2018-factorized',
            'bmshj2018-hyperprior', 'mbt2018', 'cheng2020-attn'.
        device: Override the device for the codec; defaults to CUDA if available else CPU.
    """

    # Shared across instances: (model_name, quality, device) -> nn.Module
    _MODEL_CACHE = {}

    def __init__(self, quality: int = 5, model_name: str = "cheng2020-anchor",
                 device: str = None):
        super().__init__()
        self.quality = int(quality)
        self.model_name = model_name
        self.device = device

    def _get_model(self):
        """Return `(net, device)` for this configuration, loading the model on first use.

        Raises:
            ImportError: If `compressai` is not installed.
            ValueError: If `model_name` is not in compressai's model zoo.
        """
        import torch
        try:
            from compressai.zoo import image_models
        except ImportError as e:
            raise ImportError(
                "NeuralCodecCompression requires `compressai`. "
                "Install with: pip install -e '.[optional]' (or `pip install compressai`)."
            ) from e

        if self.model_name not in image_models:
            raise ValueError(
                f"Unknown compressai model {self.model_name!r}; "
                f"available: {sorted(image_models)}"
            )

        if self.device:
            device = self.device
        else:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        cache_key = (self.model_name, self.quality, device)
        net = self._MODEL_CACHE.get(cache_key)
        if net is not None:
            return net, device

        # First request for this configuration: build the pretrained model in
        # eval mode on the target device and remember it.
        factory = image_models[self.model_name]
        net = factory(quality=self.quality, pretrained=True).eval().to(device)
        self._MODEL_CACHE[cache_key] = net
        return net, device

    def edit(self, image: Image.Image, prompt: str = None) -> Image.Image:
        """Return `image` after a pass through the neural codec.

        Args:
            image: Source image; converted to RGB for the codec.
            prompt: Unused; accepted for interface compatibility.

        Returns:
            The reconstructed image at the original size.
        """
        import torch
        from torchvision import transforms

        net, device = self._get_model()
        original_size = image.size

        # compressai models expect input dimensions divisible by 64.
        w, h = original_size
        snapped = (max(64, (w // 64) * 64), max(64, (h // 64) * 64))
        input_image = image if snapped == (w, h) else image.resize(snapped)

        to_tensor = transforms.ToTensor()
        x = to_tensor(input_image.convert("RGB")).unsqueeze(0).to(device)
        with torch.no_grad():
            out = net(x)
        x_hat = out["x_hat"].clamp(0, 1)

        to_pil = transforms.ToPILImage()
        out_pil = to_pil(x_hat[0].cpu())
        if out_pil.size != original_size:
            out_pil = out_pil.resize(original_size)
        return out_pil