오늘이라도

[Raspberry Pi] 8. 파이 카메라, 웹으로 카메라 전송 본문

취업성공패키지 SW 개발자 교육/사물 인터넷(IoT)

[Raspberry Pi] 8. 파이 카메라, 웹으로 카메라 전송

upcake_ 2020. 7. 15. 09:26
반응형

https://github.com/upcake/Class_Examples

교육 중에 작성한 예제들은 깃허브에 올려두고 있습니다. 

gif 파일은 클릭해서 보는 것이 정확합니다.


 - 파이 카메라, 웹으로 카메라 전송 -

▲파이 카메라, 일정 시간 대기 후 사진 촬영
# Running this over a remote connection sometimes does not work well, so
# connecting to the Pi directly is recommended.
# PiCamera is imported to use the camera.
from picamera import PiCamera

# sleep() from the time module provides the delay.
from time import sleep

# Open the camera as a context manager so that it is closed even when the
# capture below raises.
with PiCamera() as camera:
    # Start the preview and keep it running for 5 seconds before the shot.
    camera.start_preview()
    try:
        sleep(5)

        # Take the picture; the output path is given explicitly.
        camera.capture('/home/pi/Pictures/capture.jpg')
    finally:
        # Stop the preview whether or not the capture succeeded.
        camera.stop_preview()

▲PiCamera.py

 

▲파이 카메라, 시간만큼 비디오 촬영
# Running this over a remote connection sometimes does not work well, so
# connecting to the Pi directly is recommended.
# PiCamera is imported to use the camera.
from picamera import PiCamera

# sleep() from the time module provides the delay.
from time import sleep

# Open the camera as a context manager so that it is closed even when
# recording fails.
with PiCamera() as camera:
    # Start the preview and let it run for 2 seconds before recording.
    camera.start_preview()
    try:
        sleep(2)

        # Record video to the given path for 5 seconds.
        camera.start_recording('/home/pi/Videos/vid.h264')
        try:
            # wait_recording() waits like sleep() but also raises any error
            # that occurred while recording.
            camera.wait_recording(5)
        finally:
            camera.stop_recording()
    finally:
        # Stop the preview whether or not the recording succeeded.
        camera.stop_preview()

▲PiCamera video.py

 

 

▲파이카메라 버튼으로 촬영
#datetime은 날짜 가져오기

import datetime
import time
import picamera
import RPi.GPIO as GPIO

# Address pins by their physical board position; pin 11 is an input with the
# internal pull-up resistor enabled.
GPIO.setmode(GPIO.BOARD)
GPIO.setup(11, GPIO.IN, GPIO.PUD_UP)

try:
    while True:
        # A fresh camera is opened for every shot; leaving the with-block
        # closes it, so no explicit close() call is needed.
        with picamera.PiCamera() as camera:
            camera.start_preview()
            # Wait for the button to be pressed (falling edge on pin 11).
            GPIO.wait_for_edge(11, GPIO.FALLING)
            ntime = datetime.datetime.now()
            print(ntime)
            # The file name is the capture timestamp.
            camera.capture('/home/pi/Pictures/' + str(ntime) + '.jpg')
            camera.stop_preview()
except KeyboardInterrupt:
    # To quit, press Ctrl + C and then press the button.
    pass
finally:
    # Release the GPIO pins on every exit path, not only on Ctrl + C.
    GPIO.cleanup()

▲PiCamera Button.py

 

#datetime은 날짜 가져오기

import datetime
import time
import picamera
import RPi.GPIO as GPIO

# Address pins by their physical board position; pin 11 is an input with the
# internal pull-up resistor enabled.
GPIO.setmode(GPIO.BOARD)
GPIO.setup(11, GPIO.IN, GPIO.PUD_UP)

try:
    while True:
        # A fresh camera is opened for every clip; leaving the with-block
        # closes it, so no explicit close() call is needed.
        with picamera.PiCamera() as camera:
            camera.start_preview()

            # First button press: start recording.
            GPIO.wait_for_edge(11, GPIO.FALLING)
            ntime = datetime.datetime.now()
            print(ntime)
            # The file name is the timestamp at which recording started.
            camera.start_recording('/home/pi/Videos/' + str(ntime) + '.h264')
            # NOTE(review): presumably this pause keeps the starting press
            # from also being read as the stop signal - confirm.
            time.sleep(1)

            # Second button press: stop recording.
            GPIO.wait_for_edge(11, GPIO.FALLING)
            camera.stop_recording()

            camera.stop_preview()
