Python
Image sequence scanner
Vibe coded with Gemini — no human coding was necessary. The script reads a config.txt file (created automatically if it does not exist) to find a folder to watch, scans it, and writes the results to a JSON file, looping until told to stop. It is intended for image sequences imported into different software (most notably After Effects).
import os
import json
import re
import time
from datetime import datetime
class ConfigFileNotFoundError(FileNotFoundError):
    """Raised when the scanner's configuration file cannot be located on disk."""
class ImageSequenceScanner:
    """
    A tool to scan a directory recursively for image sequences.

    Sequences are detected from "<name>.<frame>.<ext>" filenames, persisted
    to a JSON file, and re-scanned in a loop driven by a plain-text config
    file. It includes a filter to skip sequences shorter than a minimum length.
    """
    # Keys recognised in the key=value config file.
    CONFIG_ROOT_DIR = "root_dir"                # directory tree to scan
    CONFIG_FREQUENCY = "frequency"              # loop interval, e.g. "10s" or "5m"
    CONFIG_SHUTDOWN = "shutdown"                # "true" requests a clean exit
    CONFIG_LAST_SCAN = "last_scan"              # written back after every pass
    CONFIG_MIN_LENGTH = "min_sequence_length"   # minimum frames per sequence
def __init__(self, json_path, config_path, default_root_dir):
    """
    Initialize the scanner. Reads the root directory and frequency from config.

    Args:
        json_path: Path of the JSON file where scan results are persisted.
        config_path: Path of the plain-text key=value config file.
        default_root_dir: Fallback scan root when the config has none.

    Raises:
        ConfigFileNotFoundError: If config_path does not exist.
    """
    self.json_path = json_path
    self.config_path = config_path
    self.default_root_dir = default_root_dir
    # Fail fast: refuse to start without a configuration file.
    if not os.path.exists(self.config_path):
        raise ConfigFileNotFoundError(
            f"\n--- ERROR: Configuration file not found at '{self.config_path}' ---\n"
            "The script cannot start without a configuration file."
        )
    # Read initial settings
    config = self._read_config()
    self.root_dir = config.get(self.CONFIG_ROOT_DIR, self.default_root_dir)
    self.current_scan_frequency = config.get(self.CONFIG_FREQUENCY, 60)  # seconds
    self.priority_folder = config.get("priority_folder")
    self.should_shutdown = config.get(self.CONFIG_SHUTDOWN, False)
    # Minimum number of frames before a sequence is recorded (default 3).
    self.min_sequence_length = config.get(self.CONFIG_MIN_LENGTH, 3)
    self.sequence_data = self._load_json()
    self.last_scan_times = {}  # dirpath -> last seen mtime, for aggressive scanning
    print(f"Root Scan Directory: {self.root_dir}")
    print(f"Initial Scan Frequency: {self.current_scan_frequency} seconds")
    print(f"Minimum Sequence Length: {self.min_sequence_length} images")
# --- Configuration and Data Management ---
def _load_json(self):
"""Load existing sequence data from the JSON file."""
if os.path.exists(self.json_path):
try:
with open(self.json_path, 'r') as f:
return json.load(f)
except json.JSONDecodeError:
print("Warning: JSON file is corrupted or empty. Starting with empty data.")
return {}
return {}
def _save_json(self):
"""Save the updated sequence data to the JSON file."""
with open(self.json_path, 'w') as f:
json.dump(self.sequence_data, f, indent=4)
print(f"[{datetime.now().strftime('%H:%M:%S')}] JSON updated: {self.json_path}")
def _normalize_path(self, path):
"""Standardizes a path by expanding user and making it absolute."""
return os.path.abspath(os.path.expanduser(path))
def _read_config(self):
    """Parse the key=value config file into a settings dict.

    Returns a dict that always contains every known setting (defaults are
    pre-filled). A non-empty line without '=' is interpreted as a request
    to priority-scan that folder.

    NOTE(review): the standalone-folder branch reads self.root_dir, which
    is not yet set during the very first call from __init__; such a line
    would then land in the except handler below — confirm intended.
    """
    # Start from defaults so every key is present even if the file is sparse.
    config = {
        self.CONFIG_FREQUENCY: 60,
        self.CONFIG_ROOT_DIR: self.default_root_dir,
        "priority_folder": None,
        self.CONFIG_SHUTDOWN: False,
        self.CONFIG_LAST_SCAN: "Never",
        self.CONFIG_MIN_LENGTH: 3  # Default minimum length
    }
    try:
        with open(self.config_path, 'r') as f:
            lines = f.readlines()
            for line in lines:
                line = line.strip()
                # Ignore blanks and comment lines.
                if not line or line.startswith('#'):
                    continue
                if '=' in line:
                    key, value = line.split('=', 1)
                    key = key.strip().lower()
                    value = value.strip()
                    if key == self.CONFIG_FREQUENCY:
                        # Accept "<n>s" (seconds, clamped >= 1) or "<n>m"/"<n>min"
                        # (minutes, clamped >= 60 seconds).
                        match_s = re.match(r'(\d+)s$', value.lower())
                        match_m = re.match(r'(\d+)m(in)?$', value.lower())
                        if match_s: config[self.CONFIG_FREQUENCY] = max(1, int(match_s.group(1)))
                        elif match_m: config[self.CONFIG_FREQUENCY] = max(60, int(match_m.group(1)) * 60)
                    elif key == self.CONFIG_ROOT_DIR:
                        # Only accept the override if it is an existing directory.
                        potential_path = self._normalize_path(value)
                        if os.path.isdir(potential_path):
                            config[self.CONFIG_ROOT_DIR] = potential_path
                    elif key == self.CONFIG_SHUTDOWN:
                        if value.lower() == "true":
                            config[self.CONFIG_SHUTDOWN] = True
                    elif key == self.CONFIG_LAST_SCAN:
                        config[self.CONFIG_LAST_SCAN] = value
                    elif key == self.CONFIG_MIN_LENGTH:
                        try:
                            config[self.CONFIG_MIN_LENGTH] = max(1, int(value))  # Minimum 1
                        except ValueError:
                            pass  # non-numeric value: keep the default
                else:  # Check for standalone priority folder path
                    # Try the line as an absolute/user path first, then
                    # relative to the current scan root.
                    potential_path = self._normalize_path(line)
                    if os.path.isdir(potential_path):
                        config["priority_folder"] = potential_path
                    elif os.path.isdir(os.path.join(self.root_dir, line)):
                        config["priority_folder"] = os.path.join(self.root_dir, line)
            return config
    except Exception as e:
        # Any parse/IO failure falls back to whatever was collected so far.
        print(f"Error reading config file contents: {e}. Using default settings.")
        return config
def _write_config_updates(self, priority_cleared=False):
    """Rewrite the config file in place after a scan pass.

    Refreshes last_scan, resets shutdown to false, re-emits root_dir /
    frequency / min_sequence_length from current state, and — when
    priority_cleared is True — drops the standalone priority-folder line
    that was just scanned. Missing mandatory keys are appended.

    Side effects: clears self.priority_folder and self.should_shutdown.
    """
    new_last_scan_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    try:
        new_lines = []
        with open(self.config_path, 'r') as f:
            lines = f.readlines()
        # Track which mandatory keys the file already contains.
        has_root = False
        has_freq = False
        has_last_scan = False
        has_shutdown = False
        has_min_length = False
        normalized_priority = self._normalize_path(self.priority_folder) if self.priority_folder else None
        for line in lines:
            stripped_line = line.strip()
            key = stripped_line.split('=', 1)[0].strip().lower() if '=' in stripped_line else None
            # Skip the priority folder line if it was just scanned; the line
            # may be an absolute path or a path relative to root_dir.
            if priority_cleared:
                if normalized_priority and (self._normalize_path(stripped_line) == normalized_priority or \
                    os.path.isdir(os.path.join(self.root_dir, stripped_line)) and self._normalize_path(os.path.join(self.root_dir, stripped_line)) == normalized_priority):
                    continue
            # Update lines with dynamic values
            if key == self.CONFIG_LAST_SCAN:
                new_lines.append(f"{self.CONFIG_LAST_SCAN}={new_last_scan_time}\n")
                has_last_scan = True
            elif key == self.CONFIG_SHUTDOWN:
                new_lines.append(f"{self.CONFIG_SHUTDOWN}=false\n")
                has_shutdown = True
            elif key == self.CONFIG_ROOT_DIR:
                new_lines.append(f"{self.CONFIG_ROOT_DIR}={self.root_dir}\n")
                has_root = True
            elif key == self.CONFIG_FREQUENCY:
                new_lines.append(f"{self.CONFIG_FREQUENCY}={self.current_scan_frequency}s\n")
                has_freq = True
            elif key == self.CONFIG_MIN_LENGTH:
                new_lines.append(f"{self.CONFIG_MIN_LENGTH}={self.min_sequence_length}\n")
                has_min_length = True
            else:
                # Comments, blanks, and unrecognised lines pass through untouched.
                new_lines.append(line)
        # Ensure mandatory lines are present if they were missing
        if not has_freq: new_lines.append(f"{self.CONFIG_FREQUENCY}={self.current_scan_frequency}s\n")
        if not has_root: new_lines.append(f"{self.CONFIG_ROOT_DIR}={self.root_dir}\n")
        if not has_last_scan: new_lines.append(f"{self.CONFIG_LAST_SCAN}={new_last_scan_time}\n")
        if not has_shutdown: new_lines.append(f"{self.CONFIG_SHUTDOWN}=false\n")
        if not has_min_length: new_lines.append(f"{self.CONFIG_MIN_LENGTH}={self.min_sequence_length}\n")
        with open(self.config_path, 'w') as f:
            f.writelines(new_lines)
        # The request flags are one-shot: consume them after a successful write.
        self.priority_folder = None
        self.should_shutdown = False
    except Exception as e:
        print(f"Error updating config file: {e}")
