오늘이라도
[Raspberry Pi] 8. 파이 카메라, 웹으로 카메라 전송 본문
반응형
https://github.com/upcake/Class_Examples
교육 중에 작성한 예제들은 깃허브에 올려두고 있습니다.
gif 파일은 클릭해서 보는 것이 정확합니다.
- 파이 카메라, 웹으로 카메라 전송 -
#원격으로 하면 잘 되지 않는 경우가 생기므로 직접 연결 추천
#카메라 쓰기 위해 PiCamera import
from picamera import PiCamera
#시간 사용하기 위해 time에 있는 sleep 함수
from time import sleep
#카메라 변수 선언
camera = PiCamera()
#미리보기 함수
camera.start_preview()
sleep(5)
#사진 찍기 : 경로를 지정해 줌
#경로 지정 안하면 'home/pi/capture.jpg'
camera.capture('/home/pi/Pictures/capture.jpg')
#미리보기 정지
camera.stop_preview()
camera.close()
▲PiCamera.py
#원격으로 하면 잘 되지 않는 경우가 생기므로 직접 연결 추천
#카메라 쓰기 위해 PiCamera import
from picamera import PiCamera
#시간 사용하기 위해 time에 있는 sleep 함수
from time import sleep
#카메라 변수 선언
camera = PiCamera()
#미리보기 함수
camera.start_preview()
sleep(2)
#영상 촬영 : 경로를 지정해 줌
camera.start_recording('/home/pi/Videos/vid.h264')
sleep(5)
camera.stop_recording()
#미리보기 정지
camera.stop_preview()
camera.close()
▲PiCamera video.py
#datetime은 날짜 가져오기
import datetime
import time
import picamera
import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BOARD)
GPIO.setup(11, GPIO.IN, GPIO.PUD_UP)
try:
while True:
with picamera.PiCamera() as camera:
camera.start_preview()
#버튼이 눌릴 때를 기다림
GPIO.wait_for_edge(11, GPIO.FALLING)
ntime = datetime.datetime.now()
print(ntime)
camera.capture('/home/pi/Pictures/' + str(ntime) + '.jpg')
camera.stop_preview()
camera.close()
#Ctrl + C를 누르고 버튼을 눌러야 종료된다.
except KeyboardInterrupt:
GPIO.cleanup()
▲PiCamera Button.py
#datetime은 날짜 가져오기
import datetime
import time
import picamera
import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BOARD)
GPIO.setup(11, GPIO.IN, GPIO.PUD_UP)
try:
while True:
with picamera.PiCamera() as camera:
camera.start_preview()
#버튼이 눌릴 때를 기다림
GPIO.wait_for_edge(11, GPIO.FALLING) #녹화 시작
ntime = datetime.datetime.now()
print(ntime)
camera.start_recording('/home/pi/Videos/' + str(ntime) + '.h264')
time.sleep(1)
GPIO.wait_for_edge(11, GPIO.FALLING) #녹화 종료
camera.stop_recording()
camera.stop_preview()
camera.close()
#Ctrl + C를 누르고 버튼을 눌러야 종료된다.
except KeyboardInterrupt:
GPIO.cleanup()
▲PiCamera Button Video.py
- 공유기 관리 페이지에서 라즈베리 파이 IP를 포트포워딩 해주면 공용 IP로 접속해서 외부 인터넷으로도 볼 수 있다.
# Web streaming example
# Source code from the official PiCamera package
# http://picamera.readthedocs.io/en/latest/recipes2.html#web-streaming
import io
import picamera
import logging
import socketserver
from threading import Condition
from http import server
PAGE="""\
<html>
<head>
<title>Raspberry Pi - Surveillance Camera</title>
</head>
<body>
<center><h1>Raspberry Pi - Surveillance Camera</h1></center>
<center><img src="stream.mjpg" width="640" height="480"></center>
</body>
</html>
"""
class StreamingOutput(object):
def __init__(self):
self.frame = None
self.buffer = io.BytesIO()
self.condition = Condition()
def write(self, buf):
if buf.startswith(b'\xff\xd8'):
# New frame, copy the existing buffer's content and notify all
# clients it's available
self.buffer.truncate()
with self.condition:
self.frame = self.buffer.getvalue()
self.condition.notify_all()
self.buffer.seek(0)
return self.buffer.write(buf)
class StreamingHandler(server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(301)
self.send_header('Location', '/index.html')
self.end_headers()
elif self.path == '/index.html':
content = PAGE.encode('utf-8')
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
elif self.path == '/stream.mjpg':
self.send_response(200)
self.send_header('Age', 0)
self.send_header('Cache-Control', 'no-cache, private')
self.send_header('Pragma', 'no-cache')
self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
try:
while True:
with output.condition:
output.condition.wait()
frame = output.frame
self.wfile.write(b'--FRAME\r\n')
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', len(frame))
self.end_headers()
self.wfile.write(frame)
self.wfile.write(b'\r\n')
except Exception as e:
logging.warning(
'Removed streaming client %s: %s',
self.client_address, str(e))
else:
self.send_error(404)
self.end_headers()
class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
allow_reuse_address = True
daemon_threads = True
with picamera.PiCamera(resolution='640x480', framerate=24) as camera:
output = StreamingOutput()
#Uncomment the next line to change your Pi's Camera rotation (in degrees)
#camera.rotation = 90
camera.start_recording(output, format='mjpeg')
try:
address = ('', 8000)
server = StreamingServer(address, StreamingHandler)
server.serve_forever()
finally:
camera.stop_recording()
▲camera_CCTV.py
# Web streaming example
# Source code from the official PiCamera package
# http://picamera.readthedocs.io/en/latest/recipes2.html#web-streaming
import io
import picamera
import logging
import socketserver
from threading import Condition
from http import server
import datetime
#import random
from PIL import Image
import math, operator
from functools import reduce
prior_image = None
PAGE="""\
<html>
<head>
<title>Raspberry Pi - Surveillance Camera</title>
</head>
<body>
<center><h1>Raspberry Pi - Surveillance Camera</h1></center>
<center><img src="stream.mjpg" width="640" height="480"></center>
</body>
</html>
"""
# motion notice
def detect_motion(camera):
global prior_image
stream = io.BytesIO()
camera.capture(stream, format='jpeg', use_video_port=True)
stream.seek(0)
if prior_image is None:
prior_image = Image.open(stream)
return False
else:
current_image = Image.open(stream)
# Compare current_image to prior_image to detect motion. This is
# left as an exercise for the reader!
h1 = prior_image.histogram()
h2 = current_image.histogram()
samerate = math.sqrt(reduce(operator.add, map(lambda a,b: (a-b)**2, h1, h2))/len(h1))
#result = random.randint(0, 10) == 0
print(samerate)
# Once motion detection is done, make the prior image the current
prior_image = current_image
if samerate < 1100:
result = False
else:
result = True
return result
class StreamingOutput(object):
def __init__(self):
self.frame = None
self.buffer = io.BytesIO()
self.condition = Condition()
def write(self, buf):
if buf.startswith(b'\xff\xd8'):
# New frame, copy the existing buffer's content and notify all
# clients it's available
self.buffer.truncate()
with self.condition:
self.frame = self.buffer.getvalue()
self.condition.notify_all()
self.buffer.seek(0)
return self.buffer.write(buf)
class StreamingHandler(server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(301)
self.send_header('Location', '/index.html')
self.end_headers()
elif self.path == '/index.html':
content = PAGE.encode('utf-8')
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
elif self.path == '/stream.mjpg':
self.send_response(200)
self.send_header('Age', 0)
self.send_header('Cache-Control', 'no-cache, private')
self.send_header('Pragma', 'no-cache')
self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
try:
while True:
with output.condition:
output.condition.wait()
frame = output.frame
self.wfile.write(b'--FRAME\r\n')
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', len(frame))
self.end_headers()
if detect_motion(camera):
print('Motion detected!')
ntime = datetime.datetime.now()
camera.capture('/home/pi/Pictures/' + str(ntime) + '.jpg')
#with io.open('/home/pi/CCTV/' + str(ntime) + '.h264', 'wb') as oput:
# oput.write(frame)
self.wfile.write(frame)
self.wfile.write(b'\r\n')
except Exception as e:
logging.warning(
'Removed streaming client %s: %s',
self.client_address, str(e))
else:
self.send_error(404)
self.end_headers()
class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
allow_reuse_address = True
daemon_threads = True
with picamera.PiCamera(resolution='640x480', framerate=24) as camera:
output = StreamingOutput()
#Uncomment the next line to change your Pi's Camera rotation (in degrees)
#camera.rotation = 90
camera.start_recording(output, format='mjpeg')
try:
address = ('', 8000)
server = StreamingServer(address, StreamingHandler)
server.serve_forever()
finally:
camera.stop_recording()
'''
with picamera.PiCamera() as camera:
camera.resolution = (1280, 720)
stream = picamera.PiCameraCircularIO(camera, seconds=10)
camera.start_recording(stream, format='h264')
try:
while True:
camera.wait_recording(0.5)
if detect_motion(camera):
print('Motion detected!')
# As soon as we detect motion, split the recording to
# record the frames "after" motion
ntime = datetime.datetime.now()
camera.split_recording('/home/pi/CCTV/' + str(ntime) + '.h264')
# Write the 10 seconds "before" motion to disk as well
write_video(stream)
# Wait until motion is no longer detected, then split
# recording back to the in-memory circular buffer
while detect_motion(camera):
camera.wait_recording(0.5)
print('Motion stopped!')
camera.split_recording(stream)
finally:
camera.stop_recording()
'''
▲camera_CCTV_pic.py
# Web streaming example
# Source code from the official PiCamera package
# http://picamera.readthedocs.io/en/latest/recipes2.html#web-streaming
import io
import picamera
import datetime
from PIL import Image
import math, operator
from functools import reduce
prior_image = None
before_rate = 0
last_rate = 0
# motion notice
def detect_motion(camera):
global prior_image, before_rate, last_rate
stream = io.BytesIO()
camera.capture(stream, format='jpeg', use_video_port=True)
stream.seek(0)
if prior_image is None:
prior_image = Image.open(stream)
return False
else:
current_image = Image.open(stream)
# Compare current_image to prior_image to detect motion. This is
# left as an exercise for the reader!
h1 = prior_image.histogram()
h2 = current_image.histogram()
samerate = math.sqrt(reduce(operator.add, map(lambda a,b: (a-b)**2, h1, h2))/len(h1))
#result = random.randint(0, 10) == 0
print(samerate)
last_rate = samerate
# Once motion detection is done, make the prior image the current
prior_image = current_image
if samerate < 1000 or abs(last_rate-before_rate) > 1000:
result = False
before_rate = last_rate
else:
result = True
before_rate = last_rate
return result
def write_video(stream):
# Write the entire content of the circular buffer to disk. No need to
# lock the stream here as we're definitely not writing to it
# simultaneously
with io.open('/home/pi/CCTV/before.h264', 'wb') as output:
for frame in stream.frames:
if frame.header:
stream.seek(frame.position)
break
while True:
buf = stream.read1()
if not buf:
break
output.write(buf)
# Wipe the circular stream once we're done
stream.seek(0)
stream.truncate()
with picamera.PiCamera() as camera:
camera.resolution = (640, 480)
stream = picamera.PiCameraCircularIO(camera, seconds=10)
camera.start_recording(stream, format='h264')
try:
while True:
camera.wait_recording(0.5)
if detect_motion(camera):
print('Motion detected!')
# As soon as we detect motion, split the recording to
# record the frames "after" motion
ntime = datetime.datetime.now()
camera.split_recording('/home/pi/CCTV/' + str(ntime) + '.h264')
# Write the 10 seconds "before" motion to disk as well
write_video(stream)
# Wait until motion is no longer detected, then split
# recording back to the in-memory circular buffer
while detect_motion(camera):
camera.wait_recording(0.5)
print('Motion stopped!')
camera.split_recording(stream)
finally:
camera.stop_recording()
▲camera_move_rec.py
반응형
'취업성공패키지 SW 개발자 교육 > 사물 인터넷(IoT)' 카테고리의 다른 글
[Raspberry Pi] 9. 라즈베리 파이, 스프링, DB 연동 (0) | 2020.07.16 |
---|---|
[Raspberry Pi] 7. 아두이노 연동하기 : 아두이노 IDE 설치, 시리얼 모니터로 조작 (PWM), Firmata 프로토콜 이용해서 파이썬으로 조작 (0) | 2020.07.14 |
[Raspberry Pi] 6. 피에조 스피커, 기울기 센서, 터치 센서, 초음파 거리 센서, 온습도 센서 (0) | 2020.07.13 |
[Raspberry Pi] 5. 버튼 눌러서 LED 시간 증가 (0) | 2020.07.10 |
[Raspberry Pi] 4. LED 조작 : PWM, 버튼 이용 (0) | 2020.07.09 |