except KeyboardInterrupt:
    # To quit, press Ctrl + C and then press the button.
    pass
finally:
    # Release the GPIO pins on every exit path, not only on Ctrl + C.
    GPIO.cleanup()

▲PiCamera Button Video.py

 

 

 

▲웹으로 카메라 전송

 - 공유기 관리 페이지에서 라즈베리 파이 IP의 8000번 포트를 포트포워딩해 주면, 공유기의 공인 IP로 접속해서 외부 인터넷에서도 영상을 볼 수 있다.

# Web streaming example
# Source code from the official PiCamera package
# http://picamera.readthedocs.io/en/latest/recipes2.html#web-streaming

import io
import picamera
import logging
import socketserver
from threading import Condition
from http import server

# HTML document served for /index.html; its <img> element loads the
# stream.mjpg URL.
PAGE="""\
<html>
<head>
<title>Raspberry Pi - Surveillance Camera</title>
</head>
<body>
<center><h1>Raspberry Pi - Surveillance Camera</h1></center>
<center><img src="stream.mjpg" width="640" height="480"></center>
</body>
</html>
"""

class StreamingOutput(object):
    """File-like sink that keeps the most recently completed frame.

    Attributes:
        frame: bytes of the last finished frame, or None before any write.
        buffer: in-memory buffer collecting the frame currently being written.
        condition: notified each time ``frame`` is replaced.
    """

    def __init__(self):
        self.condition = Condition()
        self.buffer = io.BytesIO()
        self.frame = None

    def write(self, buf):
        """Append *buf* to the buffer and return the number of bytes written.

        A chunk beginning with the bytes ff d8 marks the start of a new
        frame: whatever was buffered so far is published as ``frame`` and
        all waiters are woken before the buffer is reused from offset 0.
        """
        starts_new_frame = buf.startswith(b'\xff\xd8')
        if starts_new_frame:
            pending = self.buffer
            # Cut the buffer at the current position so leftovers of an
            # older, longer frame are not published.
            pending.truncate()
            with self.condition:
                self.frame = pending.getvalue()
                self.condition.notify_all()
            pending.seek(0)
        written = self.buffer.write(buf)
        return written

class StreamingHandler(server.BaseHTTPRequestHandler):
    """HTTP handler serving the landing page and the live MJPEG stream.

    Routes handled by do_GET:
        /             301 redirect to /index.html
        /index.html   the PAGE document
        /stream.mjpg  endless multipart/x-mixed-replace stream of JPEG frames
        anything else 404
    """

    def do_GET(self):
        """Answer a GET request according to ``self.path``."""
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            content = PAGE.encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Content-Length', len(content))
            self.end_headers()
            self.wfile.write(content)
        elif self.path == '/stream.mjpg':
            self.send_response(200)
            self.send_header('Age', 0)
            self.send_header('Cache-Control', 'no-cache, private')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
            self.end_headers()
            try:
                while True:
                    # Block until the module-level ``output`` publishes a
                    # new frame, then copy the reference under the lock.
                    with output.condition:
                        output.condition.wait()
                        frame = output.frame
                    # Each frame is sent as one part: boundary line, part
                    # headers, JPEG bytes, trailing CRLF.
                    self.wfile.write(b'--FRAME\r\n')
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', len(frame))
                    self.end_headers()
                    self.wfile.write(frame)
                    self.wfile.write(b'\r\n')
            except Exception as e:
                # Any failure while streaming (typically the client going
                # away) ends the stream for this client only.
                logging.warning(
                    'Removed streaming client %s: %s',
                    self.client_address, str(e))
        else:
            # send_error() emits the complete response (status line, headers
            # and body) by itself; calling end_headers() afterwards would
            # append a stray blank line after the body.
            self.send_error(404)

