#!/usr/bin/env seiscomp-python
###############################################################################
# Copyright (C) 2024 by gempa GmbH                                            #
#                                                                             #
# All Rights Reserved.                                                        #
#                                                                             #
# NOTICE: All information contained herein is, and remains                    #
# the property of gempa GmbH and its suppliers, if any. The intellectual      #
# and technical concepts contained herein are proprietary to gempa GmbH       #
# and its suppliers.                                                          #
# Dissemination of this information or reproduction of this material          #
# is strictly forbidden unless prior written permission is obtained           #
# from gempa GmbH.                                                            #
###############################################################################
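
# licsar2caps polls the LiCSAR products archive for new interferogram epochs,
# converts the configured data products to Surfer 6 binary grids with GDAL
# and either writes them to a local directory or pushes them to a CAPS
# server. A journal file keeps track of the last processed epoch per stream
# so that completed epochs are not downloaded again.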
import os
import sys
import tempfile
import time
import urllib.parse
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from osgeo import gdal
from seiscomp import client, logging, system
from gempa import CAPS
from licsar2caps.journal import Journal, JournalItem
from licsar2caps.streammap import StreamMap
from licsar2caps import utils


def needsUpdate(item, startTime, endTime):
    """Return True if the epoch [startTime, endTime] is newer than the last
    epoch recorded for the given stream item."""
    if not item.startTime:
        return True
    if startTime == item.startTime and endTime > item.endTime:
        return True
    if startTime > item.startTime:
        return True
    return False


class Filter:
    """Displacement filter: grids whose given percentile of absolute values
    is less than or equal to the threshold are skipped."""

    def __init__(self):
        self.threshold = None
        self.enabled = False
        self.percentile = 99.9


###############################################################################
class App(client.Application):
    # -------------------------------------------------------------------------
    def __init__(self, argc, argv):
        client.Application.__init__(self, argc, argv)

        self.setDatabaseEnabled(False, False)
        self.setMessagingEnabled(False)

        self.baseUrl = "https://gws-access.jasmin.ac.uk/public/nceo_geohazards/LiCSAR_products"
        self.journalFile = "@ROOTDIR@/var/run/" + self.name() + "/journal"
        self.pollInterval = 60
        self.networks = None
        self.dump = False
        self.outputAddr = "caps://localhost:18003"
        self.capsLog = False
        self.output = None
        self.journal = Journal()
        self.print = False
        self.products = {}
        self.outputDirectory = None
        self.strStartTime = None
        self.reprocess = False
        self.processLatestOnly = False
        self.filter = Filter()

        dt = datetime.utcnow()
        self.startTime = CAPS.Time()
        self.startTime.set(dt.year, dt.month, dt.day, 0, 0, 0, 0)

        self.streamsFile = None
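
    # Defaults set above are overridden by the module configuration in
    # initConfiguration() and by the command line in validateParameters().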
    # -------------------------------------------------------------------------
    def initConfiguration(self):
        if not client.Application.initConfiguration(self):
            return False

        param = ""
        try:
            param = "input.baseUrl"
            try:
                self.baseUrl = self.configGetString(param)
            except RuntimeError:
                pass

            param = "input.streamsFile"
            try:
                self.streamsFile = self.configGetString(param)
            except RuntimeError:
                pass

            param = "input.startTime"
            try:
                self.strStartTime = self.configGetString(param)
            except RuntimeError:
                pass

            param = "input.processLatestOnly"
            try:
                self.processLatestOnly = self.configGetBool(param)
            except RuntimeError:
                pass

            param = "input.reprocess"
            try:
                self.reprocess = self.configGetBool(param)
            except RuntimeError:
                pass

            param = "input.pollInterval"
            try:
                self.pollInterval = self.configGetInt(param)
            except RuntimeError:
                pass

            param = "input.products"
            try:
                for item in self.configGetStrings(param):
                    try:
                        key, value = item.split(":")
                        if not key or not value:
                            logging.error(
                                f"{param}: Key and value must not be empty"
                            )
                            return False
                        self.products[key.strip()] = value.strip()
                    except ValueError:
                        logging.error(
                            f"{param}: Invalid entry: Expected: [KEY:VALUE]"
                        )
                        return False
            except RuntimeError:
                self.products = {"geo.unw.tif": "UNW"}

            param = "input.filter.enabled"
            try:
                self.filter.enabled = self.configGetBool(param)
            except RuntimeError:
                pass

            param = "input.filter.threshold"
            try:
                self.filter.threshold = self.configGetDouble(param)
            except RuntimeError:
                pass

            param = "input.filter.percentile"
            try:
                self.filter.percentile = self.configGetDouble(param)
            except RuntimeError:
                pass

            param = "output.addr"
            try:
                self.outputAddr = self.configGetString(param)
            except RuntimeError:
                pass

            param = "output.directory"
            try:
                self.outputDirectory = self.configGetString(param)
            except RuntimeError:
                pass

            param = "journal.file"
            try:
                self.journalFile = self.configGetString(param)
            except RuntimeError:
                pass
        except Exception as e:
            logging.error(f"Invalid parameter {param}: {e}")
            return False

        return True
    # -------------------------------------------------------------------------
    def createCommandLineDescription(self):
        client.Application.createCommandLineDescription(self)

        self.commandline().addGroup("Input")
        self.commandline().addStringOption(
            "Input",
            "base-url",
            f"Base URL from which data is received (default={self.baseUrl})",
        )
        self.commandline().addStringOption(
            "Input",
            "start-time",
            "Start time of data acquisition",
        )
        self.commandline().addStringOption(
            "Input",
            "interval,i",
            f"Poll mode interval in seconds (default={self.pollInterval})",
        )
        self.commandline().addOption(
            "Input",
            "reprocess",
            "Force reprocessing of the last received grid at start "
            f"(default={self.reprocess})",
        )

        self.commandline().addGroup("Output")
        self.commandline().addStringOption(
            "Output",
            "addr,a",
            "Data output address [[caps|capss]://][user:pass@]host[:port]",
        )
        self.commandline().addStringOption(
            "Output",
            "dir,d",
            "Output directory. Write grid files to this directory instead "
            "of sending them to CAPS.",
        )
        self.commandline().addBoolOption(
            "Output",
            "caps-log",
            f"Enable CAPS logging (default={self.capsLog})",
        )
        self.commandline().addOption(
            "Output", "print-packets", "Print packets"
        )
self.commandline().addGroup("Journal")
self.commandline().addStringOption(
"Journal",
"journal,j",
"File to store stream states. Use an "
"empty string to log to standard out"
"[[caps|capss]://][user:pass@]host[:port]",
)
    # -------------------------------------------------------------------------
    def printUsage(self):
        print(
            """
licsar2caps: Import LiCSAR data from web page to CAPS.
"""
        )
        client.Application.printUsage(self)

        print("Examples")
        print("Processing with informative debug output.")
        print(f"  {self.name()} --debug")
        print("Write output grids to directory")
        print(f"  {self.name()} -d /tmp")
    # -------------------------------------------------------------------------
    def getProduct(self, fullUrl, streamID, filename):
        logging.info(f" + {streamID}: Downloading data product {filename}")
        try:
            res = requests.get(fullUrl, timeout=10, allow_redirects=True)
        except Exception as err:
            logging.info(
                f" + {streamID}: Download failed: {err}. URL: {fullUrl}"
            )
            return None

        if res.status_code != 200:
            logging.info(
                f" + {streamID}: Download failed. HTTP status code "
                f"{res.status_code}: {res.reason}. URL: {fullUrl}"
            )
            return None

        logging.info(f" + {streamID}: Downloaded data product {filename}")

        with tempfile.NamedTemporaryFile() as tmp:
            tmp.write(res.content)
            tmp.flush()

            try:
                # Convert the downloaded product to a Surfer 6 binary grid
                # (GSBG) in GDAL's in-memory file system.
                memFilename = "/vsimem/in_memory_output.grd"
                ds = gdal.Translate(
                    memFilename,
                    tmp.name,
                    format="GSBG",
                    outputType=gdal.GDT_Float32,
                )
                if not ds:
                    logging.info(
                        f" + {streamID}: Could not convert data product "
                        f"{filename} to Surfer6 format"
                    )
                    return None

                if self.filter.enabled:
                    value = utils.calculateAbsPerc(
                        ds.GetRasterBand(1).ReadAsArray(),
                        self.filter.percentile,
                    )
                    if value <= self.filter.threshold:
                        logging.info(
                            f" + {streamID}: Computed grid displacement is "
                            "less than or equal to the configured threshold "
                            f"{value} <= {self.filter.threshold}. Skipping "
                            "grid"
                        )
                        return None

                # Read the converted grid back from the in-memory file: seek
                # to the end to determine the size, then rewind and read.
                f = gdal.VSIFOpenL(memFilename, "rb")
                gdal.VSIFSeekL(f, 0, 2)
                filesize = gdal.VSIFTellL(f)
                gdal.VSIFSeekL(f, 0, 0)
                content = gdal.VSIFReadL(1, filesize, f)
                gdal.VSIFCloseL(f)
                gdal.Unlink(memFilename)
                return content
            except AttributeError:
                # Fallback for RHEL 7 where the GDAL Python bindings lack
                # gdal.Translate: shell out to the gdal_translate tool.
                import subprocess

                with tempfile.NamedTemporaryFile() as outFile:
                    subprocess.call(
                        ["gdal_translate", "-of", "GSBG", tmp.name,
                         outFile.name]
                    )
                    with open(outFile.name, "rb") as f:
                        return f.read()
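
    # Downloaded grids take one of two output paths: storeProduct() writes
    # them into the output directory, sendProduct() pushes them to the CAPS
    # server (see runWatch()).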
    # -------------------------------------------------------------------------
    def storeProduct(self, data, streamID, item):
        if not os.path.exists(self.outputDirectory):
            try:
                os.makedirs(self.outputDirectory)
            except OSError as err:
                logging.error(
                    "Could not create output directory "
                    f"'{self.outputDirectory}': {err}"
                )
                return False

        filename = os.path.join(self.outputDirectory, streamID + ".grd")
        try:
            with open(filename, "wb") as f:
                f.write(data)
        except IOError as err:
            logging.error(f"Failed to write data to file {filename}: {err}")
            return False

        logging.info(f" + {streamID}: Stored data in '{filename}'")
        return True
    # -------------------------------------------------------------------------
    def sendProduct(self, data, item, channelCode, startTime, endTime):
        streamID = item.stationID + "." + channelCode
        if self.print:
            print(
                f"{streamID} - {startTime.iso()} ~ "
                f"{endTime.iso()} Size: {len(data)} bytes"
            )
            return True

        ret = self.output.pushAny(
            item.networkCode,
            item.stationCode,
            item.locationCode,
            channelCode,
            startTime,
            endTime,
            1,
            0,
            "grd",
            "N/A",
            bytes(data),
        )
        if ret != CAPS.Plugin.Success:
            logging.error(
                f"{streamID} - Data {startTime.iso()} ~ {endTime.iso()} "
                f"could not be sent to CAPS: Error code: {ret}"
            )
            return False

        logging.info(
            f" + {streamID}: Sent packet {startTime.iso()} ~ "
            f"{endTime.iso()} {len(data)} bytes"
        )
        return True
    # -------------------------------------------------------------------------
    def done(self):
        if self.output:
            self.output.close()
        self.journal.write(self.journalFile)
        client.Application.done(self)
    # -------------------------------------------------------------------------
    def init(self):
        if not client.Application.init(self):
            return False

        if os.path.isfile(self.journalFile) and not self.journal.read(
            self.journalFile
        ):
            return False

        self.streamMap = StreamMap()
        if not self.streamMap.read(self.streamsFile):
            return False

        # Restore the per-stream state from the journal or fall back to the
        # configured start time.
        for key, item in self.streamMap.items.items():
            j = self.journal.get(key)
            if j:
                item.startTime = j.startTime
                item.endTime = j.endTime
            elif self.startTime:
                item.startTime = self.startTime
                item.endTime = self.startTime

        url = urllib.parse.urlparse(self.outputAddr)
        if not url:
            logging.error("Could not parse data output address")
            return False

        self.output = CAPS.Plugin("licsar2caps")
        self.output.setHost(url.hostname)
        self.output.setPort(url.port)
        self.output.setBufferSize(1 << 29)
        if self.capsLog:
            self.output.enablelogging()

        protocol = url.scheme.lower()
        if protocol == "capss":
            self.output.setSSLEnabled(True)

        return True
    # -------------------------------------------------------------------------
    def run(self):
        if self.outputDirectory:
            outputStr = f"\n Output Directory: {self.outputDirectory}"
        else:
            outputStr = f"\n Output address  : {self.outputAddr}"

        logging.info(
            "\nConfiguration:"
            f"\n Base URL        : {self.baseUrl}"
            f"\n Poll interval   : {self.pollInterval} s"
            f"\n Streams         : {self.streamsFile}"
            f"\n Journal         : {self.journalFile}"
            f"{outputStr}"
            f"\n Start time      : {self.startTime.iso()}"
            f"\n Products        : {self.products}"
            f"\n Filter          : {'Enabled' if self.filter.enabled else 'Disabled'}"
            f"\n Threshold       : {self.filter.threshold}"
            f"\n Percentile      : {self.filter.percentile}"
            f"\n Force update    : {self.reprocess}"
            f"\n Process latest  : {self.processLatestOnly}"
        )

        self.runWatch()
        return True
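
    # -------------------------------------------------------------------------
    # getEpochs() scrapes the HTML directory listing of a station's
    # 'interferograms' folder. Each subdirectory is named START_END with
    # dates in YYYYMMDD format and represents one epoch.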
    def getEpochs(self, url, item):
        url = url + "/interferograms/"
        logging.info(f" + Checking station {item.stationID}")
        try:
            res = requests.get(url, timeout=10, allow_redirects=True)
        except Exception as err:
            logging.error(f"Download failed: {err}. URL: {url}")
            logging.info(" + End")
            return None

        if res.status_code != 200:
            logging.error(f"HTTP status code {res.status_code}: {res.reason}")
            logging.info(" + End")
            return None

        soup = BeautifulSoup(res.text, "html.parser")
        data = [
            node.get("href").replace("/", "")
            for node in soup.find_all("a")
            if node.get("href").endswith("/")
            and node.text != "Parent Directory"
        ]
        logging.info(f" + Found {len(data)} epochs")

        epochs = []
        for epoch in data:
            try:
                start, end = epoch.split("_")
            except Exception:
                logging.error(
                    f"{item.stationID}: Invalid epoch {epoch}: "
                    "Expected [START_END]"
                )
                continue

            startTime = CAPS.Time.FromString(start, "%Y%m%d")
            if not startTime.valid():
                logging.error(f"{item.stationID}: Invalid start time {start}")
                continue

            endTime = CAPS.Time.FromString(end, "%Y%m%d")
            if not endTime.valid():
                logging.error(f"{item.stationID}: Invalid end time {end}")
                continue

            if needsUpdate(item, startTime, endTime):
                epochs.append([epoch, startTime, endTime])
            elif self.reprocess:
                if item.startTime == startTime and item.endTime == endTime:
                    epochs.append([epoch, startTime, endTime])

        if not epochs:
            logging.info(" + No new data available. Nothing to do")
        else:
            logging.info(f" + {len(epochs)} epoch(s) must be processed")
        logging.info(" + End")
        return epochs
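
    # The polling loop: each pass walks all configured streams, collects the
    # epochs that still need processing, downloads every configured product
    # and advances the per-station state and the journal.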
    # -------------------------------------------------------------------------
    def runWatch(self):
        while not self.isExitRequested():
            logging.info("Looking for new data")
            for stationID, item in self.streamMap.items.items():
                if self.isExitRequested():
                    break

                url = self.baseUrl + "/" + item.baseCode + "/" + item.folder
                epochs = self.getEpochs(url, item)
                if not epochs:
                    continue

                if self.processLatestOnly:
                    epochs = [epochs[-1]]

                for epoch, startTime, endTime in epochs:
                    if self.isExitRequested():
                        break

                    for k, cha in self.products.items():
                        streamID = (
                            item.networkCode
                            + "."
                            + item.stationCode
                            + "."
                            + item.locationCode
                            + "."
                            + cha
                        )
                        filename = epoch + "." + k
                        fullUrl = (
                            url + "/interferograms/" + epoch + "/" + filename
                        )
                        data = self.getProduct(fullUrl, streamID, filename)
                        if data:
                            if self.outputDirectory:
                                self.storeProduct(data, streamID, item)
                            else:
                                self.sendProduct(
                                    data, item, cha, startTime, endTime
                                )
                        if self.isExitRequested():
                            break

                    # Update start and end time of the station
                    item.startTime = startTime
                    item.endTime = endTime

                    # Update journal
                    self.journal.items[item.stationID] = JournalItem(
                        startTime, endTime
                    )

                if self.isExitRequested():
                    break

            # Do reprocessing only once
            if self.reprocess:
                self.reprocess = False

            logging.info("End")
            logging.info(f"Next run in {self.pollInterval} seconds")
            self.wait(self.pollInterval)
    # -------------------------------------------------------------------------
    def wait(self, seconds):
        # Sleep in one-second steps so an exit request is handled promptly
        for _ in range(seconds):
            time.sleep(1)
            if self.isExitRequested():
                break
    # -------------------------------------------------------------------------
    def validateParameters(self):
        if not client.Application.validateParameters(self):
            return False

        self.dump = self.commandline().hasOption("dump")
        if self.dump:
            self.setMessagingEnabled(False)

        try:
            self.baseUrl = self.commandline().optionString("base-url")
        except Exception:
            pass

        try:
            self.strStartTime = self.commandline().optionString("start-time")
        except Exception:
            pass

        if self.strStartTime:
            self.startTime = utils.parseTime(self.strStartTime)
            if not self.startTime.valid():
                logging.error(f"Invalid start time '{self.strStartTime}'")
                return False

        try:
            self.journalFile = self.commandline().optionString("journal")
        except Exception:
            pass

        try:
            self.pollInterval = int(
                self.commandline().optionString("interval")
            )
        except Exception:
            pass

        try:
            self.outputAddr = self.commandline().optionString("addr")
        except Exception:
            pass

        try:
            self.outputDirectory = self.commandline().optionString("dir")
        except Exception:
            pass

        if self.journalFile:
            self.journalFile = system.Environment.Instance().absolutePath(
                self.journalFile
            )

        if not self.streamsFile:
            logging.error("Option 'input.streamsFile' is mandatory")
            return False
        self.streamsFile = system.Environment.Instance().absolutePath(
            self.streamsFile
        )

        if self.outputDirectory:
            self.outputDirectory = system.Environment.Instance().absolutePath(
                self.outputDirectory
            )

        if self.commandline().hasOption("reprocess"):
            self.reprocess = True

        self.print = self.commandline().hasOption("print-packets")

        return True
# -----------------------------------------------------------------------------
app = App(len(sys.argv), sys.argv)
sys.exit(app())