Files
seiscomp-training/bin/webcam2caps

455 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
###############################################################################
# Copyright (C) 2014 by gempa GmbH #
# #
# All Rights Reserved. #
# #
# NOTICE: All information contained herein is, and remains #
# the property of gempa GmbH and its suppliers, if any. The intellectual #
# and technical concepts contained herein are proprietary to gempa GmbH #
# and its suppliers. #
# Dissemination of this information or reproduction of this material #
# is strictly forbidden unless prior written permission is obtained #
# from gempa GmbH. #
# #
# Author: Stephan Herrnkind #
# Email: herrnkind@gempa.de #
# #
# Requests images via HTTP and stores them into CAPS server #
# #
# All images are downloaded at once at a regular interval. If available, the #
# file modification time is read from the 'Last-Modified' HTTP header. The #
# sampling time of the packet sent to CAPS is either set to the time #
# extracted from the EXIF information or from the file modification time. #
# Packets are only sent if the current sampling time exceeds the previous one. #
# #
###############################################################################
from __future__ import absolute_import, division, print_function
import sys
import os
import fcntl
import time
import traceback
import http.client
import mimetypes
from datetime import datetime, timezone
from dateutil.tz import tzlocal
import pytz
from seiscomp import logging, client
from gempa import CAPS
###############################################################################
class StreamInfo:
    """Bookkeeping record for one webcam stream.

    Attributes are plain and public:

    * ``filename`` - image file name of the stream
    * ``lastMod``  - time of the last known modification of the image,
      or ``None`` if unknown
    * ``lastSent`` - sampling time of the last packet sent, or ``None`` if
      nothing has been sent yet
    """

    # -------------------------------------------------------------------------
    def __init__(self, filename, lastMod=None, lastSent=None):
        self.filename = filename
        self.lastMod = lastMod
        self.lastSent = lastSent
