#!/usr/bin/env seiscomp-python
###############################################################################
# Copyright (C) 2024 by gempa GmbH                                            #
#                                                                             #
# All Rights Reserved.                                                        #
#                                                                             #
# NOTICE: All information contained herein is, and remains                    #
# the property of gempa GmbH and its suppliers, if any. The intellectual      #
# and technical concepts contained herein are proprietary to gempa GmbH      #
# and its suppliers.                                                          #
# Dissemination of this information or reproduction of this material         #
# is strictly forbidden unless prior written permission is obtained          #
# from gempa GmbH.                                                            #
###############################################################################
import os
import sys
import tempfile
import time
import urllib.parse

import requests
from bs4 import BeautifulSoup
from datetime import datetime

from osgeo import gdal

from seiscomp import client, logging, system

from gempa import CAPS

from licsar2caps.journal import Journal, JournalItem
from licsar2caps.streammap import StreamMap
from licsar2caps import utils


def needsUpdate(item, startTime, endTime):
    """Return True if the epoch [startTime, endTime] is newer than the state
    recorded on *item* (no state yet, a longer epoch with the same start, or
    a later start)."""
    if not item.startTime:
        return True
    if startTime == item.startTime and endTime > item.endTime:
        return True
    if startTime > item.startTime:
        return True
    return False


class Filter:
    """Settings for the optional grid-displacement filter applied in
    getProduct()."""

    def __init__(self):
        # Minimum displacement a grid must exceed to be forwarded; unset by
        # default, configured via input.filter.threshold.
        self.threshold = None
        self.enabled = False
        # Percentile of the absolute grid values used as the displacement
        # measure (see utils.calculateAbsPerc).
        self.percentile = 99.9