class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn (one thread per request)."""

    # Request threads are daemon threads.
    daemon_threads = True
    # Let the listening address be reused.
    allow_reuse_address = True

# Record MJPEG from the camera into a StreamingOutput and serve it over HTTP
# on port 8000 until the server stops.
with picamera.PiCamera(resolution='640x480', framerate=24) as camera:
    # ``output`` is read as a module-level name by StreamingHandler.
    output = StreamingOutput()
    #Uncomment the next line to change your Pi's Camera rotation (in degrees)
    #camera.rotation = 90
    camera.start_recording(output, format='mjpeg')
    try:
        # An empty host string listens on all interfaces.
        address = ('', 8000)
        # Named ``httpd`` so the ``server`` module imported from ``http`` is
        # not rebound to the server instance.
        httpd = StreamingServer(address, StreamingHandler)
        try:
            httpd.serve_forever()
        finally:
            # Release the listening socket once serving ends.
            httpd.server_close()
    finally:
        camera.stop_recording()
▲camera_CCTV.py

 

# Web streaming example
# Source code from the official PiCamera package
# http://picamera.readthedocs.io/en/latest/recipes2.html#web-streaming

import io
import picamera
import logging
import socketserver
from threading import Condition
from http import server

import datetime
#import random
from PIL import Image
import math, operator
from functools import reduce

# Image from the previous detect_motion() call; None until the first call.
prior_image = None

# HTML document served for /index.html; its <img> element loads the
# stream.mjpg URL.
PAGE="""\
<html>
<head>
<title>Raspberry Pi - Surveillance Camera</title>
</head>
<body>
<center><h1>Raspberry Pi - Surveillance Camera</h1></center>
<center><img src="stream.mjpg" width="640" height="480"></center>
</body>
</html>
"""

# motion notice 
def detect_motion(camera, threshold=1100):
    """Return True when the scene differs enough from the previous call.

    Captures one JPEG from the video port, compares its histogram with the
    histogram of the image kept in the module-level ``prior_image`` and then
    stores the new image there for the next call.

    Args:
        camera: object whose ``capture(stream, format=..., use_video_port=...)``
            writes a JPEG into ``stream`` (a PiCamera in this script).
        threshold: minimum root-mean-square histogram difference that counts
            as motion. Defaults to 1100, the value previously hard-coded.

    Returns:
        False on the very first call (nothing to compare against), otherwise
        whether the difference reached ``threshold``.
    """
    global prior_image
    stream = io.BytesIO()
    camera.capture(stream, format='jpeg', use_video_port=True)
    stream.seek(0)
    current_image = Image.open(stream)
    if prior_image is None:
        prior_image = current_image
        return False

    h1 = prior_image.histogram()
    h2 = current_image.histogram()
    # Root-mean-square difference between the two histograms: larger means
    # the images are less alike.
    rms_diff = math.sqrt(sum((a - b) ** 2 for a, b in zip(h1, h2)) / len(h1))
    print(rms_diff)
    # Once motion detection is done, make the prior image the current
    prior_image = current_image
    return rms_diff >= threshold


class StreamingOutput(object):
    """File-like sink that keeps the most recently completed frame.

    Attributes:
        frame: bytes of the last finished frame, or None before any write.
        buffer: in-memory buffer collecting the frame currently being written.
        condition: notified each time ``frame`` is replaced.
    """

    def __init__(self):
        self.condition = Condition()
        self.buffer = io.BytesIO()
        self.frame = None

    def write(self, buf):
        """Append *buf* to the buffer and return the number of bytes written.

        A chunk beginning with the bytes ff d8 marks the start of a new
        frame: whatever was buffered so far is published as ``frame`` and
        all waiters are woken before the buffer is reused from offset 0.
        """
        if not buf.startswith(b'\xff\xd8'):
            # Continuation of the current frame: just append.
            return self.buffer.write(buf)

        # Cut the buffer at the current position so leftovers of an older,
        # longer frame are not published.
        self.buffer.truncate()
        with self.condition:
            self.frame = self.buffer.getvalue()
            self.condition.notify_all()
        self.buffer.seek(0)
        return self.buffer.write(buf)

class StreamingHandler(server.BaseHTTPRequestHandler):
    """HTTP handler serving the landing page and the live MJPEG stream.

    While streaming, every frame also triggers a motion check; when motion
    is reported a still picture is saved under /home/pi/Pictures/.

    Routes handled by do_GET:
        /             301 redirect to /index.html
        /index.html   the PAGE document
        /stream.mjpg  endless multipart/x-mixed-replace stream of JPEG frames
        anything else 404
    """

    def do_GET(self):
        """Answer a GET request according to ``self.path``."""
        if self.path == '/':
            self.send_response(301)
            self.send_header('Location', '/index.html')
            self.end_headers()
        elif self.path == '/index.html':
            content = PAGE.encode('utf-8')
            self.send_response(200)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Content-Length', len(content))
            self.end_headers()
            self.wfile.write(content)
        elif self.path == '/stream.mjpg':
            self.send_response(200)
            self.send_header('Age', 0)
            self.send_header('Cache-Control', 'no-cache, private')
            self.send_header('Pragma', 'no-cache')
            self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
            self.end_headers()
            try:
                while True:
                    # Block until the module-level ``output`` publishes a
                    # new frame, then copy the reference under the lock.
                    with output.condition:
                        output.condition.wait()
                        frame = output.frame
                    # Each frame is sent as one part: boundary line, part
                    # headers, JPEG bytes, trailing CRLF.
                    self.wfile.write(b'--FRAME\r\n')
                    self.send_header('Content-Type', 'image/jpeg')
                    self.send_header('Content-Length', len(frame))
                    self.end_headers()

                    # NOTE(review): detect_motion() keeps its state in a
                    # module-level variable and runs once per streamed frame
                    # per connected client - confirm this is intended when
                    # several clients watch at once.
                    if detect_motion(camera):
                        print('Motion detected!')
                        ntime = datetime.datetime.now()
                        # The snapshot file name is the detection timestamp.
                        camera.capture('/home/pi/Pictures/' + str(ntime) + '.jpg')

                    self.wfile.write(frame)
                    self.wfile.write(b'\r\n')
            except Exception as e:
                # Any failure while streaming (typically the client going
                # away) ends the stream for this client only.
                logging.warning(
                    'Removed streaming client %s: %s',
                    self.client_address, str(e))
        else:
            # send_error() emits the complete response (status line, headers
            # and body) by itself; calling end_headers() afterwards would
            # append a stray blank line after the body.
            self.send_error(404)

class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
    """HTTPServer combined with ThreadingMixIn (one thread per request)."""

    # Request threads are daemon threads.
    daemon_threads = True
    # Let the listening address be reused.
    allow_reuse_address = True

# Record MJPEG from the camera into a StreamingOutput and serve it over HTTP
# on port 8000 until the server stops.
with picamera.PiCamera(resolution='640x480', framerate=24) as camera:
    # ``output`` and ``camera`` are read as module-level names by
    # StreamingHandler.
    output = StreamingOutput()
    #Uncomment the next line to change your Pi's Camera rotation (in degrees)
    #camera.rotation = 90
    camera.start_recording(output, format='mjpeg')
    try:
        # An empty host string listens on all interfaces.
        address = ('', 8000)
        # Named ``httpd`` so the ``server`` module imported from ``http`` is
        # not rebound to the server instance.
        httpd = StreamingServer(address, StreamingHandler)
        try:
            httpd.serve_forever()
        finally:
            # Release the listening socket once serving ends.
            httpd.server_close()
    finally:
        camera.stop_recording()
        
# Disabled alternative main loop (motion-triggered H.264 recording). It sits
# inside a bare string literal, so none of it is executed.
# NOTE(review): it calls write_video(), which this script does not appear to
# define - confirm before re-enabling.
'''        
with picamera.PiCamera() as camera:
    camera.resolution = (1280, 720)
    stream = picamera.PiCameraCircularIO(camera, seconds=10)
    camera.start_recording(stream, format='h264')
    try:
        while True:
            camera.wait_recording(0.5)
            if detect_motion(camera):
                print('Motion detected!')
                # As soon as we detect motion, split the recording to
                # record the frames "after" motion
                ntime = datetime.datetime.now()
                camera.split_recording('/home/pi/CCTV/' + str(ntime) + '.h264')
                # Write the 10 seconds "before" motion to disk as well
                write_video(stream)
                # Wait until motion is no longer detected, then split
                # recording back to the in-memory circular buffer
                while detect_motion(camera):
                    camera.wait_recording(0.5)
                print('Motion stopped!')
                camera.split_recording(stream)
    finally:
        camera.stop_recording()
'''

▲camera_CCTV_pic.py

 

# Web streaming example
# Source code from the official PiCamera package
# http://picamera.readthedocs.io/en/latest/recipes2.html#web-streaming

import io
import picamera
import datetime
from PIL import Image
import math, operator
from functools import reduce

# State carried between detect_motion() calls.
prior_image = None  # image from the previous call; None until the first call
before_rate = 0     # histogram difference measured by the previous call
last_rate = 0       # histogram difference measured by the latest call


def detect_motion(camera):
    """Return True when the latest capture counts as motion.

    Captures one JPEG from the video port and measures the root-mean-square
    difference between its histogram and that of the previous capture.
    Motion is reported when that difference is at least 1000 and differs
    from the previous call's value by no more than 1000.

    The first call only stores the image and returns False.
    """
    global prior_image, before_rate, last_rate
    snapshot = io.BytesIO()
    camera.capture(snapshot, format='jpeg', use_video_port=True)
    snapshot.seek(0)
    current_image = Image.open(snapshot)
    if prior_image is None:
        prior_image = current_image
        return False

    old_hist = prior_image.histogram()
    new_hist = current_image.histogram()
    squared_error = sum((a - b) ** 2 for a, b in zip(old_hist, new_hist))
    samerate = math.sqrt(squared_error / len(old_hist))
    print(samerate)

    # Change relative to the previous measurement, taken before the stored
    # values are advanced.
    jump = abs(samerate - before_rate)

    # The current image and measurement become the reference for next time.
    prior_image = current_image
    last_rate = samerate
    before_rate = samerate

    return samerate >= 1000 and jump <= 1000
 
def write_video(stream, path='/home/pi/CCTV/before.h264'):
    """Dump the buffered video in *stream* to *path*, then empty the stream.

    Args:
        stream: buffer exposing ``frames``, ``seek``, ``read1`` and
            ``truncate`` (a picamera.PiCameraCircularIO in this script).
        path: destination file, opened for binary writing and overwritten if
            it exists. Defaults to the location previously hard-coded.
    """
    # Write the entire content of the circular buffer to disk. No need to
    # lock the stream here as we're definitely not writing to it
    # simultaneously
    with io.open(path, 'wb') as output:
        # Start reading at the first SPS header frame. Comparing frame_type
        # replaces the deprecated ``frame.header`` attribute.
        for frame in stream.frames:
            if frame.frame_type == picamera.PiVideoFrameType.sps_header:
                stream.seek(frame.position)
                break
        # Copy everything from that position to the end of the buffer.
        while True:
            buf = stream.read1()
            if not buf:
                break
            output.write(buf)
    # Wipe the circular stream once we're done
    stream.seek(0)
    stream.truncate()
        
# Record H.264 into a 10-second circular buffer; when motion is detected,
# switch recording to a timestamped file and dump the buffered footage.
with picamera.PiCamera() as camera:
    camera.resolution = (640, 480)
    stream = picamera.PiCameraCircularIO(camera, seconds=10)
    camera.start_recording(stream, format='h264')
    try:
        while True:
            camera.wait_recording(0.5)
            if not detect_motion(camera):
                continue

            print('Motion detected!')
            # Redirect the recording to a file named after the detection
            # time, so the frames "after" motion go to disk.
            ntime = datetime.datetime.now()
            camera.split_recording('/home/pi/CCTV/{}.h264'.format(ntime))
            # Save the footage buffered "before" the motion as well.
            write_video(stream)
            # Keep recording to the file for as long as motion continues.
            while detect_motion(camera):
                camera.wait_recording(0.5)
            print('Motion stopped!')
            # Go back to recording into the in-memory circular buffer.
            camera.split_recording(stream)
    finally:
        camera.stop_recording()

▲camera_move_rec.py

 

 

 

반응형