import subprocess #to run FFMPEG commands
import re #regex to read FFMPEG outputs
import cv2 #displacement calculations
import numpy as np #idk bruh ask chatgpt why i need this
import os #file operations
import shutil #directory operations
import argparse #cli bullshit
from fractions import Fraction
import json
TMP_DIR = 'vidTemp'
def run(video, image2, frames=2, type='displacement', insertion='max', threshold=0.4, output=''):
#image2 must exist
assert os.path.exists(image2), "make sure to specify your displacement map"
#make sure TMP_DIR doesn't exist:
try:
os.makedirs(TMP_DIR)
except:
raise TMPEXISTS(f'{TMP_DIR} already exists, please remove it or change TMP_DIR')
#calculate output first
if output == '':
output = video
else:
assert os.path.splitext(video)[1] == os.path.splitext(output)[1] , "file extensions should be the same dumbass"
#run FFMPEG to get scene changes
scenes = subprocess.run(
['ffmpeg', '-i', video,
'-filter:v', f"select='gt(scene,{threshold})',metadata=print",
'-f', 'null', '-'],
text=True,
capture_output=True
)
#this part is 100% written by chatgpt but im pretty sure all it's doing is using regex to get the scores so its just menial work
lines = scenes.stderr.split('\n')
times_scores = []
score = None
for line in lines:
score_match = re.search(r'scene_score=([\d\.eE+-]+)', line)
time_match = re.search(r'pts_time:([\d\.eE+-]+)', line)
if score_match:
score = float(score_match.group(1))
if time_match and score is not None:
time = float(time_match.group(1))
times_scores.append((time, score))
score = None # reset for next pair
if not times_scores:
raise Exception("No scene changes detected. Try lowering your threshold")
#choose type
insertion = insertion.lower()
if insertion == 'max':
frameToInsert = max(times_scores, key=lambda x: x[1])
elif insertion == 'random':
frameToInsert = times_scores[np.random.randint(0, len(times_scores))] #importing random makes it too clustered
else:
print('Not a valid insertion method, defaulting to max')
frameToInsert = max(times_scores, key=lambda x: x[1])
if type == 'insertion':
insert(video, image2, frameToInsert[0], frames, output)
elif type == 'overwrite':
overwrite(video, image2, frameToInsert[0], frames, output)
else:
print('Not a valid type, defaulting to insert')
insert(video, image2, frameToInsert[0], frames, output)
#delete the tmp dir
try:
shutil.rmtree(TMP_DIR)
print(f'cleaned up {TMP_DIR}')
except:
print(f'failed to clean up {TMP_DIR}, you might want to delete it manually')
class TMPEXISTS(Exception):
def __init__(self, message="Temporary directory already exists"):
super().__init__(message)
#100% GPT SLOP, this time I have no idea what it's doing, but i assume it's correct.
def apply_displacement(frame, displacement_map):
"""apply displacement to a frame using the displacement map"""
h, w = frame.shape[:2]
disp = cv2.imread(displacement_map, cv2.IMREAD_GRAYSCALE)
disp = cv2.resize(disp, (w, h))
#normalize displacement to [-1, 1] range
disp = (disp.astype(np.float32) - 127.5) / 127.5
#create coordinate grids
y, x = np.mgrid[0:h, 0:w].astype(np.float32)
#apply displacement (scale it to reasonable pixel displacement)
displacement_strength = 20 #adjust this if you want more/less displacement
x_displaced = x + disp * displacement_strength
y_displaced = y + disp * displacement_strength
#remap the frame
displaced_frame = cv2.remap(frame, x_displaced, y_displaced, cv2.INTER_LINEAR)
return displaced_frame
#GPT SLOP
def get_video_props(video):
"""Get video properties including SAR using ffprobe with JSON output."""
result = subprocess.run([
'ffprobe', '-v', 'quiet',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height,r_frame_rate,pix_fmt,sample_aspect_ratio',
'-of', 'json', video
], capture_output=True, text=True)
data = json.loads(result.stdout)
stream = data['streams'][0]
width = int(stream['width'])
height = int(stream['height'])
pix_fmt = stream.get('pix_fmt', 'yuv420p')
fps_str = stream.get('r_frame_rate', '30/1')
fps = float(Fraction(fps_str)) if fps_str else 30.0
sar = stream.get('sample_aspect_ratio', '1:1') # Default to 1:1 if not specified
return width, height, fps, pix_fmt, sar
#gpt slop
def insert(video, displacement_map, insert_time, frames, output):
"""insert displaced frames before the specified time - quick flash"""
width, height, fps, pix_fmt, sar = get_video_props(video)
# Extract frame at insert_time with original resolution and SAR
frame_path = os.path.join(TMP_DIR, 'extract_frame.png')
subprocess.run([
'ffmpeg', '-i', video, '-ss', str(insert_time),
'-vframes', '1', '-vf', f'setsar={sar}',
frame_path, '-y'
], stderr=subprocess.DEVNULL)
# Process displaced frames (same as before)
frame = cv2.imread(frame_path)
displaced_frame = apply_displacement(frame, displacement_map)
for i in range(frames):
displaced_path = os.path.join(TMP_DIR, f'displaced_{i:04d}.png')
cv2.imwrite(displaced_path, displaced_frame)
# Create flash video with original SAR
flash_duration = frames / fps
displaced_vid = os.path.join(TMP_DIR, 'flash.mp4')
subprocess.run([
'ffmpeg', '-framerate', str(fps),
'-i', os.path.join(TMP_DIR, 'displaced_%04d.png'),
'-c:v', 'libx264', '-pix_fmt', pix_fmt,
'-vf', f'setsar={sar}',
'-t', str(flash_duration), displaced_vid, '-y'
], stderr=subprocess.DEVNULL)
# Final concatenation preserving SAR
subprocess.run([
'ffmpeg', '-i', video, '-i', displaced_vid,
'-filter_complex',
f'''
[0:v]trim=0:{insert_time},setpts=PTS-STARTPTS,setsar={sar}[v0];
[0:v]trim=start={insert_time},setpts=PTS-STARTPTS,setsar={sar}[v1];
[1:v]setpts=PTS-STARTPTS,setsar={sar}[v_insert];
[v0][v_insert][v1]concat=n=3:v=1:a=0[outv]
''',
'-map', '[outv]', '-map', '0:a',
'-c:v', 'libx264', '-c:a', 'copy',
'-pix_fmt', pix_fmt,
output, '-y'
])
#gpt slop
def overwrite(video, displacement_map, start_time, frames, output):
"""1-to-1 replace x frames with displaced versions"""
width, height, fps, pix_fmt, sar = get_video_props(video)
# Extract and process frames with original SAR
start_frame = int(start_time * fps)
frame_dir = os.path.join(TMP_DIR, 'frames')
os.makedirs(frame_dir)
for i in range(frames):
frame_num = start_frame + i
frame_time = frame_num / fps
frame_path = os.path.join(frame_dir, f'frame_{i:04d}.png')
subprocess.run([
'ffmpeg', '-i', video, '-ss', str(frame_time),
'-vframes', '1', '-vf', f'setsar={sar}',
frame_path, '-y'
])
frame = cv2.imread(frame_path)
displaced_frame = apply_displacement(frame, displacement_map)
cv2.imwrite(frame_path, displaced_frame)
# Create replacement segment with original SAR
frame_duration = frames / fps
displaced_vid = os.path.join(TMP_DIR, 'replaced_frames.mp4')
subprocess.run([
'ffmpeg', '-framerate', str(fps),
'-i', os.path.join(frame_dir, 'frame_%04d.png'),
'-c:v', 'libx264', '-pix_fmt', pix_fmt,
'-vf', f'setsar={sar}',
'-t', str(frame_duration), displaced_vid, '-y'
])
# Final concatenation preserving SAR
end_time = start_time + frame_duration
subprocess.run([
'ffmpeg', '-i', video, '-i', displaced_vid,
'-filter_complex',
f'''
[0:v]trim=0:{start_time},setpts=PTS-STARTPTS,setsar={sar}[v1];
[0:v]trim=start={end_time},setpts=PTS-STARTPTS,setsar={sar}[v2];
[1:v]setpts=PTS-STARTPTS,setsar={sar}[vdis];
[v1][vdis][v2]concat=n=3:v=1:a=0[outv]
''',
'-map', '[outv]', '-map', '0:a',
'-c:v', 'libx264', '-c:a', 'copy',
'-pix_fmt', pix_fmt,
output, '-y'
])
#CLI
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Sneaky Watermark script')
parser.add_argument('video', help='input video file')
parser.add_argument('displacement_map', help='displacement map image')
parser.add_argument('-o', '--output', default='', help='output file (default: overwrites video)')
parser.add_argument('-f', '--frames', type=int, default=2, help='number of frames (default: 2)')
parser.add_argument('-t', '--type', choices=['insertion', 'overwrite'], default='insertion', help='operation type (default: insertion)')
parser.add_argument('-i', '--insertion', choices=['max', 'random'], default='max', help='frame selection method (default: max)')
parser.add_argument('--threshold', type=float, default=0, help='scene change threshold (only useful for random)')
args = parser.parse_args()
try:
run(args.video, args.displacement_map, args.frames, args.type, args.insertion, args.threshold, args.output)
print(f'Done!')
except Exception as e:
if type(e) != TMPEXISTS:
try:
shutil.rmtree(TMP_DIR)
print(f'cleaned up {TMP_DIR}')
except:
print(f'failed to clean up {TMP_DIR}, you might want to delete it manually')
print(f'something went wrong: {e}')