# --- Scanning Logic ---
def _is_aggressively_scan(self, dirpath):
"""Checks modification time for aggressive scanning."""
try:
current_mtime = os.path.getmtime(dirpath)
except OSError:
return False
last_mtime = self.last_scan_times.get(dirpath, 0)
modified = current_mtime > last_mtime
self.last_scan_times[dirpath] = current_mtime
return modified
def _detect_gaps(self, frames):
"""Analyzes a sorted list of frame numbers and identifies missing ranges (gaps)."""
if not frames: return []
missing_ranges = []
expected_frame = frames[0]
for frame in frames:
if frame > expected_frame:
gap_start = expected_frame
gap_end = frame - 1
missing_ranges.append({
"start": gap_start,
"end": gap_end,
"count": gap_end - gap_start + 1
})
expected_frame = frame + 1
return missing_ranges
def scan_directory(self, target_dir):
    """Scan *target_dir* recursively, update sequence data, and persist changes.

    Pass structure:
      1. group "<name>.<frame>.<ext>" files per directory into sequences;
      2. compute start/end/gaps/mtime per sequence and record new/changed ones
         (sequences shorter than min_sequence_length are skipped);
      3. tombstone known sequences that were not seen this pass;
      4. save the JSON only when something changed.
    """
    print(f"Scanning target: {target_dir}")
    data_updated = False
    # Matches "<name>.<frame>.<ext>", e.g. "shot.0101.exr".
    SEQUENCE_RE = re.compile(r'(.+)\.(\d+)\.(.+)$')
    if not os.path.isdir(target_dir):
        print(f"Error: Target directory not found: {target_dir}")
        return
    sequences_found_in_scan = set()
    for dirpath, dirnames, filenames in os.walk(target_dir):
        if not filenames:
            continue
        full_path_key = os.path.abspath(dirpath)
        # Side effect: records the directory's mtime for the next pass.
        aggressive_scan = self._is_aggressively_scan(dirpath)
        # --- Optimization: Skip file processing if no changes ---
        # NOTE(review): last_scan_times is keyed by dirpath as yielded by
        # os.walk, but looked up here with its abspath — these only match
        # when target_dir is absolute; confirm callers always pass one.
        if not aggressive_scan and full_path_key in self.last_scan_times:
            # Keep previously known, non-deleted sequences in this folder
            # marked as "seen" so Step 3 does not tombstone them.
            for seq_id, seq_info in self.sequence_data.items():
                if os.path.abspath(seq_info.get('path')) == full_path_key and seq_info.get('deleted') is not True:
                    sequences_found_in_scan.add(seq_id)
            continue
        # --- Step 1: Group files into sequences keyed by "<name>####.<ext>" ---
        sequences = {}
        for filename in filenames:
            match = SEQUENCE_RE.match(filename)
            if match:
                base_name_prefix = match.group(1)
                try:
                    frame_number = int(match.group(2))
                except ValueError:
                    continue
                extension = match.group(3)
                # Padding width is encoded as '#' characters in the key,
                # matching how AE displays sequences.
                padding = len(match.group(2))
                sequence_key = f"{base_name_prefix}{'#' * padding}.{extension}"
                full_file_path = os.path.join(dirpath, filename)
                if sequence_key not in sequences:
                    sequences[sequence_key] = []
                sequences[sequence_key].append((frame_number, full_file_path))
        # --- Step 2: Determine sequence info, timestamps, and gaps ---
        for seq_key, frame_list in sequences.items():
            # Filter out sequences shorter than the minimum length.
            if len(frame_list) < self.min_sequence_length:
                print(f"Skipping sequence '{seq_key}' (Count: {len(frame_list)}, Min: {self.min_sequence_length})")
                continue
            frame_list.sort(key=lambda x: x[0])
            frames = [f[0] for f in frame_list]
            start_frame = frames[0]
            end_frame = frames[-1]
            # Gap Detection
            missing_frames = self._detect_gaps(frames)
            total_expected_frames = end_frame - start_frame + 1
            # Last modified timestamp of the final frame on disk.
            last_frame_path = frame_list[-1][1]
            try:
                last_mtime_float = os.path.getmtime(last_frame_path)
                last_mtime_str = datetime.fromtimestamp(last_mtime_float).strftime("%Y-%m-%d %H:%M:%S")
            except OSError:
                last_mtime_str = "N/A"
            full_sequence_id = os.path.join(full_path_key, seq_key)
            sequences_found_in_scan.add(full_sequence_id)
            is_new = full_sequence_id not in self.sequence_data
            # A sequence is "changed" when its range, gap count, or deleted
            # flag differs from the stored record.
            is_changed = not is_new and (
                self.sequence_data[full_sequence_id].get('start') != start_frame or
                self.sequence_data[full_sequence_id].get('end') != end_frame or
                self.sequence_data[full_sequence_id].get('deleted') is True or
                len(self.sequence_data[full_sequence_id].get('missing_frames', [])) != len(missing_frames)
            )
            if is_new or is_changed:
                new_info = {
                    "path": full_path_key,
                    "name": seq_key,
                    "start": start_frame,
                    "end": end_frame,
                    "count": len(frames),
                    "total_expected_frames": total_expected_frames,
                    "missing_frames_count": len(missing_frames),
                    "missing_frames": missing_frames,
                    "last_modified_timestamp": last_mtime_str,
                    "deleted": False,
                    "deletion_timestamp": None
                }
                self.sequence_data[full_sequence_id] = new_info
                data_updated = True
    # --- Step 3: Deletion Detection (Tombstone Record) ---
    keys_to_delete_from_data = []  # NOTE(review): collected nowhere below — currently unused
    for seq_id, seq_info in self.sequence_data.items():
        if seq_id.startswith(self.root_dir):
            if seq_info.get('deleted') is not True:
                if seq_id not in sequences_found_in_scan:
                    # The sequence exists in the data but was not seen this pass:
                    # it is either deleted on disk or now fails the length filter.
                    # Either way it is marked deleted (tombstoned), keeping the
                    # record so downstream consumers see the removal.
                    self.sequence_data[seq_id]['deleted'] = True
                    self.sequence_data[seq_id]['deletion_timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    # Clear run-time data for deleted entry
                    self.sequence_data[seq_id]['count'] = 0
                    self.sequence_data[seq_id]['total_expected_frames'] = 0
                    self.sequence_data[seq_id]['missing_frames_count'] = 0
                    self.sequence_data[seq_id]['missing_frames'] = []
                    data_updated = True
                    print(f"Detected deletion/obsolescence: {seq_info['name']}")
    # --- Final Save ---
    if data_updated:
        self._save_json()
    else:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Scan of {os.path.basename(target_dir)} finished. No changes detected.")
def start_background_scan(self):
    """Run the scanner forever: re-read config, scan, persist, sleep, repeat.

    Exits on Ctrl+C or when the config file's shutdown flag is true.
    Unexpected errors are logged and retried after 10 seconds.
    """
    print(f"--- Starting Image Sequence Scanner ---")
    while True:
        try:
            # 1. Check for configuration updates (config is re-read every loop
            #    so edits take effect without restarting the script).
            config = self._read_config()
            self.current_scan_frequency = config.get(self.CONFIG_FREQUENCY, 60)
            self.priority_folder = config.get("priority_folder")
            self.should_shutdown = config.get(self.CONFIG_SHUTDOWN, False)
            self.min_sequence_length = config.get(self.CONFIG_MIN_LENGTH, 3)
            new_root = config.get(self.CONFIG_ROOT_DIR, self.default_root_dir)
            if new_root != self.root_dir:
                print(f"Root directory changed from {self.root_dir} to {new_root}.")
                self.root_dir = new_root
            # 2. Check for shutdown command
            if self.should_shutdown:
                print("\n*** SHUTDOWN COMMAND DETECTED IN CONFIG. Exiting. ***")
                # Resets the flag so the next launch does not exit immediately.
                self._write_config_updates()
                break
            priority_scan_executed = False
            # 3. Check and perform PRIORITY scan
            if self.priority_folder and os.path.isdir(self.priority_folder):
                self.scan_directory(self.priority_folder)
                priority_scan_executed = True
            # 4. Perform REGULAR scan
            if os.path.isdir(self.root_dir):
                self.scan_directory(self.root_dir)
            else:
                print(f"Warning: Main root directory not found: {self.root_dir}")
            # 5. Update config file (last scan time and clear flags)
            self._write_config_updates(priority_cleared=priority_scan_executed)
            # 6. Pause for the configured time
            time.sleep(self.current_scan_frequency)
        except KeyboardInterrupt:
            print("\n--- Scanner stopped by user. ---")
            break
        except Exception as e:
            print(f"An error occurred during the loop: {e}. Waiting 10s before retry.")
            time.sleep(10)
# --- Configuration and Execution ---
if __name__ == '__main__':
    # Determine the directory where this script is located
    SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
    # Define paths relative to the script's directory
    DEFAULT_ROOT_DIR = os.path.join(SCRIPT_DIR, 'data_to_scan')
    OUTPUT_JSON_PATH = os.path.join(SCRIPT_DIR, 'sequences.json')
    CONFIG_FILE_PATH = os.path.join(SCRIPT_DIR, 'config.txt')
    # Setup Environment
    os.makedirs(DEFAULT_ROOT_DIR, exist_ok=True)
    # Create a default config file if none exists, so the script is
    # runnable out of the box.
    if not os.path.exists(CONFIG_FILE_PATH):
        print(f"Creating default config file: {CONFIG_FILE_PATH}")
        with open(CONFIG_FILE_PATH, 'w') as f:
            f.write("# Configuration for Image Sequence Scanner\n")
            f.write(f"root_dir={DEFAULT_ROOT_DIR}\n")
            f.write("frequency=10s\n")
            f.write("shutdown=false\n")
            f.write("last_scan=Never\n")
            f.write("min_sequence_length=3\n")  # New default setting
    # Instantiate and Start the Scanner
    try:
        scanner = ImageSequenceScanner(OUTPUT_JSON_PATH, CONFIG_FILE_PATH, DEFAULT_ROOT_DIR)
        scanner.start_background_scan()
    except ConfigFileNotFoundError as e:
        print(e)
        print("\nACTION REQUIRED: Please create the configuration file and restart the scanner.")
    except Exception as e:
        print(f"An unhandled critical error occurred: {e}")
AI generated: YouTube downloader plus an audio/video merger using FFmpeg.
It is scary how well this works.
import yt_dlp
import os
def download_highest_resolution_mp4(channel_url: str, output_path: str = './downloads'):
    """
    Download the 720p MP4 video(s) from a YouTube channel or live URL.

    Note: despite the function name, the format selector explicitly targets
    a height of 720, not the highest available resolution.

    Each video is downloaded only once: yt-dlp records finished downloads in
    'downloaded_videos.txt' inside *output_path* and skips entries found there.

    Args:
        channel_url (str): The URL of the YouTube live channel.
        output_path (str): The directory where the video will be saved.
                           Defaults to './downloads'.
    """
    # Ensure the output directory exists
    if not os.path.exists(output_path):
        os.makedirs(output_path)
        print(f"Created output directory: {output_path}")

    def _progress(d):
        # Bug fix: the original hook indexed d['_total_bytes_str'] etc.
        # directly; yt-dlp does not guarantee those keys on every callback,
        # and a KeyError raised inside a hook aborts the download. Use .get().
        if d.get('status') != 'downloading':
            return
        size = d.get('_total_bytes_str') or d.get('_total_bytes_estimate_str') or 'unknown size'
        speed = d.get('_speed_str') or 'unknown speed'
        print(f"Downloading: {d.get('filename')} - {d.get('_percent_str')} of {size} at {speed}")

    # yt-dlp options
    ydl_opts = {
        # Prefer 720p MP4 video + m4a audio; fall back to any 720p stream.
        'format': 'bestvideo[ext=mp4][height=720]+bestaudio[ext=m4a]/best[ext=mp4][height=720]/best[height=720]',
        'outtmpl': os.path.join(output_path, '%(title)s.%(ext)s'),  # Output template for filename
        'merge_output_format': 'mp4',  # Ensure final merged file is MP4
        'noplaylist': True,  # Do not download entire playlist if it's a channel URL
        'restrictfilenames': True,  # Keep filenames simple
        'progress_hooks': [_progress],
        'verbose': False,  # Set to True for more detailed output from yt-dlp
        'quiet': False,  # Set to True to suppress most output
        'ignoreerrors': True,  # Ignore errors for individual videos
        'download_archive': os.path.join(output_path, 'downloaded_videos.txt'),  # Dedup record
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            print(f"\nAttempting to download from channel: {channel_url}")
            # Extract information and download; yt-dlp resolves channel URLs
            # to their videos itself.
            info_dict = ydl.extract_info(channel_url, download=True)
            if not info_dict:
                print("No video information found or downloaded.")
                return
            # Channel/playlist URLs yield an 'entries' list; report the first
            # entry, otherwise the single video's own metadata.
            entry = info_dict['entries'][0] if info_dict.get('entries') else info_dict
            downloaded_title = entry.get('title', 'N/A')
            downloaded_ext = entry.get('ext', 'mp4')
            print(f"\nSuccessfully downloaded: {downloaded_title}.{downloaded_ext}")
    except Exception as e:
        print(f"\nAn error occurred: {e}")
        print("Please ensure the channel URL is correct and accessible.")
        print("Also, check your internet connection and yt-dlp installation.")
if __name__ == "__main__":
# --- Configuration ---
# Replace with the actual YouTube live channel URL
# Example: 'https://www.youtube.com/@NASA' or 'https://www.youtube.com/channel/UCk8GzjMWMl_J_8P_J3P_J3A'
youtube_channel_url = "https://www.youtube.com/@JunichiroHorikawa/streams"
# Specify the directory where you want to save the downloaded videos
download_directory = "./youtube_downloads"
# -------------------
if "YOUR_YOUTUBE_LIVE_CHANNEL_URL_HERE" in youtube_channel_url:
print("Please replace 'YOUR_YOUTUBE_LIVE_CHANNEL_URL_HERE' with the actual YouTube live channel URL.")
else:
download_highest_resolution_mp4(youtube_channel_url, download_directory)
FFmpeg merger
import os
import subprocess
def merge_video_audio(input_directory: str, output_directory: str, ffmpeg_path: str):
    """
    Merge DASH video parts (*.f136.mp4) with their audio parts (*.f140.m4a).

    For every base name present in both forms in *input_directory*, FFmpeg
    copies the two streams (no re-encoding) into '<base>.mp4' inside
    *output_directory*. Outputs that already exist are skipped.

    Args:
        input_directory (str): Folder containing the split video/audio files.
        output_directory (str): Folder that receives the merged MP4 files.
        ffmpeg_path (str): Full path to the FFmpeg executable.
    """
    if not os.path.exists(input_directory):
        print(f"Error: Input directory '{input_directory}' does not exist.")
        return
    if not os.path.exists(output_directory):
        os.makedirs(output_directory)
        print(f"Created output directory: {output_directory}")

    entries = os.listdir(input_directory)
    # Index both halves by their shared base name.
    videos = {name.replace('.f136.mp4', ''): name for name in entries if name.endswith('.f136.mp4')}
    audios = {name.replace('.f140.m4a', ''): name for name in entries if name.endswith('.f140.m4a')}

    merged = 0
    skipped = 0
    print(f"\nSearching for video and audio pairs in '{input_directory}'...")
    for base, video_name in videos.items():
        audio_name = audios.get(base)
        if audio_name is None:
            print(f"Warning: No matching audio file found for video '{video_name}'. Skipping.")
            continue
        out_name = f"{base}.mp4"  # Merged file will be a clean MP4
        destination = os.path.join(output_directory, out_name)
        if os.path.exists(destination):
            print(f"Skipping '{out_name}': Already exists in output directory.")
            skipped += 1
            continue
        print(f"\nMerging '{video_name}' with '{audio_name}' into '{out_name}'...")
        # Stream-copy both inputs; -map picks the video stream from input 0
        # and the audio stream from input 1, so nothing is re-encoded.
        cmd = [
            ffmpeg_path,
            '-i', os.path.join(input_directory, video_name),
            '-i', os.path.join(input_directory, audio_name),
            '-c:v', 'copy',
            '-c:a', 'copy',
            '-map', '0:v:0',
            '-map', '1:a:0',
            destination,
        ]
        try:
            # capture_output keeps ffmpeg's own chatter off this console;
            # check=True turns a non-zero exit into CalledProcessError.
            subprocess.run(cmd, capture_output=True, text=True, check=True)
            print(f"Successfully merged '{out_name}'.")
            merged += 1
        except subprocess.CalledProcessError as e:
            print(f"Error merging '{out_name}':")
            print(f"  Command: {' '.join(e.cmd)}")
            print(f"  Return Code: {e.returncode}")
            print(f"  STDOUT: {e.stdout}")
            print(f"  STDERR: {e.stderr}")
        except FileNotFoundError:
            print(f"Error: FFmpeg not found at '{ffmpeg_path}'. Please ensure the path is correct.")
            return
        except Exception as e:
            print(f"An unexpected error occurred during merging '{out_name}': {e}")
    print(f"\n--- Merging Summary ---")
    print(f"Total files processed: {len(videos)}")
    print(f"Successfully merged: {merged}")
    print(f"Skipped (already exists): {skipped}")
    print(f"Files with no matching audio: {len(videos) - (merged + skipped)}")
if __name__ == "__main__":
# --- Configuration ---
# IMPORTANT: Replace with the actual path to your FFmpeg executable.
# On Windows, it might be something like 'C:\\ffmpeg\\bin\\ffmpeg.exe'
# On macOS/Linux, it might be '/usr/local/bin/ffmpeg' or '/usr/bin/ffmpeg'
# You can download FFmpeg from https://ffmpeg.org/download.html
FFMPEG_EXECUTABLE_PATH = "C:\\Users\\mbernadat\\Downloads\\exe\\ffmpeg.exe"
# Directory where your .f136.mp4 and .f140.m4a files are located
INPUT_VIDEOS_DIR = "./youtube_downloads" # Assuming this is where the previous script downloads
# Directory where the merged .mp4 files will be saved
OUTPUT_MERGED_DIR = "./merged_videos"
# -------------------
if "YOUR_FFMPEG_PATH_HERE" in FFMPEG_EXECUTABLE_PATH:
print("Please replace 'YOUR_FFMPEG_PATH_HERE' with the actual path to your FFmpeg executable.")
print("Example for Windows: 'C:\\ffmpeg\\bin\\ffmpeg.exe'")
print("Example for macOS/Linux: '/usr/local/bin/ffmpeg'")
else:
merge_video_audio(INPUT_VIDEOS_DIR, OUTPUT_MERGED_DIR, FFMPEG_EXECUTABLE_PATH)
Interencheres — live auction scraper
import re, os, uuid, datetime, sys, signal
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.remote.remote_connection import LOGGER as seleniumLogger
import logging
from subprocess import CREATE_NO_WINDOW
from time import sleep
import urllib.request
from pathlib import Path
## variables
# Selenium Service wrapping a local chromedriver binary (Windows path).
service = Service('Q:/WORK/TEMP/python/chromedriver_win32/chromedriver.exe')
# CREATE_NO_WINDOW (Windows-only flag) hides the chromedriver console window.
service.creationflags = CREATE_NO_WINDOW
# Output subfolder (relative to this script) for logs, HTML and images.
subfolder = 'encheres'
# Polling interval of the main scrape loop, in seconds.
looptime = 5
## defs
def log(text, sameline=False):
    """Print *text*; with sameline=True, rewrite the current console line."""
    terminator = '\r' if sameline else '\n'
    print(text, end=terminator)
def sigint_handler(signal, frame):
    """SIGINT (Ctrl+C) handler: close the browser cleanly, then exit.

    NOTE(review): relies on the module-level `driver` created further down;
    a Ctrl+C before the driver exists would raise NameError here. The
    `signal` parameter shadows the imported module (harmless here).
    """
    log('KeyboardInterrupt is caught')
    driver.quit()
    sys.exit(0)
# Install the handler so Ctrl+C always shuts the headless browser down.
signal.signal(signal.SIGINT, sigint_handler)
def touch(fname, times=None):
    """Ensure directory *fname* exists (despite the name, this does not
    touch a file: it creates the directory tree; *times* is ignored)."""
    target = Path(fname)
    target.mkdir(parents=True, exist_ok=True)
def textappend(file, text):
    """Append *text* plus a trailing newline to *file* (UTF-8 with BOM)."""
    line = text + '\n'
    with open(file, 'a', encoding='utf-8-sig') as record:
        record.write(line)
def download(url, savedir):
    """Fetch *url* into *savedir* under a random hex name, returning that name.

    The directory is created on demand; the file is always saved as
    '<uuid>.jpg' regardless of the source content type.
    """
    touch(savedir)
    name = uuid.uuid4().hex
    destination = savedir + '/' + name + '.jpg'
    urllib.request.urlretrieve(url, destination)
    return name
def out(driver):
    """Quit the browser session, then terminate the whole script via exit()."""
    driver.quit()
    exit()
def createHtml(textfile):
    """Render the ' | '-separated auction log *textfile* into '<textfile>.html'.

    Expected record layout per line (5 fields):
        uid | price | lot-number | description | extra
    Any stale HTML file is deleted first, then the table is rebuilt in full
    and appended as one write.
    """
    htmlfile = textfile + '.html'
    # Guard: only delete when the source really is a .txt log.
    if '.txt' in htmlfile:
        try:
            os.remove(htmlfile)
        except OSError:
            # The HTML may simply not exist yet.
            log('cannot delete' + htmlfile)
            pass
    # Alternating row colours for readability.
    switch = 0
    colors = ['#F4F4F4', '#FBFBFB']
    html = '<html><body><table>'
    with open(textfile, 'r', encoding='utf-8-sig') as records:
        page = records.readlines()
        for item in page:
            switch = not switch
            L = item.split(' | ')
            # Images were downloaded next to the log under images/<uid>.jpg.
            image = '<img style="max-height:200px;max-width:200px" src="images/' + L[0] + '.jpg">'
            infos = L[3] + '<br/>\n\t<br/>\n\t' + L[2] + ': ' + L[4]
            html += '<tr style="background-color:{color}">\n\t<td align="right">{image}</td>\n\t<td valign="top" style="padding:10px"><strong>{price}</strong><br/>\n\t{infos}\t</td>\n</tr>\n'.format(image=image, price=L[1], infos=infos, color=colors[switch])
    html += '</table></body></html>'
    try:
        textappend(htmlfile, html)
        log('writing html file')
    except:
        log('error writing html file')
def writeOut(driver, previousprice, previousiteminfos, filepath, looptime):
    """Record a finished lot: log it, append it to the text log, rebuild HTML.

    NOTE(review): `uid` is read from the module-global set by the image
    download loop, not from a parameter; the `driver` parameter is unused.
    """
    outstring = uid + ' | ' + previousprice + ' | ' + ' | '.join(previousiteminfos);
    log(outstring)
    textappend(filepath, outstring)
    createHtml(filepath)
    log('looping every ' + str(looptime) + 's')
# Require the auction URL as the single command-line argument.
url = ''
try:
    url = sys.argv[1]
except:
    log('url needed as argument')
    exit()
savepath = os.path.dirname(os.path.realpath(__file__)) + '/' + subfolder
fileNameDate = datetime.datetime.now().strftime("%y%m%d_%H-%M")
# Build the log filename from the timestamp and two URL path segments
# (assumed to be category and sale identifier — verify against real URLs).
filepath = savepath + '/' + fileNameDate + '_' + url.split('/')[3] + '_' + url.split('/')[4] + '.txt'
log('loading headless chrome...')
seleniumLogger.setLevel(logging.WARNING)
# Headless Chrome tuned for quiet, sandboxed scraping.
options = webdriver.ChromeOptions()
#options.add_experimental_option('prefs', prefs)
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-gpu')
options.add_argument('--window-size=1280x1696')
options.add_argument('--user-data-dir=/tmp/user-data')
options.add_argument('--hide-scrollbars')
options.add_argument('--service-log-path=' + os.devnull)
options.add_argument('--enable-logging')
options.add_argument('--log-level=3')
options.add_argument('--silent');
options.add_argument('--v=99')
options.add_argument('--single-process')
options.add_argument('--data-path=/tmp/data-path')
options.add_argument('--ignore-certificate-errors')
options.add_argument('--homedir=/tmp')
options.add_argument('--disk-cache-dir=/tmp/cache-dir')
options.add_argument('user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36')
driver = webdriver.Chrome(service=service, options=options)
isWaitingToStart = True
isOn = True
# NOTE(review): driver.get() returns None, so this clobbers `url`;
# later code uses driver.current_url instead, so it happens to be harmless.
url = driver.get(url)
log('loading ' + driver.current_url) #sanity check
log('writing to ' + filepath)
# figure our dom elements
# Phase 1: wait until the sale has started (or bail out if it is over).
while isWaitingToStart:
    try:
        items = driver.find_elements(By.CLASS_NAME, 'wrapper-join-sale-button')
        for item in items:
            if item.text == 'Cette vente est terminée':
                # Sale already finished: nothing to scrape.
                log('over')
                out(driver)
            elif 'La vente commence dans' in item.text:
                # Countdown still running: poll again later.
                log('waiting to start...')
                sleep(100)
            else:
                isWaitingToStart = False
                log('starting')
                break
    except NoSuchElementException:
        print('No wrapper-join-sale-button')
# Phase 2: the sale is on — locate the price element (the text-h5 node
# that actually shows a euro amount) and the current-item container.
try:
    priceitem = driver.find_element(By.CLASS_NAME, 'text-h5')
    items = driver.find_elements(By.CLASS_NAME, "text-h5")
    for item in items:
        if '€' in item.text:
            priceitem = item
            break
except NoSuchElementException:
    log('No text-h5 found')
    out(driver)
try:
    itemdiv = driver.find_element(By.CLASS_NAME, 'current-item')
except NoSuchElementException:
    log('No current-item')
    out(driver)
# turn off video if it's on (saves bandwidth/CPU in the headless browser)
try:
    driver.execute_script('document.getElementById("streaming-subscriber").pause();')
    log('video paused')
except:
    log('No video stream found')
# State carried between loop iterations; the string 'none' is the sentinel
# for "no previous value yet".
previousprice = 'none'
previousitem = 'none'
previousiteminfos = []
previousimage = 'none'
uid = 'none'
# Main scrape loop: every `looptime` seconds, read the current price/item,
# download new lot images, and write out a record when the lot changes.
# NOTE(review): `isOn` is only cleared on the find_element falsy path below;
# in practice the loop ends via the except handlers calling out().
while isOn:
    # price and/or stop if finished
    try:
        findprice = driver.find_element(By.CLASS_NAME, 'text-h5')
        if not findprice:
            isOn = False
            log('finished, stopping')
            out(driver)
        fetchprice = priceitem.text
        fetchprice = "".join(fetchprice.split()) # remove weird space characters
    except:
        # The price node disappearing is how we detect the end of the sale:
        # flush the last record, then quit.
        log('price error, most likely finished')
        writeOut(driver, previousprice, previousiteminfos, filepath, looptime)
        out(driver)
    # current item description and image
    try:
        currentiteminfos = itemdiv.text.splitlines()
        currentitem = currentiteminfos[0]
    except:
        log('item error')
        out(driver)
    # fetch image: the lot photo is a CSS background-image on v-image__image.
    try:
        rgx = r'background-image: url\("(.*)"'
        images = itemdiv.find_elements(By.CLASS_NAME, 'v-image__image')
        for image in images:
            style = image.get_attribute('style')
            result = re.search(rgx, style)
            if result:
                currentimage = result.groups(0)[0]
                # Download the PREVIOUS image once a new one appears, so the
                # saved picture belongs to the lot being written out.
                if currentimage != previousimage and previousimage != 'none':
                    uid = download(previousimage, savepath + '/images')
                    log('new image downloaded')
                previousimage = currentimage
    except:
        log('image error')
        out(driver)
    # if things have changed, it means the previous item bid is over
    if (currentitem != previousitem ) and previousprice != 'none':
        writeOut(driver, previousprice, previousiteminfos, filepath, looptime)
        # Re-pause the stream: the player can restart on item change.
        try:
            driver.execute_script('document.getElementById("streaming-subscriber").pause();')
            log('video paused')
        except:
            log('no video stream found')
    if previousprice != fetchprice:
        # Same-line log so the console shows a live ticker of the price.
        log(previousprice + ' ', True)
    if fetchprice != '-- €':
        # '-- €' is the placeholder between bids; keep the last real price.
        previousprice = fetchprice
    previousitem = currentitem
    previousiteminfos = currentiteminfos
    previousimage = currentimage
    sleep(looptime)
driver.quit()
log('script finished')
import os
from pathlib import Path


def log(text):
    """Minimal logger for this standalone HTML-regeneration script.

    Fix: the original snippet called log() without defining it (NameError).
    """
    print(text)


def touch(fname, times=None):
    """Create directory *fname* (and parents) if it does not exist.

    Fix: the original snippet used Path without importing pathlib.
    """
    Path(fname).mkdir(parents=True, exist_ok=True)


def textappend(file, text):
    """Append *text* plus a newline to *file* using UTF-8 with BOM."""
    with open(file, 'a', encoding='utf-8-sig') as record:
        record.write(text + '\n')


subfolder = 'encheres'
savepath = os.path.dirname(os.path.realpath(__file__)) + '/' + subfolder
# Hard-coded capture log to re-render into HTML.
txtfile = savepath + '/220217_12-11_materiels-professionnels_vehicules-et-materiel-304020.txt'


def createHtml(textfile):
    """Rebuild '<textfile>.html' from ' | '-separated auction records.

    Expected record layout per line (5 fields):
        uid | price | lot-number | description | extra
    """
    htmlfile = textfile + '.html'
    if '.txt' in htmlfile:
        try:
            os.remove(htmlfile)
        except OSError:
            # Stale HTML may simply not exist yet; that's fine.
            pass
    # Alternating row colours for readability.
    switch = 0
    colors = ['#F4F4F4', '#FBFBFB']
    html = '<html><body><table>'
    with open(textfile, 'r', encoding='utf-8-sig') as records:
        page = records.readlines()
    for item in page:
        switch = not switch
        L = item.split(' | ')
        image = '<img style="max-height:200px;max-width:200px" src="images/' + L[0] + '.jpg">'
        infos = L[3] + '<br/>\n\t<br/>\n\t' + L[2] + ': ' + L[4]
        html += '<tr style="background-color:{color}">\n\t<td align="right">{image}</td>\n\t<td valign="top" style="padding:10px"><strong>{price}</strong><br/>\n\t{infos}\t</td>\n</tr>\n'.format(image=image, price=L[1], infos=infos, color=colors[switch])
    html += '</table></body></html>'
    try:
        textappend(htmlfile, html)
        log('writing html file')
    except OSError:
        log('error writing html file')


createHtml(txtfile)
Radio playlist scraping
TODO: build the web <-> Pi interface.
import json
import urllib.request
import urllib.parse
import os
import datetime
from pathlib import Path
#todo: make sure tail works without opening ressource entirely
# Directory of this script; playlist files live under ./data.
path = os.path.dirname(os.path.realpath(__file__))
#path = os.getcwd()
# Field separator used in the playlist log files.
sep = " | "
# Date stamp (YYMMDD) prefixed to every record.
date = datetime.datetime.now().strftime("%y%m%d")
def touch(fname, times=None):
    """Unix-style touch: ensure *fname* exists, then set its access/modified times."""
    handle = open(fname, 'a')
    try:
        os.utime(fname, times)
    finally:
        handle.close()
def tail(f, lines=1, _buffer=4098):
    """Return the last *lines* lines of file *f*.

    Seeks backwards from the end in *_buffer*-sized steps until enough
    lines are buffered, so ideally the whole file is not read. (In Python 3
    text mode, a relative seek from the end raises, so the except branch
    reads the whole file — the TODO above still stands.)

    Args:
        f: Path of the text file to read.
        lines: Number of trailing lines wanted.
        _buffer: Step size in bytes for the backwards seek.

    Returns:
        A list of at most *lines* strings (fewer if the file is shorter).
    """
    lines_found = []
    block_counter = -1
    # Fix: use a context manager — the original placed file.close() after an
    # unconditional `return`, so it never ran and the handle leaked.
    with open(f, 'r') as file:
        while len(lines_found) < lines:
            try:
                file.seek(block_counter * _buffer, os.SEEK_END)
            except IOError:  # either file is too small, or too many lines requested
                file.seek(0)
                lines_found = file.readlines()
                break
            lines_found = file.readlines()
            block_counter -= 1
        return lines_found[-lines:]
def getlastlines(file,n):
    ''' Return the last n lines of a text file (reads the whole file into memory).
    https://stackoverflow.com/questions/46258499/how-to-read-the-last-line-of-a-file-in-python '''
    with open(file, 'r') as source:
        contents = source.readlines()
    return contents[-n:]
#####################
#                   #
#    RADIO MEUH     #
#                   #
#####################
# Poll Radio Meuh's public now-playing JSON feed and append any track not
# already present in the local playlist log (one ' | '-separated line each).
url = "https://www.radiomeuh.com/player/rtdata/tracks.json"
#localfile = path+"/home/bernie/tools/loop/data/playlist.txt"
localfile = path+"/data/playlist_meuh.txt"
touch(localfile)  # make sure the log exists before reading its tail
lastlines = getlastlines(localfile,10)  # recent records used for de-duplication
content = urllib.request.urlopen(url)
data = json.loads(content.read())
# The feed lists newest first; reverse so records append chronologically.
for el in reversed(data):
    record = ("{date}{sep}{time}{sep}{artist}{sep}{titre}".format(sep=sep,date=date,time=el['time'],artist=el['artist'],titre=el['titre']))
    artisttitre = str(el['artist']+sep+el['titre'])
    if len(lastlines) < 2:
        # Fresh/near-empty log: seed dummies so the membership test below is safe.
        lastlines = ['0','0']
    #if (record != lastlines[0].strip()) and (record != lastlines[1].strip()):
    # Skip tracks already present anywhere in the last 10 logged lines.
    if not any(artisttitre in s for s in lastlines):
        with open(localfile, 'a',encoding='utf-8-sig') as recordlist:
            recordlist.write(record+'\n')
#####################
#                   #
#     ELLEBORE      #
#                   #
#####################
# Radio Ellebore exposes its now-playing title through a WordPress admin-ajax
# endpoint: POST the stream URL, parse the "artist - title" reply, and append
# it to the local log unless it matches the last logged track.
url = 'https://www.radio-ellebore.com/wp-admin/admin-ajax.php'
localfile = path+"/data/playlist_ellebore.txt"
touch(localfile)
lastline = getlastlines(localfile,1)
if len(lastline)==0:
    # Empty log: seed a placeholder record so the split below cannot fail.
    lastline = ['0 | 0 | 0 | 0']
lastlog = lastline[0].strip().split(' | ')  # [date, time, artist, title]
data = {'action':'get_stream_title','stream':'https://ellebore.ice.infomaniak.ch/ellebore-high.aac'}
data = urllib.parse.urlencode(data).encode()
hdr = { 'Content-Type':'application/x-www-form-urlencoded; charset=UTF-8' }
req = urllib.request.Request(url, data=data, headers=hdr,method='POST')
response = urllib.request.urlopen(req)
result = response.read().decode('utf-8')
result = result.split(' - ')  # "artist - title" -> [artist, title]
time = datetime.datetime.now().strftime("%H:%M:%S")
record = ("{date}{sep}{time}{sep}{artist}{sep}{titre}".format(sep=sep,date=date,time=time,artist=result[0].strip(),titre=result[1].strip()))
if lastlog[-1].strip() == '|':
    # Malformed last record (trailing separator): use dummies so the comparison is safe.
    lastlog = ['a','b','c','d']
# Append when the track changed, or when the log was freshly seeded above.
if str(result[0]+result[1]) != str(lastlog[2]+lastlog[3]) or lastline[0] == '0 | 0 | 0 | 0':
    with open(localfile, 'a',encoding='utf-8-sig') as recordlist:
        recordlist.write(record+'\n')
#le djam https://www.djamradio.com/actions/infos.php
#https://www.djamradio.com/actions/retrieve.php
Create a chart with frame size and render time
py2
Requires matplotlib to make charts; runs by right-clicking a file that is part of a sequence using the Windows Send To menu (which you can edit via Win+R > shell:sendto)
python -m pip install -U pip
python -m pip install -U matplotlib
Creates a chart with render time and frame size using file creation time (so as it was pointed out a little useless for multi-machine renders). Useful if you want to get a glimpse of simulation frame times
import os, sys, re, time
import matplotlib.pyplot as plt
import numpy as np
def sequence(file):
    '''Given one file of an image sequence, return a list:
    [directory, base name (without number and extension), start frame, end frame, padding, extension]

    e.g. C:/path/the_sequence_0033.jpg -> ['C:/path', 'the_sequence_', 1, 100, 4, 'jpg']
    Returns None when `file` does not exist, is not numbered, or no siblings match.
    '''
    if os.path.isfile(file):
        reg = r'^(.+?)([0-9]+)\.([.a-zA-Z]{1,7})$'
        match = re.match(reg, file,re.IGNORECASE)
        if match:
            # BUG FIX: escape the base name so regex metacharacters in file
            # names (dots, brackets...) are matched literally.
            base = re.escape(os.path.basename(match.groups()[0]))
            # BUG FIX: \d+ instead of \d* -- an empty digit group made int('') crash.
            newReg = r'('+base+r')(\d+)\.('+match.groups()[2]+')'
            # Convoluted on purpose: collect every sibling frame that matches
            # the selected file's pattern, then read first/last off the list.
            filelist = []
            target = os.path.dirname(file)
            # BUG FIX: os.listdir order is arbitrary; sort so the first and
            # last entries really are the start and end frames.
            for f in sorted(os.listdir(target)):
                m = re.match(newReg, f,re.IGNORECASE)
                if m:
                    filelist.append(m.groups())
            if not filelist:
                return None
            return [ target , filelist[0][0] , int(filelist[0][1]) , int(filelist[-1][1]) , len(filelist[0][1]) , filelist[0][2] ]
# Resolve the sequence around the file passed on the command line.
# NOTE: this rebinds the name `sequence` from the function to its result.
sequence = sequence(sys.argv[1])
start = sequence[2]
end = sequence[3]
sequenceInfos = []
# Gather [path, creation time, size] for every frame in the sequence.
for n in range(start,end+1):
    file = '{folder}/{file}{number}.{extension}'.format( folder = sequence[0], file = sequence[1], number = str(n).zfill( sequence[4] ), extension = sequence[5] )
    file = os.path.abspath(file)
    creationTime = os.path.getctime(file)
    size = os.path.getsize(file)
    sequenceInfos.append([file,creationTime,size])
frames = list(range(start, end + 1))
times = [ x[1] for x in sequenceInfos ] #creation times
# Convert absolute creation times into per-frame deltas (render time proxy);
# the first frame has no predecessor, so it borrows the second frame's delta.
times_temp = times.copy()
for i in range(1,len(times)):
    times_temp[i] = times[i] - times[i-1]
times_temp[0] = times_temp[1]
times = times_temp.copy()
sizes = [ x[2] / 1024.0 / 1024.0 for x in sequenceInfos ]  # bytes -> megabytes
# Two y-axes on one chart: frame time (red, left) and file size (blue, right).
fig, ax1 = plt.subplots()
color = 'tab:red'
ax1.set_xlabel('frames')
ax1.set_ylabel('Frame Time (s)', color=color)
ax1.plot(frames, times, color=color)
ax1.tick_params(axis='y', labelcolor=color)
ax1.set_ylim(bottom=0)
ax2 = ax1.twinx()
color = 'tab:blue'
ax2.set_ylabel('Size (mb)', color=color) # we already handled the x-label with ax1
ax2.plot(frames, sizes, color=color)
ax2.tick_params(axis='y', labelcolor=color)
ax2.set_ylim(bottom=0)
fig.tight_layout()
plt.grid()
plt.show()
Houdini remote killswitch
import urllib.request
import os
import time
from random import random
# Poll a remote text file every 30s; when it reads 'kill', force-quit every
# Houdini process on this machine and stop polling.
while True:
    url = "https://berniebernie.fr/tools/stamp/up.txt"
    url += "?"+str(random())  # random query string busts any HTTP caching
    contents = urllib.request.urlopen(url).read()
    contents = contents.decode()
    if contents=='kill':
        os.system('taskkill /F /IM "houdini*"')  # Windows-only kill command
        print('killed')
        break
    else:
        print(url)
        print(contents)
    time.sleep(30)
Create a folder with today's date
import os
import time
from datetime import date
# Create a folder named after today's date (YYYY_MM_DD) in the current
# working directory; report (but tolerate) an already-existing folder.
today = date.today()
folder = today.strftime("%Y_%m_%d")
folder = os.getcwd()+'/'+folder
try:
    os.mkdir(folder)
    print("creating "+folder)
except OSError:
    # BUG FIX: was a bare except; OSError covers FileExistsError and
    # permission failures without hiding unrelated errors (e.g. Ctrl-C).
    print(folder+ " exists or failed")
time.sleep(.8)  # keep the console window readable when double-clicked
Find the name of the closest matching color
def colorName(r,g,b):
    '''Return the name of the closest CSS color by squared RGB distance.

    Minimises Euclidean distance in RGB space, so the match is fast but not
    perceptually exact. On equal distances the entry listed later in the
    table wins (same tie behavior as the original scan).
    '''
    # NOTE(review): 'IndianRed ' and 'Indigo ' carry a trailing space in the
    # table, so those names are returned with the space -- confirm before trimming.
    colors = {
        'AliceBlue':[240,248,255],
        'AntiqueWhite':[250,235,215],
        'Aqua':[0,255,255],
        'Aquamarine':[127,255,212],
        'Azure':[240,255,255],
        'Beige':[245,245,220],
        'Bisque':[255,228,196],
        'Black':[0,0,0],
        'BlanchedAlmond':[255,235,205],
        'Blue':[0,0,255],
        'BlueViolet':[138,43,226],
        'Brown':[165,42,42],
        'BurlyWood':[222,184,135],
        'CadetBlue':[95,158,160],
        'Chartreuse':[127,255,0],
        'Chocolate':[210,105,30],
        'Coral':[255,127,80],
        'CornflowerBlue':[100,149,237],
        'Cornsilk':[255,248,220],
        'Crimson':[220,20,60],
        'Cyan':[0,255,255],
        'DarkBlue':[0,0,139],
        'DarkCyan':[0,139,139],
        'DarkGoldenRod':[184,134,11],
        'DarkGray':[169,169,169],
        'DarkGrey':[169,169,169],
        'DarkGreen':[0,100,0],
        'DarkKhaki':[189,183,107],
        'DarkMagenta':[139,0,139],
        'DarkOliveGreen':[85,107,47],
        'DarkOrange':[255,140,0],
        'DarkOrchid':[153,50,204],
        'DarkRed':[139,0,0],
        'DarkSalmon':[233,150,122],
        'DarkSeaGreen':[143,188,143],
        'DarkSlateBlue':[72,61,139],
        'DarkSlateGray':[47,79,79],
        'DarkSlateGrey':[47,79,79],
        'DarkTurquoise':[0,206,209],
        'DarkViolet':[148,0,211],
        'DeepPink':[255,20,147],
        'DeepSkyBlue':[0,191,255],
        'DimGray':[105,105,105],
        'DimGrey':[105,105,105],
        'DodgerBlue':[30,144,255],
        'FireBrick':[178,34,34],
        'FloralWhite':[255,250,240],
        'ForestGreen':[34,139,34],
        'Fuchsia':[255,0,255],
        'Gainsboro':[220,220,220],
        'GhostWhite':[248,248,255],
        'Gold':[255,215,0],
        'GoldenRod':[218,165,32],
        'Gray':[128,128,128],
        'Grey':[128,128,128],
        'Green':[0,128,0],
        'GreenYellow':[173,255,47],
        'HoneyDew':[240,255,240],
        'HotPink':[255,105,180],
        'IndianRed ':[205,92,92],
        'Indigo ':[75,0,130],
        'Ivory':[255,255,240],
        'Khaki':[240,230,140],
        'Lavender':[230,230,250],
        'LavenderBlush':[255,240,245],
        'LawnGreen':[124,252,0],
        'LemonChiffon':[255,250,205],
        'LightBlue':[173,216,230],
        'LightCoral':[240,128,128],
        'LightCyan':[224,255,255],
        'LightGoldenRodYellow':[250,250,210],
        'LightGray':[211,211,211],
        'LightGrey':[211,211,211],
        'LightGreen':[144,238,144],
        'LightPink':[255,182,193],
        'LightSalmon':[255,160,122],
        'LightSeaGreen':[32,178,170],
        'LightSkyBlue':[135,206,250],
        'LightSlateGray':[119,136,153],
        'LightSlateGrey':[119,136,153],
        'LightSteelBlue':[176,196,222],
        'LightYellow':[255,255,224],
        'Lime':[0,255,0],
        'LimeGreen':[50,205,50],
        'Linen':[250,240,230],
        'Magenta':[255,0,255],
        'Maroon':[128,0,0],
        'MediumAquaMarine':[102,205,170],
        'MediumBlue':[0,0,205],
        'MediumOrchid':[186,85,211],
        'MediumPurple':[147,112,219],
        'MediumSeaGreen':[60,179,113],
        'MediumSlateBlue':[123,104,238],
        'MediumSpringGreen':[0,250,154],
        'MediumTurquoise':[72,209,204],
        'MediumVioletRed':[199,21,133],
        'MidnightBlue':[25,25,112],
        'MintCream':[245,255,250],
        'MistyRose':[255,228,225],
        'Moccasin':[255,228,181],
        'NavajoWhite':[255,222,173],
        'Navy':[0,0,128],
        'OldLace':[253,245,230],
        'Olive':[128,128,0],
        'OliveDrab':[107,142,35],
        'Orange':[255,165,0],
        'OrangeRed':[255,69,0],
        'Orchid':[218,112,214],
        'PaleGoldenRod':[238,232,170],
        'PaleGreen':[152,251,152],
        'PaleTurquoise':[175,238,238],
        'PaleVioletRed':[219,112,147],
        'PapayaWhip':[255,239,213],
        'PeachPuff':[255,218,185],
        'Peru':[205,133,63],
        'Pink':[255,192,203],
        'Plum':[221,160,221],
        'PowderBlue':[176,224,230],
        'Purple':[128,0,128],
        'RebeccaPurple':[102,51,153],
        'Red':[255,0,0],
        'RosyBrown':[188,143,143],
        'RoyalBlue':[65,105,225],
        'SaddleBrown':[139,69,19],
        'Salmon':[250,128,114],
        'SandyBrown':[244,164,96],
        'SeaGreen':[46,139,87],
        'SeaShell':[255,245,238],
        'Sienna':[160,82,45],
        'Silver':[192,192,192],
        'SkyBlue':[135,206,235],
        'SlateBlue':[106,90,205],
        'SlateGray':[112,128,144],
        'SlateGrey':[112,128,144],
        'Snow':[255,250,250],
        'SpringGreen':[0,255,127],
        'SteelBlue':[70,130,180],
        'Tan':[210,180,140],
        'Teal':[0,128,128],
        'Thistle':[216,191,216],
        'Tomato':[255,99,71],
        'Turquoise':[64,224,208],
        'Violet':[238,130,238],
        'Wheat':[245,222,179],
        'White':[255,255,255],
        'WhiteSmoke':[245,245,245],
        'Yellow':[255,255,0],
        'YellowGreen':[154,205,50]
        }
    # BUG FIX: removed the unused leftover `testColor = [25,245,1]`.
    closestColor = 'None'
    diff = float('inf')  # was the magic number 99999999
    for color in colors:
        d = pow(colors[color][0] - r,2)
        d = d + pow(colors[color][1] - g,2)
        d = d + pow(colors[color][2] - b,2)
        # `d <= diff` reproduces the original `min(diff,d) == d` last-wins tie rule.
        if d <= diff:
            diff = d
            closestColor = color
    return closestColor
print(colorName(254,50,23))
List all objs in directory and subdirectories to text file
import os
# Recursively list every .obj file below this script's folder and write the
# full paths, one per line, into objs_list.txt next to the script.
dir_path = os.path.dirname(os.path.realpath(__file__))
listfilename = 'objs_list.txt'
objlist = []
for root, dirs, files in os.walk(dir_path):
    for file in files:
        if file.lower().endswith('.obj'):  # case-insensitive extension match
            objlist.append(os.path.join(root, file))
# BUG FIX: the original message lacked the space before 'to' ("Writing 12to ...").
print('Writing '+str(len(objlist))+' to '+dir_path+'/'+listfilename)
with open(dir_path+'/'+listfilename, 'w') as file_handler:
    for item in objlist:
        file_handler.write("{}\n".format(item))
elections 2017 scraping
# # -*- coding: utf-8 -*-
from urllib import urlopen
import re
import urlparse
import os
# Python 2 scraper: root of the interior ministry's 2017 results site, and the
# text file (next to this script) where discovered town result URLs are cached.
baseUrl = 'http://elections.interieur.gouv.fr/presidentielle-2017/'
townUrlsFile = os.path.dirname(os.path.realpath(__file__))+'/town_urls.txt'
#print townUrlsFile
def getTownUrlsList():
    """Crawl departement -> initial-letter -> town pages and write one
    'departement|town|url' line per town into townUrlsFile.

    Python 2 code (urlopen from urllib, print statement). The page is parsed
    by brittle string-splitting between hard-coded HTML landmarks, so any
    markup change on the site breaks the start/end anchors below.
    """
    with open(townUrlsFile, 'w') as fid:
        count = 0
        townlist = []
        #get the page
        page = urlopen(baseUrl+'index.html')
        page_content_HTML = page.read()
        #grab the list
        # Slice the departement <select> out of the landing page.
        start = 'selected>Choisir un departement</option>'
        end = '</select><br><p class="clic-carte">'
        departement_HTML = (page_content_HTML.split(start))[1].split(end)[0]
        #iterate through departments (options)
        options = re.findall(r'<option value="(.*)">(.*)</option>',departement_HTML,re.M)
        for option in options:
            #get the page
            page = urlopen(baseUrl+option[0])
            page_content_HTML = page.read()
            #grab the list of town letters
            start = 'initiale</i><br>'
            end = '\xa0\n\t\t\t<hr>\n</div></div>\n<div class="row-fluid pub-index-communes">'
            town_letters_HTML = (page_content_HTML.split(start))[1].split(end)[0]
            #iterate through town letters (A = all towns with A in this departement etc...)
            town_letters = re.findall(r'<a href="../../(.*)">(.*)</a>',town_letters_HTML,re.M)
            for town_letter in town_letters:
                page = urlopen(baseUrl+town_letter[0])
                page_content_HTML = page.read()
                #grab the list of towns
                start = 'tableau-communes"><tbody>'
                end = '</tbody></table>\n<br>\n</div></div>\n</div>\n<div class="row-fluid pub-bas">\n<div class="span5">'
                towns_HTML = (page_content_HTML.split(start))[1].split(end)[0]
                #print towns_HTML
                towns = re.findall(r'<tr><td><a href="../../(.*)">(.*)</a>',towns_HTML,re.M)
                #iterate through towns
                for town in towns:
                    currentTown = option[1]+'|'+town[1]+'|'+baseUrl+town[0]
                    #townlist.append()
                    fid.write(currentTown+'\n')
                    count = count + 1
                    # Progress indicator: running total of towns written so far.
                    print count
    #return townlist
#fid.write('\n'.join(getTownUrlsList()))
#fid.close()
# Kick off the full departement/town crawl (writes town_urls.txt as it goes).
getTownUrlsList()
# # -*- coding: utf-8 -*-
from urllib import urlopen
import re
import urlparse
import os
# Python 2 one-off: list the arrondissement links of a single city page
# (here Paris, 011/075). Cities split into arrondissements use a different
# page layout than plain towns, hence the two anchor pairs below.
page = urlopen('http://elections.interieur.gouv.fr/presidentielle-2017/011/075/index.html')
page_content_HTML = page.read()
#grab the list of town letters
if 'arrondissement</i></p>' in page_content_HTML:
    # City page: links sit between the arrondissement header and the results block.
    start = 'arrondissement</i></p>'
    end = '<div class="row-fluid pub-resultats-entete'
else:
    # Regular departement page: links are grouped by town initial instead.
    start = 'initiale</i><br>'
    end = '\xa0\n\t\t\t<hr>\n</div></div>\n<div class="row-fluid pub-index-communes">'
town_letters_HTML = (page_content_HTML.split(start))[1].split(end)[0]
for arrondissement in town_letters_HTML.split('</a> <a href'):
    print arrondissement.split('../../')[1].split('">')
#town_letters = re.findall(r'<a href="../../(.*)">(.*)</a>',town_letters_HTML,re.M)
#for town_letter in town_letters:
    #print town_letter
colorlovers color scraping
# # -*- coding: utf-8 -*-
from urllib.request import urlopen
import os
import codecs
import math
import time
def grabCL(n):
    """Download page n of ColourLovers' most-loved palettes listing and cache
    the raw HTML to output<n>.txt (hard-coded Z: path) for parseCL() to read.
    """
    url = "http://www.colourlovers.com/ajax/browse-palettes/_page_"+str(n)+"?section=most-loved&period=all-time&view=meta&channelID=0"
    page = urlopen(url)
    page_content = page.read()
    with open('Z:/BERNIE/vvvv/palettes/cl/output'+str(n)+'.txt', 'w') as fid:
        fid.write(str(page_content))
        # FIX: removed the redundant fid.close() -- the with block already closes it.
def parseCL(n):
    ''' disgusting code to parse webpage because i can't figure out beautifulsoup

    Reads the cached output<n>.txt written by grabCL() and rebuilds palettes
    by splitting on literal HTML fragments: palette titles first, then the
    five color swatches per palette (each page holds 15 palettes, and every
    palette contributes 10 split-tokens, hence the /10 and +=10 arithmetic).
    Returns one text blob: "<index> <title>" followed by "#RRGGBB <weight>"
    lines, where weight is the swatch width divided by the 560px bar width.
    '''
    output = ""
    titles = []
    with open('Z:/BERNIE/vvvv/palettes/cl/output'+str(n)+'.txt', 'r') as fid:
        for line in fid:
            # Every chunk before this marker ends with a palette title.
            tokens = line.split("</a></h3><div class=\"left username\"")
            p = len(tokens)
            for i in range(p):
                tokensTitle = tokens[i].split("\">")
                titles.append(tokensTitle[-1])
            #get colors
            # Each swatch <span> starts with its pixel width after this marker.
            lines = line.split("<span class=\"c\" style=\"width: ")
            j = 1
            while j<len(lines):
                #print(titles[(int((j-1)/10))])
                # Palette header: global index (15 palettes per page) + title.
                output += "\n"+str(int((j-1)/10)+(n-1)*15)+" "+titles[(int((j-1)/10))].replace("\\", "")+"\n"
                for k in range(5):
                    curline = lines[j+k]
                    widthTokens = curline.split("px; height: 50px;")
                    width = widthTokens[0]
                    colorTokens = curline.split(";\"><span class=\"s\" style=\"margin-top: 45px;\">")
                    color = colorTokens[0][-7:]  # trailing "#RRGGBB" of the style attribute
                    colorString = color+" "+str(float(width)/560)[:6]
                    #print(colorString)
                    output += colorString+"\n"
                #output += "\n"
                j = j + 10
    return output
def scrapeCL(startPage, endPage, waitInSeconds):
    """Grab and parse ColourLovers pages startPage..endPage inclusive,
    appending each parsed page to colors_cl.txt and sleeping between requests
    to stay polite to the server."""
    for pageNumber in range(startPage, endPage+1):
        grabCL(pageNumber)
        parsed = parseCL(pageNumber)
        #print(str(i))
        with open("Z:/BERNIE/vvvv/palettes/colors_cl.txt", "a") as outfile:
            outfile.write(parsed)
        print("Page "+str(pageNumber)+" grabbed... "+str(pageNumber*15)+ " records")
        time.sleep(waitInSeconds)
scrapeCL(1,270,1)
Raytracer
from PIL import Image
from math import sqrt
imwidth = 640   # output image width in pixels
imheight = 480  # output image height in pixels
im = Image.new("RGB",(imwidth,imheight),"black")  # canvas filled pixel by pixel below
#im = Image.open("lolmonkey.jpg")
#im.show()
#im.save("hellwrld.png","PNG")
#NUMPY
# Sphere is [center, radius_squared]: intersectRaySphere subtracts sphere[1]
# directly from the squared distance, so 200 means radius = sqrt(200).
mysphere = [[0,0,0],200]
#myray = [[0,0,10],[0,0,-1]]
def intersectRaySphere(ray,sphere):
    """Intersect ray [[ox,oy,oz],[dx,dy,dz]] with sphere [center, radius_squared].

    Returns [] on a miss, [point] on a tangent hit, and for two hits the
    single nearest intersection point (smallest ray parameter t) as [x,y,z].
    """
    # Quadratic coefficients of |O + t*D - C|^2 = r^2 in t.
    A = ray[1][0]*ray[1][0] + ray[1][1]*ray[1][1] + ray[1][2]*ray[1][2]
    B = 2.0 * (ray[1][0]*(ray[0][0]-sphere[0][0]) + ray[1][1]*(ray[0][1]-sphere[0][1]) + ray[1][2]*(ray[0][2]-sphere[0][2]))
    C = (ray[0][0]-sphere[0][0])*(ray[0][0]-sphere[0][0]) + (ray[0][1]-sphere[0][1])*(ray[0][1]-sphere[0][1]) + (ray[0][2]-sphere[0][2])*(ray[0][2]-sphere[0][2]) - sphere[1]
    delta = B*B - 4.0*A*C
    results = []
    if(delta==0):
        results.append(-B/(2.0*A))
    if(delta>0):
        results.append((-B+(sqrt(delta)))/(2.0*A))
        results.append((-B-(sqrt(delta)))/(2.0*A))
    points = []
    # BUG FIX: the original compared the point LISTS lexicographically
    # (points[0] > points[1]) and could return the far intersection.
    # Sorting the t values ascending puts the nearest hit first.
    for t in sorted(results):
        points.append([ray[0][0] + t*ray[1][0], ray[0][1] + t*ray[1][1], ray[0][2] + t*ray[1][2]])
    if(len(points)==2):
        points = points[0]  # nearest (smallest t) intersection
    return points
def mag(vec):
    """Euclidean length of a 3-vector."""
    x, y, z = vec[0], vec[1], vec[2]
    return sqrt(x*x + y*y + z*z)
def cross(vec1,vec2):
    """Right-handed cross product of two 3-vectors."""
    ax, ay, az = vec1[0], vec1[1], vec1[2]
    bx, by, bz = vec2[0], vec2[1], vec2[2]
    return [ay*bz - az*by, az*bx - ax*bz, ax*by - ay*bx]
def normalize(vec):
    """Return vec scaled to unit length.

    FIX: the original computed mag(vec) three times (once per component);
    hoisting it computes the length once with identical results.
    """
    length = mag(vec)
    return [vec[0]/length, vec[1]/length, vec[2]/length]
def dot(vec1,vec2):
    """Scalar (dot) product of two 3-vectors."""
    return sum(vec1[axis]*vec2[axis] for axis in range(3))
def pixToPoint(i,j,width,height,xPixSize,yPixSize,center,u,v):
    """Map pixel (i, j) to its world-space position on the view plane.

    cu/cv are the pixel-center offsets from the plane's midpoint, expressed
    in world units; the point is then center + cu*u + cv*v where u and v are
    the plane's basis vectors.
    """
    cu = (float(2*i+1)/(2*xPixSize)-.5)*width
    cv = (float(2*j+1)/(2*yPixSize)-.5)*height
    return [center[axis] + cu*u[axis] + cv*v[axis] for axis in range(3)]
# --- Camera setup -----------------------------------------------------------
lookat = [0,0,0]         # point the camera aims at
eye = [100,100,100]      # camera position
f = 10                   # distance from eye to the view plane
upvector = [0,1,0]
viewplaneW = imwidth/2   # view plane size in world units
viewplaneH = imheight/2
EA = [lookat[0]-eye[0],lookat[1]-eye[1],lookat[2]-eye[2]]  # eye -> lookat
lenEA = mag(EA)
normEA = [EA[0]/lenEA,EA[1]/lenEA,EA[2]/lenEA]
# NOTE(review): this offsets from EA, not from eye -- looks like it was meant
# to be eye + normEA*f (view-plane center in front of the camera); confirm.
center = [EA[0]+normEA[0]*f, EA[1]+normEA[1]*f, EA[2]+normEA[2]*f]
# Orthonormal camera basis: w forward, u right, v up.
w = normEA
u = normalize(cross(upvector,w))
v = normalize(cross(u,w))
#print(cross([1,0,0],[0,1,0]))
light = [0,0,100]  # point light position (currently unused in final shading)
#print intersectRaySphere(myray,mysphere)
# --- Render loop: one primary ray per pixel ---------------------------------
for x in range(imwidth):
    for y in range(imheight):
        #myray = [[x,y,-10],[0,0,1]]
        point = pixToPoint(x,y,imwidth,imheight,viewplaneW,viewplaneH,center,u,v)
        ray = [point,[point[0]-eye[0] , point[1]-eye[1] , point[2]-eye[2]]]
        # Any non-empty intersection result counts as a hit.
        if(len(intersectRaySphere(ray,mysphere))):
            # Surface normal at the view-plane point (not the hit point) and
            # light direction; only the negated normal is used as color below.
            n = normalize([point[0]-mysphere[0][0], point[1]-mysphere[0][1],point[2]-mysphere[0][2]])
            i = normalize([light[0]-point[0], light[1]-point[1],light[2]-point[2]])
            costheta = dot(n,i)
            #if(costheta<0):
            #    costheta=0
            #color = int(costheta*255)
            #print n[0]
            #print costheta
            #print color
            # Normal-as-color debug shading; values may fall outside 0-255.
            im.putpixel((x,y),(int(-n[0]*255),int(-n[1]*255),int(-n[2]*255)))
        #im.putpixel((x,y),(255,255,0))
#im.show()
im.save("sphr_"+str(mysphere[1])+".png","PNG")
File Handling Dandelion
import os
import re
def listdirs(folder):
    """Names (not full paths) of the immediate subdirectories of folder."""
    subdirs = []
    for entry in os.listdir(folder):
        if os.path.isdir(os.path.join(folder, entry)):
            subdirs.append(entry)
    return subdirs
# Python 2 script: walk the N:/01_OUT shot tree and write a summary.html with
# a checkbox per take, zebra-striped for readability.
paths = 'N:/01_OUT'
pattern = 'GB\d+_SC\d+.*_T\d+'  # shot/take naming convention, e.g. GB45_SC34_T3
#text = 'GB45_SC34_T3'
#match = re.search(pattern, text)
#print match
f = open('N:/01_OUT/summary.html', 'w')
#f.write('0123456789abcdef')
count = 0
for dir in listdirs(paths):
    f.write("<hr>\n\n</br></br>"+dir+"</br>")
    subdir = listdirs(paths+"/"+dir)
    for takes in subdir:
        tk = listdirs(paths+"/"+dir+"/"+takes)
        #if(len(tk)>4):
        #f.write("\n"+paths+"/"+dir+"/"+takes)
        f.write("\n"+paths+"/"+dir+"/"+takes+"</br>")
        for take in tk:
            match = re.search(pattern,take)
            # NOTE(review): re.search returns None (the object), never the
            # string "None", so this condition is always true and every take
            # is listed regardless of the pattern -- confirm intent.
            if(match != "None"):
                count+=1
                # Alternate row background colors for readability.
                if(count % 2 == 1):
                    c = "#fbfbfb";
                else:
                    c = "#eeeeee";
                f.write("\n<div style='background-color:"+c+";'> <input type=checkbox name=\""+(paths+"/"+dir+"/"+takes)+"\" CHECKED>"+take+"</div>")
                print take
        #print takes+": "+str(len(tk))
        #print take+" ("+str(len(tk))+")"
f.close();
raw_input("-")  # keep the console open (Python 2)
#for dir in os.listdir(path):
# for subdir in os.listdir(path+"/"+dir):
# takes = os.listdir(path+"/"+dir+"/"+subdir)
# directories=[d for d in os.listdir(path+"/"+dir+"/"+subdir) if os.path.isdir(d)]
# #print subdir+":"+str(len(takes))
# print directories
#
#raw_input("Press ENTER to exit")