455 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			455 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
###############################################################################
 | 
						|
# Copyright (C) 2014 by gempa GmbH                                            #
 | 
						|
#                                                                             #
 | 
						|
# All Rights Reserved.                                                        #
 | 
						|
#                                                                             #
 | 
						|
# NOTICE: All information contained herein is, and remains                    #
 | 
						|
# the property of gempa GmbH and its suppliers, if any. The intellectual      #
 | 
						|
# and technical concepts contained herein are proprietary to gempa GmbH       #
 | 
						|
# and its suppliers.                                                          #
 | 
						|
# Dissemination of this information or reproduction of this material          #
 | 
						|
# is strictly forbidden unless prior written permission is obtained           #
 | 
						|
# from gempa GmbH.                                                            #
 | 
						|
#                                                                             #
 | 
						|
# Author: Stephan Herrnkind                                                   #
 | 
						|
# Email: herrnkind@gempa.de                                                   #
 | 
						|
#                                                                             #
 | 
						|
# Requests images via HTTP and stores them into CAPS server                   #
 | 
						|
#                                                                             #
 | 
						|
# All images are downloaded at once at a regular interval. If available, the  #
 | 
						|
# file modification time is read from the 'Last-Modified' HTTP header. The    #
 | 
						|
# sampling time of the packet sent to CAPS is either set to the time          #
 | 
						|
# extracted from EXIF information or from the file modification time.         #
 | 
						|
# Packets are only sent if the new sampling time exceeds the previous one.    #
 | 
						|
#                                                                             #
 | 
						|
###############################################################################
 | 
						|
 | 
						|
from __future__ import absolute_import, division, print_function
 | 
						|
 | 
						|
import sys
 | 
						|
import os
 | 
						|
import fcntl
 | 
						|
import time
 | 
						|
import traceback
 | 
						|
 | 
						|
import http.client
 | 
						|
import mimetypes
 | 
						|
 | 
						|
from datetime import datetime, timezone
 | 
						|
from dateutil.tz import tzlocal
 | 
						|
 | 
						|
import pytz
 | 
						|
 | 
						|
from seiscomp import logging, client
 | 
						|
 | 
						|
from gempa import CAPS
 | 
						|
 | 
						|
###############################################################################
 | 
						|
 | 
						|
 | 
						|
class StreamInfo:
    """Book-keeping record of a single webcam stream.

    Public fields:
      filename -- name of the image file; joined with the download directory
                  for the local copy and appended to the base URL for the
                  HTTP request
      lastMod  -- last known modification time of the image, or None
      lastSent -- sampling time last sent to CAPS, or None if nothing was
                  sent yet
    """

    # -------------------------------------------------------------------------
    def __init__(self, filename, lastMod, lastSent):
        self.filename, self.lastMod, self.lastSent = filename, lastMod, lastSent
						|
 | 
						|
 | 
						|
###############################################################################
 | 
						|