###############################################################################
class WebCam(client.Application):
    """Periodically download webcam images via HTTP and push them to CAPS.

    All time stamps handled by this class (``StreamInfo.lastMod``,
    ``StreamInfo.lastSent`` and the sampling times) are timezone-aware
    ``datetime`` objects in UTC, so they can be compared and subtracted
    without mixing naive and aware values.
    """

    # -------------------------------------------------------------------------
    def __init__(self, argc, argv):
        """Set up the application with its built-in default configuration."""
        client.Application.__init__(self, argc, argv)
        self.setMessagingEnabled(False)
        self.setDatabaseEnabled(False, False)

        # HTTP source of the images
        self.host = "www.geonet.org.nz"
        self.port = 443
        self.ssl = True
        self.baseURL = "/volcano/cameras/latest/"

        # local working directory and lock file
        self.baseDir = "/tmp/webcam"
        self.lockFile = os.path.join(self.baseDir, "lock")
        self.interval = 60  # fetch interval in seconds

        # CAPS server the images are pushed to
        self.capsHost = "localhost"
        self.capsPort = 18003
        self.capsSSL = False

        # name of the time zone of the EXIF date, set to None if EXIF date is
        # in UTC, else e.g. 'Pacific/Auckland'
        self.exifTZName = None
        self.exifTZ = None
        # kept for compatibility; file modification times are read directly
        # as UTC and do not need the local time zone
        self.localTZ = tzlocal()
        self.lastModFmt = "%a, %d %b %Y %H:%M:%S GMT"
        self.pushCMD = [
            "test2caps",
            "--read-from",
            "",
            "--format",
            "",
            "--id",
            "",
            "--begin",
            "",
            "--type",
            "ANY",
            "--interval",
            "0/1",
            "--verbosity",
            "0",
            "-H",
            "localhost",
            "-p",
            "18003",
        ]

        # map stream ids to StreamInfo objects, filled by initStreams()
        self.streams = {}
        # map stream ids to filenames
        self.streamConfig = {
            "NZ.RAOU..CAM": "raoulisland.jpg",
            "NZ.TEKA..CAM": "tekaha.jpg",
            "NZ.WHAK..CAM": "whakatane.jpg",
            "NZ.TONG..CAM": "tongariro.jpg",
            "NZ.TONG.TEMAARI.CAM": "tongarirotemaaricrater.jpg",
            "NZ.NGAU..CAM": "ngauruhoe.jpg",
            "NZ.RUAP.NORTH.CAM": "ruapehunorth.jpg",
            "NZ.RUAP.SOUTH.CAM": "ruapehusouth.jpg",
            "NZ.RUAP.RUHOE.CAM": "ruapehungauruhoe.jpg",
            "NZ.TARA..CAM": "taranaki.jpg",
        }

    # -------------------------------------------------------------------------
    def initStreams(self):
        """Create one StreamInfo per configured stream.

        The last modification time is taken from the image file in the base
        directory (None if the file does not exist yet); the last sent time
        starts out as None.
        """
        logging.info("initializing streams:")
        for sid, filename in self.streamConfig.items():
            path = os.path.join(self.baseDir, filename)
            modTime = self.readMTime(path)
            self.streams[sid] = StreamInfo(filename, modTime, None)
            logging.info(
                f"{sid}: file: {filename}, modified: "
                f"{'-' if modTime is None else modTime.isoformat()}"
            )

    # -------------------------------------------------------------------------
    def readMTime(self, path):
        """Return the modification time of ``path`` as aware UTC datetime.

        Returns None if the file does not exist or its mtime cannot be read.
        """
        if not os.path.isfile(path):
            return None

        try:
            # the mtime is an epoch time stamp, hence it can be converted to
            # UTC directly without any local time zone arithmetic
            return datetime.fromtimestamp(os.path.getmtime(path), timezone.utc)
        except (OSError, OverflowError, ValueError) as e:
            logging.error(f"could not extract mtime from file '{path}': {e}")
            return None

    # -------------------------------------------------------------------------
    def readEXIFDate(self, path):
        """Read the 'EXIF DateTimeOriginal' tag of the specified file.

        Returns a tuple ``(datetime, None)`` on success, where the datetime
        is timezone-aware and in UTC, or ``(None, message)`` on failure.
        """
        datekey = "EXIF DateTimeOriginal"
        datefmt = "%Y:%m:%d %H:%M:%S"

        # check file existence
        if not os.path.isfile(path):
            return None, f"no such file: {path}"

        # exifread is optional, the caller falls back to the file
        # modification time if it is not available
        try:
            import exifread  # pylint: disable=C0415,E0401
        except Exception:
            return None, "could not import exifread python module"

        # read EXIF data
        try:
            with open(path, "rb") as f:
                tags = exifread.process_file(f)
        except IOError as e:
            return None, f"could not read EXIF header: {e}"

        # check for date key
        if datekey not in tags:
            return (
                None,
                f"could not find date key '{datekey}' in EXIF header of "
                f"file : {path}",
            )

        # parse date
        try:
            dt = datetime.strptime(str(tags[datekey]), datefmt)
        except ValueError as e:
            return None, f"could not parse EXIF date of file '{path}': {e}"

        # convert to UTC: local time = UTC + offset, hence the offset has to
        # be subtracted
        if self.exifTZ is not None:
            dt -= self.exifTZ.utcoffset(dt)

        return dt.replace(tzinfo=timezone.utc), None

    # -------------------------------------------------------------------------
    def fetch(self):
        """Download all configured images which changed since the last fetch.

        One HTTP(S) connection is used for all requests. For each stream with
        a known modification time an 'If-Modified-Since' header is sent. Each
        image received with status 200 is written to the base directory and
        the modification time of the stream (and of the file, if the server
        sent a 'Last-Modified' header) is updated.

        Returns True if at least one image file was written, else False.
        """
        headers = {
            "Connection": "keep-alive",
            "User-agent": (
                "Linux / Firefox 26: Mozilla/5.0 "
                "(X11; Linux x86_64; rv:26.0) "
                "Gecko/20100101 Firefox/26.0"
            ),
        }
        modSinceHeader = "If-Modified-Since"
        lastModHeader = "Last-Modified"

        logging.info(
            f"connecting to {'https' if self.ssl else 'http'}://{self.host}:{self.port}"
        )
        try:
            if self.ssl:
                con = http.client.HTTPSConnection(self.host, self.port)
            else:
                con = http.client.HTTPConnection(self.host, self.port)
        except Exception as e:
            logging.info(f"connection error: {format(e)}")
            return False

        updated = False
        streamCount = len(self.streams)
        try:
            for i, (sid, info) in enumerate(self.streams.items(), 1):
                # deactivate keep-alive for last request
                if i == streamCount:
                    headers.pop("Connection", None)

                # request image only if it was modified since the last request
                # NOTE(review): strftime/strptime with %a and %b depend on the
                # process locale - confirm the application runs with an
                # English/C LC_TIME
                if info.lastMod is None:
                    lastModStr = None
                    headers.pop(modSinceHeader, None)
                else:
                    lastModStr = info.lastMod.strftime(self.lastModFmt)
                    headers[modSinceHeader] = lastModStr

                # request data
                url = self.baseURL + info.filename
                logging.info(f"fetching {sid}:")
                msg = f" GET {url}"
                if lastModStr is not None:
                    msg += f" ({modSinceHeader}: {lastModStr})"
                logging.info(msg)

                try:
                    con.request("GET", url, headers=headers)
                except (http.client.HTTPException, OSError) as e:
                    logging.error(f"could not send request: {e}")
                    break

                # always read the complete body, otherwise the connection
                # can not be reused for the next request
                try:
                    resp = con.getresponse()
                    logging.info(f"{resp.status} {resp.reason}")
                    body = resp.read()
                except (http.client.HTTPException, OSError) as e:
                    logging.error(f"could not read response: {e}")
                    break

                if resp.status != 200:
                    continue

                # write file
                path = os.path.join(self.baseDir, info.filename)
                try:
                    with open(path, "wb+") as f:
                        f.write(body)
                except OSError as e:
                    logging.error(f"could not write image file '{path}': {e}")
                    continue

                # fallback if no usable last-modified header is present
                info.lastMod = datetime.now(timezone.utc)
                updated = True

                # read modification time from HTTP header
                lastModStr = resp.getheader(lastModHeader, None)
                logging.info(
                    f"last modified: {(lastModStr if lastModStr is not None else 'UNKNOWN')}"
                )
                if lastModStr is None:
                    continue

                try:
                    # the format requires the literal 'GMT', i.e. the parsed
                    # time is UTC
                    info.lastMod = datetime.strptime(
                        lastModStr, self.lastModFmt
                    ).replace(tzinfo=timezone.utc)
                except ValueError as e:
                    logging.error(f"error paring last-modified string: {e}")
                    continue

                # update file modification time if last-modified header was
                # present
                try:
                    os.utime(path, (time.time(), info.lastMod.timestamp()))
                except (OSError, OverflowError, ValueError) as e:
                    logging.error(f"could not update file modification time: {e}")
        finally:
            con.close()

        return updated

    # -------------------------------------------------------------------------
    def push(self):
        """Send all images with a new sampling time to the CAPS server.

        The sampling time is the EXIF date of the image or, if that is not
        available, the last modification time of the stream. A stream is
        skipped if its sampling time does not exceed the one of the last
        packet sent. Processing stops at the first CAPS communication error.
        """
        output = CAPS.Plugin("imageimporter")
        output.setHost(self.capsHost)
        output.setPort(self.capsPort)
        output.setSSLEnabled(self.capsSSL)
        output.setBufferSize(1 << 30)

        try:
            for sid, info in self.streams.items():
                logging.info(f"pushing {sid}:")
                path = os.path.join(self.baseDir, info.filename)
                if not os.path.isfile(path):
                    logging.error("file not found")
                    continue

                if info.lastMod is None:
                    logging.error("unknown modification time")
                    continue

                samplingTime, msg = self.readEXIFDate(path)
                if samplingTime is None:
                    logging.warning(f"{msg}")
                    samplingTime = info.lastMod

                # check if image was updated
                if info.lastSent is not None and info.lastSent >= samplingTime:
                    logging.info("no update detected")
                    continue

                # determine mime type, only the upper case subtype is sent
                mtype = mimetypes.guess_type(path)[0]
                if mtype is None:
                    logging.error(f"{sid}: failed to read MIME type of file: {path}")
                    continue
                mtype = mtype[mtype.find("/") + 1 :].upper()

                # push image to caps server
                timestamp = CAPS.Time(
                    samplingTime.year,
                    samplingTime.month,
                    samplingTime.day,
                    samplingTime.hour,
                    samplingTime.minute,
                    samplingTime.second,
                    samplingTime.microsecond,
                )

                try:
                    net, sta, loc, cha = sid.split(".", 3)
                except ValueError:
                    logging.error("invalid stream id")
                    continue

                try:
                    with open(path, "rb") as f:
                        data = f.read()
                except Exception as e:
                    logging.error(f"could not read image file: {e}")
                    continue

                try:
                    logging.info(
                        f"timestamp: {timestamp.iso()}, mime: {mtype}, "
                        f"size: {len(data)}, data: {type(data)}"
                    )
                    output.push(net, sta, loc, cha, timestamp, 1, 0, mtype, data)
                except Exception as e:
                    logging.error(f"CAPS communication error: {e}")
                    logging.error(traceback.format_exc())
                    break

                # remember what was sent so that an unchanged image is not
                # sent again
                # NOTE(review): assumes push() raises on failure - confirm
                # against the CAPS plugin API
                info.lastSent = samplingTime
        finally:
            # close exactly once, also on communication errors
            output.close()

    # -------------------------------------------------------------------------
    def run(self):
        """Acquire the lock file and run the fetch/push loop until exit.

        Returns 5 if the base directory could not be created, 1 if the lock
        file could not be opened, 2 if the lock is held by someone else, 3 if
        no streams are configured, 4 if the EXIF time zone is unknown and
        True after a regular exit request.

        NOTE(review): error paths return non-zero integers while the regular
        path returns True - confirm how client.Application interprets the
        return value of run().
        """
        # create tmp dir if it does not exist yet
        if not os.path.isdir(self.baseDir):
            try:
                os.makedirs(self.baseDir)
            except OSError as e:
                logging.error(f"could not create temporary directory: {format(e)}")
                return 5

        # try to lock file
        try:
            lock = open(self.lockFile, "w")
        except IOError as e:
            logging.error(f"could not open lock file for writing: {format(e)}")
            return 1

        # the lock is held as long as the file is open, the with statement
        # closes it on every return path
        with lock:
            try:
                fcntl.flock(lock.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                logging.error("could not acquire lock, application already running?")
                return 2

            if not self.streamConfig:
                logging.error("no streams configured")
                return 3

            self.initStreams()

            # initialize time zone if specified
            if self.exifTZName is not None:
                try:
                    self.exifTZ = pytz.timezone(self.exifTZName)
                except pytz.exceptions.UnknownTimeZoneError:
                    logging.error(f"could not retrieve timezone: {self.exifTZName}")
                    return 4

            # main loop
            first = True
            while not self.isExitRequested():
                start = datetime.now(timezone.utc)

                # images already present on disk are pushed on the first
                # iteration even if nothing new was downloaded
                if self.fetch() or first:
                    self.push()
                first = False

                # schedule next run
                elapsed = datetime.now(timezone.utc) - start
                remaining = self.interval - elapsed.total_seconds()
                if remaining <= 0:
                    logging.warning("warning: could not schedule next fetch in time")
                    continue

                logging.info(f"schedule next fetch in {remaining:.2f} seconds")
                # sleep in steps of one second to react on exit requests
                for _i in range(0, int(remaining)):
                    time.sleep(1)
                    if self.isExitRequested():
                        break

            return True
# -----------------------------------------------------------------------------
# script entry point: create the application from the command line
# arguments, call it and use the result as process exit code
if __name__ == "__main__":
    app = WebCam(len(sys.argv), sys.argv)
    sys.exit(app())
# vim: ts=4 noet