###############################################################################
class App(client.Application):
    """Polls the LiCSAR products web directory, converts new interferogram
    products to Surfer6 grids and pushes them to CAPS (or writes them to a
    directory)."""

    # -------------------------------------------------------------------------
    def __init__(self, argc, argv):
        client.Application.__init__(self, argc, argv)
        self.setDatabaseEnabled(False, False)
        self.setMessagingEnabled(False)
        self.baseUrl = "https://gws-access.jasmin.ac.uk/public/nceo_geohazards/LiCSAR_products"
        self.journalFile = "@ROOTDIR@/var/run/" + self.name() + "/journal"
        self.pollInterval = 60
        self.networks = None
        self.dump = False
        self.outputAddr = "caps://localhost:18003"
        self.capsLog = False
        self.output = None
        self.journal = Journal()
        # When True, packets are printed instead of being sent to CAPS
        # (--print-packets).
        self.print = False
        # Maps product file suffix -> CAPS channel code, e.g.
        # {"geo.unw.tif": "UNW"}.
        self.products = {}
        self.outputDirectory = None
        self.strStartTime = None
        self.reprocess = False
        self.processLatestOnly = False
        self.filter = Filter()
        # Default start time: beginning of the current UTC day.
        dt = datetime.utcnow()
        self.startTime = CAPS.Time()
        self.startTime.set(dt.year, dt.month, dt.day, 0, 0, 0, 0)
        self.streamsFile = None

    # -------------------------------------------------------------------------
    def initConfiguration(self):
        """Read module configuration values. Missing parameters keep their
        defaults; invalid values abort with False."""
        if not client.Application.initConfiguration(self):
            return False

        param = ""
        try:
            param = "input.baseUrl"
            try:
                self.baseUrl = self.configGetString(param)
            except RuntimeError:
                pass

            param = "input.streamsFile"
            try:
                self.streamsFile = self.configGetString(param)
            except RuntimeError:
                pass

            param = "input.startTime"
            try:
                self.strStartTime = self.configGetString(param)
            except RuntimeError:
                pass

            param = "input.processLatestOnly"
            try:
                self.processLatestOnly = self.configGetBool(param)
            except RuntimeError:
                pass

            param = "input.reprocess"
            try:
                self.reprocess = self.configGetBool(param)
            except RuntimeError:
                pass

            param = "input.pollInterval"
            try:
                self.pollInterval = self.configGetInt(param)
            except RuntimeError:
                pass

            param = "input.products"
            try:
                for item in self.configGetStrings(param):
                    try:
                        key, value = item.split(":")
                        if not key or not value:
                            logging.error(
                                f"{param}: " "Key and value must not be empty"
                            )
                            return False

                        self.products[key.strip()] = value.strip()
                    except ValueError:
                        logging.error(
                            f"{param}: Invalid entry: Expected: [KEY:VALUE]"
                        )
                        return False
            except RuntimeError:
                # Not configured: fall back to the unwrapped interferogram
                # product only.
                self.products = {"geo.unw.tif": "UNW"}

            param = "input.filter.enabled"
            try:
                self.filter.enabled = self.configGetBool(param)
            except RuntimeError:
                pass

            param = "input.filter.threshold"
            try:
                self.filter.threshold = self.configGetDouble(param)
            except RuntimeError:
                pass

            param = "input.filter.percentile"
            try:
                self.filter.percentile = self.configGetDouble(param)
            except RuntimeError:
                pass

            param = "output.addr"
            try:
                self.outputAddr = self.configGetString(param)
            except RuntimeError:
                pass

            param = "output.directory"
            try:
                self.outputDirectory = self.configGetString(param)
            except RuntimeError:
                pass

            param = "journal.file"
            try:
                self.journalFile = self.configGetString(param)
            except RuntimeError:
                pass
        except Exception as e:
            logging.error(f"Invalid parameter {param}: {e}")
            return False

        return True

    # -------------------------------------------------------------------------
    def createCommandLineDescription(self):
        client.Application.createCommandLineDescription(self)
        self.commandline().addGroup("Input")
        self.commandline().addStringOption(
            "Input",
            "base-url",
            f"Base URL from which data is received (default={self.baseUrl})",
        )
        self.commandline().addStringOption(
            "Input",
            "interval,i",
            # Fixed: f prefix was missing, the default was printed literally
            f"Poll mode interval in seconds (default={self.pollInterval})",
        )
        self.commandline().addOption(
            "Input",
            "reprocess",
            "Force reprocessing of the last received grid at start"
            f"(default={self.reprocess})",
        )
        self.commandline().addGroup("Output")
        self.commandline().addStringOption(
            "Output",
            "addr,a",
            "Data output address [[caps|capss]://][user:pass@]host[:port]",
        )
        self.commandline().addStringOption(
            "Output",
            "dir,d",
            "Output directory. Write grid files to this directory instead "
            " of sending it to CAPS.",
        )
        self.commandline().addBoolOption(
            "Output",
            "caps-log",
            f"Enable CAPS logging (default={self.capsLog})",
        )
        self.commandline().addOption(
            "Output", "print-packets", "Print packets"
        )
        self.commandline().addGroup("Journal")
        self.commandline().addStringOption(
            "Journal",
            "journal,j",
            # Fixed: removed address-format fragment pasted from --addr
            "File to store stream states. Use an "
            "empty string to log to standard out",
        )

    # -------------------------------------------------------------------------
    def printUsage(self):
        print(
            """
licsar2caps: Import licsar data from web page to CAPS.
"""
        )
        # Fixed: this class derives from client.Application, not
        # client.StreamApplication
        client.Application.printUsage(self)
        print("Examples")
        print("Processing with informative debug output.")
        print(f"  {self.name()} --debug")
        print("Write output grids to directory")
        print(f"  {self.name()} -d /tmp")

    # -------------------------------------------------------------------------
    def getProduct(self, fullUrl, streamID, filename):
        """Download one product file, convert it to a Surfer6 (GSBG) grid and
        return the grid bytes, or None on failure or when the displacement
        filter rejects the grid.

        NOTE(review): several log messages below say "(unknown)" where a
        filename placeholder appears to have been intended — confirm against
        upstream history before changing the strings."""
        logging.info(f" + {streamID}: Downloading data product (unknown)")
        try:
            res = requests.get(fullUrl, timeout=10, allow_redirects=True)
        except Exception:
            logging.info(
                f"+ {streamID}: Download failed. Read operation timed out. "
                f"URL: {fullUrl}"
            )
            return None

        if res.status_code != 200:
            logging.info(
                f" + {streamID}: Download failed. HTTP status code "
                f"{res.status_code}: {res.reason}. URL: {fullUrl}"
            )
            return None

        logging.info(f" + {streamID}: Downloaded data product (unknown)")

        with tempfile.NamedTemporaryFile() as tmp:
            tmp.write(res.content)
            tmp.flush()
            try:
                # Convert in-memory via GDAL's virtual file system.
                filename = "/vsimem/in_memory_output.grd"
                ds = gdal.Translate(
                    filename,
                    tmp.name,
                    format="GSBG",
                    outputType=gdal.GDT_Float32,
                )
                if not ds:
                    logging.info(
                        f" + {streamID}: Could not convert data product "
                        f"(unknown) to Surfer6 format"
                    )
                    return None

                if self.filter.enabled:
                    # Reject grids whose displacement measure does not exceed
                    # the configured threshold.
                    value = utils.calculateAbsPerc(
                        ds.GetRasterBand(1).ReadAsArray(),
                        self.filter.percentile,
                    )
                    if value <= self.filter.threshold:
                        logging.info(
                            f" + {streamID}: Computed grid displacement is "
                            "less or equal the configured threshold "
                            f"{value} <= {self.filter.threshold}. Skipping "
                            "grid"
                        )
                        return None

                # Read the converted grid back out of /vsimem.
                f = gdal.VSIFOpenL(filename, "rb")
                gdal.VSIFSeekL(f, 0, 2)
                filesize = gdal.VSIFTellL(f)
                gdal.VSIFSeekL(f, 0, 0)
                content = gdal.VSIFReadL(1, filesize, f)
                gdal.VSIFCloseL(f)
                gdal.Unlink("/vsimem/in_memory_output.grd")
                return content
            except AttributeError:
                # Fallback for RHEL 7: old GDAL bindings without
                # gdal.Translate — shell out to gdal_translate instead.
                import subprocess

                with tempfile.NamedTemporaryFile() as outFile:
                    filename = outFile.name
                    subprocess.call(
                        ["gdal_translate", "-of", "GSBG", tmp.name, filename]
                    )
                    with open(filename, "rb") as f:
                        return f.read()

        return None

    # -------------------------------------------------------------------------
    def storeProduct(self, data, streamID, item):
        """Write grid *data* to '<outputDirectory>/<streamID>.grd'. Returns
        True on success."""
        if not os.path.exists(self.outputDirectory):
            try:
                os.makedirs(self.outputDirectory)
            except OSError as err:
                logging.error(
                    "Could not create output directory "
                    f"'{self.outputDirectory}': {err}"
                )
                return False

        filename = os.path.join(self.outputDirectory, streamID + ".grd")
        try:
            with open(filename, "wb") as f:
                f.write(data)
        except IOError as err:
            logging.error(f"Failed to write data to file (unknown): {err}")
            return False

        logging.info(f" + {streamID}: Stored data in '(unknown)'")
        return True

    # -------------------------------------------------------------------------
    def sendProduct(self, data, item, channelCode, startTime, endTime):
        """Push one grid packet to CAPS, or just print it when
        --print-packets is active. Returns True on success."""
        streamID = item.stationID + "." + channelCode
        if self.print:
            print(
                f"{streamID} - {startTime.iso()} ~ "
                f"{endTime.iso()} Size: {len(data)} bytes"
            )
            return True

        ret = self.output.pushAny(
            item.networkCode,
            item.stationCode,
            item.locationCode,
            channelCode,
            startTime,
            endTime,
            1,
            0,
            "grd",
            "N/A",
            bytes(data),
        )
        if ret != CAPS.Plugin.Success:
            logging.error(
                f"{streamID} - Data {startTime.iso()} ~ {endTime.iso()} "
                # Fixed: f prefix was missing, '{ret}' was printed literally
                f"could not be sent to CAPS: Error code: {ret}"
            )
            return False

        logging.info(
            f" + {streamID}: Sent packet {startTime.iso()} ~ "
            f"{endTime.iso()} {len(data)} bytes"
        )
        return True

    # -------------------------------------------------------------------------
    def done(self):
        """Flush the CAPS connection and persist the journal on shutdown."""
        if self.output:
            self.output.close()

        self.journal.write(self.journalFile)
        client.Application.done(self)

    # -------------------------------------------------------------------------
    def init(self):
        """Load journal and stream map, seed per-station time windows and
        connect the CAPS output plugin."""
        if not client.Application.init(self):
            return False

        if os.path.isfile(self.journalFile) and not self.journal.read(
            self.journalFile
        ):
            return False

        self.streamMap = StreamMap()
        if not self.streamMap.read(self.streamsFile):
            return False

        # Restore per-station progress from the journal, otherwise start at
        # the configured start time.
        for key, item in self.streamMap.items.items():
            j = self.journal.get(key)
            if j:
                item.startTime = j.startTime
                item.endTime = j.endTime
            else:
                if self.startTime:
                    item.startTime = self.startTime
                    item.endTime = self.startTime

        url = urllib.parse.urlparse(self.outputAddr)
        if not url:
            logging.error("Could not parse data output address")
            return False

        self.output = CAPS.Plugin("licsar2caps")
        self.output.setHost(url.hostname)
        self.output.setPort(url.port)
        self.output.setBufferSize(1 << 29)
        if self.capsLog:
            self.output.enablelogging()

        protocol = url.scheme.lower()
        if protocol == "capss":
            self.output.setSSLEnabled(True)

        return True

    # -------------------------------------------------------------------------
    def run(self):
        """Log the effective configuration and enter the poll loop."""
        if self.outputDirectory:
            outputStr = f"\n Output Directory: {self.outputDirectory}"
        else:
            outputStr = f"\n Output address : {self.outputAddr}"

        logging.info(
            "\nConfiguration:"
            f"\n Base URL : {self.baseUrl}"
            f"\n Poll interval : {self.pollInterval} s"
            f"\n Streams : {self.streamsFile}"
            f"\n Journal : {self.journalFile}"
            f"{outputStr}"
            f"\n Start time : {self.startTime.iso()}"
            f"\n Products : {self.products}"
            f"\n Filter : {'Enabled' if self.filter.enabled else 'Disabled'}"
            f"\n Threshold : {self.filter.threshold}"
            f"\n Percentile : {self.filter.percentile}"
            f"\n Force update : {self.reprocess}"
            f"\n Process latest : {self.processLatestOnly}"
        )
        self.runWatch()
        return True

    def getEpochs(self, url, item):
        """Scrape the station's interferograms directory listing and return
        the epochs that still need processing as [name, start, end] triples,
        or None when the listing cannot be fetched."""
        url = url + "/interferograms/"
        logging.info(f" + Checking station {item.stationID}")
        res = requests.get(url, timeout=10, allow_redirects=True)
        if res.status_code != 200:
            logging.error(f"HTTP status code {res.status_code}: {res.reason}")
            logging.info(" + End")
            return None

        soup = BeautifulSoup(res.text, "html.parser")
        # Sub-directory links are epochs; skip anchors without href (would
        # raise AttributeError otherwise) and the parent-directory link.
        data = [
            node.get("href").replace("/", "")
            for node in soup.find_all("a")
            if node.get("href")
            and node.get("href").endswith("/")
            and node.text != "Parent Directory"
        ]
        logging.info(f" + Found {len(data)} epochs")
        epochs = []
        for epoch in data:
            try:
                start, end = epoch.split("_")
            except Exception:
                logging.error(
                    f"{item.stationID}: Invalid epoch {epoch}: "
                    "Expected [START_END]"
                )
                continue

            startTime = CAPS.Time.FromString(start, "%Y%m%d")
            if not startTime.valid():
                logging.error(f"{item.stationID}: Invalid start time {start}")
                continue

            endTime = CAPS.Time.FromString(end, "%Y%m%d")
            if not endTime.valid():
                logging.error(f"{item.stationID}: Invalid end time {end}")
                continue

            if needsUpdate(item, startTime, endTime):
                epochs.append([epoch, startTime, endTime])
            elif self.reprocess:
                # Re-queue exactly the last processed epoch when forced.
                if item.startTime == startTime and item.endTime == endTime:
                    epochs.append([epoch, startTime, endTime])

        if not epochs:
            logging.info(" + No new data available. Nothing todo")
        else:
            logging.info(f" + {len(epochs)} epoch(s) must be processed")
        logging.info(" + End")

        return epochs

    # -------------------------------------------------------------------------
    def runWatch(self):
        """Main poll loop: for every station fetch new epochs, download the
        configured products, forward them and advance the journal."""
        while not self.isExitRequested():
            logging.info("Looking for new data")
            for stationID, item in self.streamMap.items.items():
                if self.isExitRequested():
                    break

                url = self.baseUrl + "/" + item.baseCode + "/" + item.folder
                epochs = self.getEpochs(url, item)
                if not epochs:
                    continue

                if self.processLatestOnly:
                    epochs = [epochs[-1]]

                for epoch, startTime, endTime in epochs:
                    if self.isExitRequested():
                        break

                    for k, cha in self.products.items():
                        streamID = (
                            item.networkCode
                            + "."
                            + item.stationCode
                            + "."
                            + item.locationCode
                            + "."
                            + cha
                        )
                        filename = epoch + "." + k
                        fullUrl = (
                            url + "/interferograms/" + epoch + "/" + filename
                        )
                        data = self.getProduct(fullUrl, streamID, filename)
                        if data:
                            if self.outputDirectory:
                                self.storeProduct(data, streamID, item)
                            else:
                                self.sendProduct(
                                    data, item, cha, startTime, endTime
                                )

                        if self.isExitRequested():
                            break

                    # Update start and end time of the station
                    item.startTime = startTime
                    item.endTime = endTime

                    # Update Journal
                    self.journal.items[item.stationID] = JournalItem(
                        startTime, endTime
                    )

                if self.isExitRequested():
                    break

            # Do reprocessing only once
            if self.reprocess:
                self.reprocess = False

            logging.info("End")
            logging.info(f"Next run in {self.pollInterval} seconds")
            self.wait(self.pollInterval)

    # -------------------------------------------------------------------------
    def wait(self, seconds):
        """Sleep for *seconds*, waking once a second to honour exit
        requests."""
        for _i in range(0, seconds):
            time.sleep(1)
            if self.isExitRequested():
                break

    # -------------------------------------------------------------------------
    def validateParameters(self):
        """Apply command-line overrides and resolve configured paths.

        NOTE(review): '--start-time' and 'dump' are read here but never
        declared in createCommandLineDescription — confirm whether the base
        class provides them."""
        if not client.Application.validateParameters(self):
            return False

        self.dump = self.commandline().hasOption("dump")
        if self.dump:
            self.setMessagingEnabled(False)

        try:
            self.baseUrl = self.commandline().optionString("base-url")
        except Exception:
            pass

        try:
            self.strStartTime = self.commandline().optionString("start-time")
        except Exception:
            pass

        if self.strStartTime:
            self.startTime = utils.parseTime(self.strStartTime)
            if not self.startTime.valid():
                # Fixed: closing quote was missing in the message
                logging.error(f"Invalid start time '{self.strStartTime}'")
                return False

        # Fixed: the '--journal' option was parsed twice; keep one.
        try:
            self.journalFile = self.commandline().optionString("journal")
        except Exception:
            pass

        try:
            self.pollInterval = int(
                self.commandline().optionString("interval")
            )
        except Exception:
            pass

        try:
            self.outputAddr = self.commandline().optionString("addr")
        except Exception:
            pass

        try:
            self.outputDirectory = self.commandline().optionString("dir")
        except Exception:
            pass

        if self.journalFile:
            self.journalFile = system.Environment.Instance().absolutePath(
                self.journalFile
            )

        if not self.streamsFile:
            logging.error("Option 'input.streamsFile' is mandatory")
            return False

        self.streamsFile = system.Environment.Instance().absolutePath(
            self.streamsFile
        )

        if self.outputDirectory:
            self.outputDirectory = system.Environment.Instance().absolutePath(
                self.outputDirectory
            )

        if self.commandline().hasOption("reprocess"):
            self.reprocess = True

        self.print = self.commandline().hasOption("print-packets")
        return True


# -----------------------------------------------------------------------------
if __name__ == "__main__":
    app = App(len(sys.argv), sys.argv)
    sys.exit(app())