class WebCam(client.Application):
    """Download webcam images via HTTP(S) and store them in a CAPS server.

    The application periodically requests every configured image (see
    ``streamConfig``) from ``host``/``baseURL`` and stores it below
    ``baseDir``. Whenever at least one image was downloaded, and once right
    after start-up, the images are pushed to the CAPS server at
    ``capsHost``:``capsPort``. An image is only pushed if its sampling time
    (EXIF date, or the modification time as fallback) is newer than the
    sampling time last pushed for the same stream.

    All datetime objects handled by this class are naive and represent UTC.
    """

    # -------------------------------------------------------------------------
    def __init__(self, argc, argv):
        client.Application.__init__(self, argc, argv)
        self.setMessagingEnabled(False)
        self.setDatabaseEnabled(False, False)

        # web server to fetch the images from
        self.host = "www.geonet.org.nz"
        self.port = 443
        self.ssl = True
        self.baseURL = "/volcano/cameras/latest/"
        # local download directory, also holding the lock file which prevents
        # multiple instances from running concurrently
        self.baseDir = "/tmp/webcam"
        self.lockFile = os.path.join(self.baseDir, "lock")
        self.interval = 60  # fetch interval in seconds

        # CAPS server to push the images to
        self.capsHost = "localhost"
        self.capsPort = 18003
        self.capsSSL = False

        # name of the time zone of the EXIF date, set to None if EXIF date is
        # in UTC, else e.g. 'Pacific/Auckland'
        self.exifTZName = None
        self.exifTZ = None
        # time zone of this host; file modification times are converted to
        # UTC directly from their epoch timestamp and do not depend on it
        self.localTZ = tzlocal()

        # format of the HTTP Last-Modified / If-Modified-Since header values
        self.lastModFmt = "%a, %d %b %Y %H:%M:%S GMT"

        # NOTE(review): not referenced anywhere in this class, presumably a
        # leftover of a test2caps based implementation
        self.pushCMD = [
            "test2caps",
            "--read-from",
            "",
            "--format",
            "",
            "--id",
            "",
            "--begin",
            "",
            "--type",
            "ANY",
            "--interval",
            "0/1",
            "--verbosity",
            "0",
            "-H",
            "localhost",
            "-p",
            "18003",
        ]

        # map stream ids to StreamInfo objects, filled by initStreams()
        self.streams = {}
        # map stream ids (NET.STA.LOC.CHA) to image file names
        self.streamConfig = {
            "NZ.RAOU..CAM": "raoulisland.jpg",
            "NZ.TEKA..CAM": "tekaha.jpg",
            "NZ.WHAK..CAM": "whakatane.jpg",
            "NZ.TONG..CAM": "tongariro.jpg",
            "NZ.TONG.TEMAARI.CAM": "tongarirotemaaricrater.jpg",
            "NZ.NGAU..CAM": "ngauruhoe.jpg",
            "NZ.RUAP.NORTH.CAM": "ruapehunorth.jpg",
            "NZ.RUAP.SOUTH.CAM": "ruapehusouth.jpg",
            "NZ.RUAP.RUHOE.CAM": "ruapehungauruhoe.jpg",
            "NZ.TARA..CAM": "taranaki.jpg",
        }

    # -------------------------------------------------------------------------
    def initStreams(self):
        """Create a StreamInfo entry for every configured stream.

        The last modification time is initialized from an already downloaded
        file (if any); the last sent time starts as None.
        """
        logging.info("initializing streams:")
        for sid, filename in self.streamConfig.items():
            path = os.path.join(self.baseDir, filename)
            modTime = self.readMTime(path)

            self.streams[sid] = StreamInfo(filename, modTime, None)
            logging.info(
                f"{sid}: file: {filename}, modified: "
                f"{'-' if modTime is None else modTime.isoformat()}"
            )

    # -------------------------------------------------------------------------
    def readMTime(self, path):
        """Return the modification time of ``path`` as naive UTC datetime.

        Returns None if the file does not exist or its modification time
        cannot be read (the error is logged).
        """
        # check file existence
        if not os.path.isfile(path):
            return None

        try:
            # getmtime() returns seconds since the epoch, which is UTC based,
            # so convert directly to UTC instead of going through local time
            mtime = os.path.getmtime(path)
            return datetime.fromtimestamp(mtime, timezone.utc).replace(tzinfo=None)
        except Exception as e:
            logging.error(f"could not extract mtime from file '{path}': {e}")

        return None

    # -------------------------------------------------------------------------
    def readEXIFDate(self, path):
        """Read the 'EXIF DateTimeOriginal' date of the specified file.

        The date is interpreted in ``self.exifTZ`` (if set) and converted to
        naive UTC.

        Returns a tuple (datetime, None) on success or (None, message) if the
        date could not be determined.
        """
        datekey = "EXIF DateTimeOriginal"
        datefmt = "%Y:%m:%d %H:%M:%S"

        # check file existence
        if not os.path.isfile(path):
            return None, f"no such file: {path}"

        try:
            import exifread  # pylint: disable=C0415,E0401
        except Exception:
            return None, "could not import exifread python module"

        # read EXIF data
        try:
            with open(path, "rb") as f:
                tags = exifread.process_file(f)
        except IOError as e:
            return None, f"could not read EXIF header: {e}"

        # check for date key
        if datekey not in tags:
            return (
                None,
                f"could not find date key '{datekey}' in EXIF header of "
                f"file : {path}",
            )

        # parse date
        datestr = str(tags[datekey])
        try:
            dt = datetime.strptime(datestr, datefmt)
        except ValueError as e:
            return None, f"could not parse EXIF date of file '{path}': {e}"

        # convert local EXIF time to UTC: utc = local - utcoffset
        if self.exifTZ is not None:
            try:
                # pytz may raise for ambiguous/non-existent times around DST
                # switches, report instead of aborting the push
                offset = self.exifTZ.utcoffset(dt)
            except Exception as e:
                return None, f"could not convert EXIF date of file '{path}' to UTC: {e}"
            if offset is not None:
                dt -= offset

        return dt, None

    # -------------------------------------------------------------------------
    def fetch(self):
        """Request all configured images from the web server.

        A conditional request (If-Modified-Since) is sent for every stream
        with a known modification time, so unchanged images are not
        transferred again. Downloaded images are written to ``baseDir``; if
        the server sent a Last-Modified header, it becomes the stream's
        modification time and the file's mtime.

        Returns True if at least one image was downloaded and stored, else
        False.
        """
        headers = {
            "Connection": "keep-alive",
            "User-agent": (
                "Linux / Firefox 26: Mozilla/5.0 "
                "(X11; Linux x86_64; rv:26.0) "
                "Gecko/20100101 Firefox/26.0"
            ),
        }
        modSinceHeader = "If-Modified-Since"
        lastModHeader = "Last-Modified"

        logging.info(
            f"connecting to {'https' if self.ssl else 'http'}://{self.host}:{self.port}"
        )
        try:
            if self.ssl:
                con = http.client.HTTPSConnection(self.host, self.port)
            else:
                con = http.client.HTTPConnection(self.host, self.port)
        except Exception as e:
            logging.error(f"connection error: {format(e)}")
            return False

        updated = False
        try:
            for i, (sid, info) in enumerate(self.streams.items(), start=1):
                # deactivate keep-alive for last request
                if i == len(self.streams):
                    headers.pop("Connection", None)

                # request image only if it was modified since the last request
                lastModStr = None
                if info.lastMod is None:
                    headers.pop(modSinceHeader, None)
                else:
                    lastModStr = info.lastMod.strftime(self.lastModFmt)
                    headers[modSinceHeader] = lastModStr

                # request data
                url = self.baseURL + info.filename
                logging.info(f"fetching {sid}:")
                msg = f"    GET {url}"
                if lastModStr is not None:
                    msg += f" ({modSinceHeader}: {lastModStr})"
                logging.info(msg)

                try:
                    con.request("GET", url, headers=headers)
                except Exception:
                    logging.error(f"could not send request: {sys.exc_info()[0]}")
                    break

                # a broken response (e.g. connection closed by the server) ends
                # this fetch round instead of terminating the application
                try:
                    resp = con.getresponse()
                    logging.info(f"{resp.status} {resp.reason}")
                    # always consume the body so the connection can be reused
                    data = resp.read()
                except Exception as e:
                    logging.error(f"could not read response: {e}")
                    break

                if resp.status != 200:
                    continue

                # write file; the body is read completely before the file is
                # truncated, so a failed transfer keeps the previous image
                path = os.path.join(self.baseDir, info.filename)
                try:
                    with open(path, "wb") as f:
                        f.write(data)
                except OSError as e:
                    logging.error(f"could not write image file '{path}': {e}")
                    continue

                # fallback if the server does not report a modification time,
                # kept naive like all other UTC timestamps of this class so
                # they remain comparable
                info.lastMod = datetime.now(timezone.utc).replace(tzinfo=None)
                updated = True

                # read modification time from HTTP header
                lastModStr = resp.getheader(lastModHeader, None)
                logging.info(
                    f"last modified: {(lastModStr if lastModStr is not None else 'UNKNOWN')}"
                )

                if lastModStr is None:
                    continue

                try:
                    info.lastMod = datetime.strptime(lastModStr, self.lastModFmt)
                except Exception as e:
                    logging.error(f"error paring last-modified string: {e}")
                    continue

                # update file modification time if last-modified header was present
                try:
                    # lastMod is naive UTC, the difference to the naive epoch
                    # therefore yields a POSIX timestamp
                    mtime = (info.lastMod - datetime(1970, 1, 1)).total_seconds()
                    os.utime(path, (time.time(), mtime))
                except Exception as e:
                    logging.error(f"could not update file modification time: {e}")
        finally:
            con.close()

        return updated

    # -------------------------------------------------------------------------
    def push(self):
        """Push all images with a new sampling time to the CAPS server.

        The sampling time of an image is read from its EXIF header and falls
        back to the stream's last modification time. An image is skipped if
        its sampling time does not exceed the one last pushed for the stream.
        The first CAPS communication error aborts the remaining pushes.
        """
        output = CAPS.Plugin("imageimporter")
        output.setHost(self.capsHost)
        output.setPort(self.capsPort)
        output.setSSLEnabled(self.capsSSL)
        output.setBufferSize(1 << 30)

        try:
            for sid, info in self.streams.items():
                logging.info(f"pushing {sid}:")

                path = os.path.join(self.baseDir, info.filename)
                if not os.path.isfile(path):
                    logging.error("file not found")
                    continue
                if info.lastMod is None:
                    logging.error("unknown modification time")
                    continue

                samplingTime, msg = self.readEXIFDate(path)
                if samplingTime is None:
                    logging.warning(f"{msg}")
                    samplingTime = info.lastMod

                # check if image was updated
                if info.lastSent is not None and info.lastSent >= samplingTime:
                    logging.info("no update detected")
                    continue

                # determine mime type, e.g. 'image/jpeg' -> 'JPEG'
                mtype = mimetypes.guess_type(path)[0]
                if mtype is None:
                    logging.error(f"{sid}: failed to read MIME type of file: {path}")
                    continue
                mtype = mtype[mtype.find("/") + 1 :].upper()

                # push image to caps server
                timestamp = CAPS.Time(
                    samplingTime.year,
                    samplingTime.month,
                    samplingTime.day,
                    samplingTime.hour,
                    samplingTime.minute,
                    samplingTime.second,
                    samplingTime.microsecond,
                )

                try:
                    net, sta, loc, cha = sid.split(".", 3)
                except BaseException:
                    logging.error("invalid stream id")
                    continue

                try:
                    with open(path, "rb") as f:
                        data = f.read()
                except Exception as e:
                    logging.error(f"could not read image file: {e}")
                    continue

                try:
                    logging.info(
                        f"timestamp: {timestamp.iso()}, mime: {mtype}, "
                        f"size: {len(data)}, data: {type(data)}"
                    )
                    output.push(net, sta, loc, cha, timestamp, 1, 0, mtype, data)
                except Exception as e:
                    logging.error(f"CAPS communication error: {e}")
                    logging.error(traceback.format_exc())
                    break

                # remember the sampling time so the same image is not sent twice
                info.lastSent = samplingTime
        finally:
            # close exactly once, also if the loop was left due to an error
            output.close()

    # -------------------------------------------------------------------------
    def run(self):
        """Run the fetch/push main loop until an exit is requested.

        Returns an error code if the setup fails: 5 (cannot create the
        download directory), 1 (cannot open the lock file), 2 (lock already
        held), 3 (no streams configured), 4 (unknown EXIF time zone).
        Returns True after the main loop was left.
        """
        # create tmp dir if it does not exist yet
        try:
            os.makedirs(self.baseDir, exist_ok=True)
        except OSError as e:
            logging.error(f"could not create temporary directory: {format(e)}")
            return 5

        # try to lock file; the lock is held through this open file object,
        # which therefore stays referenced while the main loop runs
        try:
            lock = open(self.lockFile, "w")
        except IOError as e:
            logging.error(f"could not open lock file for writing: {format(e)}")
            return 1

        try:
            fcntl.flock(lock.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            lock.close()
            logging.error("could not acquire lock, application already running?")
            return 2

        if not self.streamConfig:
            logging.error("no streams configured")
            return 3

        self.initStreams()

        # initialize time zone if specified
        if self.exifTZName is not None:
            try:
                self.exifTZ = pytz.timezone(self.exifTZName)
            except pytz.exceptions.UnknownTimeZoneError:
                logging.error(f"could not retrieve timezone: {self.exifTZName}")
                return 4

        # main loop; images are pushed after every successful download and
        # once at start-up, even if nothing new was downloaded
        first = True
        while not self.isExitRequested():
            # monotonic clock: scheduling must not be affected by clock jumps
            start = time.monotonic()

            if self.fetch() or first:
                self.push()
                first = False

            # schedule next run
            sleep = self.interval - (time.monotonic() - start)
            if sleep <= 0:
                logging.warning("warning: could not schedule next fetch in time")
            else:
                logging.info(f"schedule next fetch in {sleep:.2f} seconds")
                # sleep in 1 s steps to react to exit requests in time
                for _i in range(0, int(sleep)):
                    time.sleep(1)
                    if self.isExitRequested():
                        break

        return True
 | 
						|
 | 
						|
 | 
						|
# -----------------------------------------------------------------------------
 | 
						|
if __name__ == "__main__":
    # build the application from the command line and use the value returned
    # by calling it as the process exit status
    sys.exit(WebCam(len(sys.argv), sys.argv)())
 | 
						|
 | 
						|
 | 
						|
# vim: ts=4 noet
 |