add support for he selector switch

main
Shoofle 2 months ago
parent 5a77966102
commit 03c3f1a9f1
  1. 172
      webcam.py

@ -1,7 +1,9 @@
from datetime import datetime, date, time, timedelta from datetime import datetime, date, time, timedelta
import os import os
import cv2 import cv2
from wakepy import keep import gpiod
from gpiod import Direction, Value
#from wakepy import keep
def five_minute_segment_from(the_time): def five_minute_segment_from(the_time):
# this is mocked out too be a one minute segment instead # this is mocked out too be a one minute segment instead
@ -25,85 +27,127 @@ if not camera.isOpened():
# video writer # video writer
fourcc = cv2.VideoWriter_fourcc(*'H264') fourcc = cv2.VideoWriter_fourcc(*'H264')
camera_fps = camera.get(cv2.CAP_PROP_FPS)
video_writer = cv2.VideoWriter(path_to_video_at(last_time_stamp), video_writer = cv2.VideoWriter(path_to_video_at(last_time_stamp),
fourcc, fourcc,
camera.get(cv2.CAP_PROP_FPS), camera_fps,
(int(camera.get(3)),int(camera.get(4))), (int(camera.get(3)),int(camera.get(4))), #resolution
(cv2.VIDEOWRITER_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY) (cv2.VIDEOWRITER_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY)
) )
# video feed from previously captured video # video feed from previously captured video
old_time = cv2.VideoCapture(path_to_video_at(last_time_stamp))
twenty_four_hours_ago = cv2.VideoCapture(path_to_video_at(last_time_stamp - timedelta(hours=24)))
twelve_hours_ago = cv2.VideoCapture(path_to_video_at(last_time_stamp - timedelta(hours=12)))
five_minutes_ago = cv2.VideoCapture(path_to_video_at(last_time_stamp - timedelta(minutes=5)))
# fullscreen window yess good # fullscreen window yess good
cv2.namedWindow("mirror", cv2.WINDOW_NORMAL) cv2.namedWindow("mirror", cv2.WINDOW_NORMAL)
cv2.setWindowProperty("mirror", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN) cv2.setWindowProperty("mirror", cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
queue = [(False, None)] * 100 #queue of recent frames queue = [(False, None)] * math.floor(camera_fps * 32) #queue of 32 seconds of frames
idx = 0 # current index into the frame queue idx = 0 # current index into the frame queue
short_term_delay_frames = 20 # hoow many frames back are we going in the buffer when we're on a short term delay?
longterm = False # are we shoowing a long term delay? (versus a sshort term delay) lines = {
"five_second": 1,
"thirty_second": 2,
"five_minute": 3,
"twelve_hour": 4,
"twenty_four_hour": 5
}
#with keep.presenting(): #with keep.presenting():
while(True): with gpiod.request_lines(
if last_time_stamp != five_minute_segment_from(datetime.now()): "/dev/gpiochip0",
# find file older than our long delay consumer="mirror-to-yesterday",
old_file = path_to_video_at(five_minute_segment_from(datetime.now() - long_delay - timedelta(minutes=1))) config={
if os.path.isfile(old_file): lines.five_second: gpiod.LineSettings(direction=Direction.INPUT),
print(f"deleting old file {old_file}") lines.thirty_second: gpiod.LineSettings(direction=Direction.INPUT),
os.remove(old_file) lines.five_minute: gpiod.LineSettings(direction=Direction.INPUT),
lines.twelve_hour: gpiod.LineSettings(direction=Direction.INPUT),
video_writer.release() lines.twenty_four_hour: gpiod.LineSettings(direction=Direction.INPUT)
newly_recording_file = path_to_video_at(five_minute_segment_from(datetime.now())) }
) as io_lines:
print(f"opening {newly_recording_file} too record this upcoming ssegment to") while(True):
video_writer = cv2.VideoWriter(newly_recording_file, if last_time_stamp != five_minute_segment_from(datetime.now()):
fourcc, # find file older than our long delay
camera.get(cv2.CAP_PROP_FPS), oldest_file_path = path_to_video_at(five_minute_segment_from(datetime.now() - timedelta(hours=24) - timedelta(minutes=10)))
(int(camera.get(3)), int(camera.get(4))), if os.path.isfile(oldest_file_path):
(cv2.VIDEOWRITER_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY) print(f"deleting old file {oldest_file_path}")
) os.remove(oldest_file_path)
video_writer.release()
if old_time.isOpened(): newly_recording_file_path = path_to_video_at(five_minute_segment_from(datetime.now()))
old_time.release()
print(f"opening {newly_recording_file_path} too record this upcoming ssegment to")
new_file = path_to_video_at(five_minute_segment_from(datetime.now() - long_delay)) video_writer = cv2.VideoWriter(newly_recording_file_path,
print(f"opening the previously recorded file {new_file} to read from for long mirror") fourcc,
old_time = cv2.VideoCapture(new_file, cv2.CAP_FFMPEG) camera_fps,
(int(camera.get(3)), int(camera.get(4))),
(cv2.VIDEOWRITER_PROP_HW_ACCELERATION, cv2.VIDEO_ACCELERATION_ANY)
last_time_stamp = five_minute_segment_from(datetime.now()) )
ret, frame = camera.read() if twenty_four_hours_ago.isOpened():
frame = cv2.flip(frame, 1) twenty_four_hours_ago.release()
twenty_four_hours_ago = cv2.VideoCapture(path_to_video_at(five_minute_segment_from(datetime.now() - timedelta(hours=24))))
# record the frame to the appropriate file, and also to the short term buffer if twelve_hours_ago.isOpened():
video_writer.write(frame) twelve_hours_ago.release()
queue[idx] = ret, frame twelve_hours_ago = cv2.VideoCapture(path_to_video_at(five_minute_segment_from(datetime.now() - timedelta(hours=12))))
if five_minutes_ago.isOpened():
prev_frame_valid, prev_frame = queue[(idx - short_term_delay_frames) % len(queue)] five_minutes_ago.release()
old_frame_valid, old_frame = old_time.read() five_minutes_ago = cv2.VideoCapture(path_to_video_at(five_minute_segment_from(datetime.now() - timedelta(minutes=5))))
if not longterm and prev_frame_valid:
cv2.imshow('mirror', prev_frame)
elif longterm and old_frame_valid: last_time_stamp = five_minute_segment_from(datetime.now())
cv2.imshow('mirror', old_frame)
ret, frame = camera.read()
key = cv2.waitKey(16) # listen for keyprress for 16 ms frame = cv2.flip(frame, 1)
if key & 0xFF == ord('q') or key == 27: # q or esc # record the frame to the appropriate file, and also to the short term buffer
break video_writer.write(frame)
if key & 0xFF == ord('a'): queue[idx] = ret, frame
short_term_delay_frames += 1
if key & 0xFF == ord('d'): frames = {
short_term_delay_frames -= 1 "five_second": queue[(idx - math.floor(5*camera_fps)) % len(queue)],
if key & 0xFF == ord('z'): "thirty_second": queue[(idx - math.floor(30*camera_fps)) % len(queue)],
longterm = not longterm "five_minute": five_minutes_ago.read(),
"twelve_hour": twelve_hours_ago.read(),
idx = (idx + 1) % len(queue) "twenty_four_hour": twenty_four_hours_ago.read(),
}
#valid, the_frame = frames.five_second
if io_lines.get_value(lines.twenty_four_hour):
valid, the_frame = frames.twenty_four_hour
elif io_lines.get_value(lines.twelve_hour):
valid, the_frame = frames.twelve_hour
elif io_lines.get_value(lines.five_minute):
valid, the_frame = frames.five_minute
elif io_lines.get_value(lines.thirty_second):
valid, the_frame = frames.thirty_second
else: #if io_lines.get_value(lines.five_second)
valid, the_frame = frames.five_second
if valid:
cv2.imshow("mirror", the_frame)
idx = (idx + 1) % len(queue)
key = cv2.waitKey(16) # listen for keyprress for 16 ms
if key & 0xFF == ord('q') or key == 27: # q or esc
break
"""if key & 0xFF == ord('a'):
short_term_delay_frames += 1
if key & 0xFF == ord('d'):
short_term_delay_frames -= 1
if key & 0xFF == ord('z'):
longterm = not longterm"""
# After the loop release the cap object # After the loop release the cap object
camera.release() camera.release()
old_time.release()
video_writer.release() video_writer.release()
twenty_four_hours_ago.release()
twelve_hours_ago.release()
five_minutes_ago.release()
# Destroy all the windows # Destroy all the windows
cv2.destroyAllWindows() cv2.destroyAllWindows()

Loading…
Cancel
Save