698 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Plaintext
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			698 lines
		
	
	
		
			23 KiB
		
	
	
	
		
			Plaintext
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env seiscomp-python
 | 
						|
 | 
						|
###############################################################################
 | 
						|
# Copyright (C) 2024 by gempa GmbH                                            #
 | 
						|
#                                                                             #
 | 
						|
# All Rights Reserved.                                                        #
 | 
						|
#                                                                             #
 | 
						|
# NOTICE: All information contained herein is, and remains                    #
 | 
						|
# the property of gempa GmbH and its suppliers, if any. The intellectual      #
 | 
						|
# and technical concepts contained herein are proprietary to gempa GmbH       #
 | 
						|
# and its suppliers.                                                          #
 | 
						|
# Dissemination of this information or reproduction of this material          #
 | 
						|
# is strictly forbidden unless prior written permission is obtained           #
 | 
						|
# from gempa GmbH.                                                            #
 | 
						|
###############################################################################
 | 
						|
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
import time
 | 
						|
 | 
						|
import urllib.parse
 | 
						|
 | 
						|
import requests
 | 
						|
 | 
						|
 | 
						|
from bs4 import BeautifulSoup
 | 
						|
from datetime import datetime
 | 
						|
from osgeo import gdal
 | 
						|
 | 
						|
from seiscomp import client, logging, system
 | 
						|
 | 
						|
from gempa import CAPS
 | 
						|
 | 
						|
from licsar2caps.journal import Journal, JournalItem
 | 
						|
from licsar2caps.streammap import StreamMap
 | 
						|
from licsar2caps import utils
 | 
						|
 | 
						|
 | 
						|
def needsUpdate(item, startTime, endTime):
 | 
						|
    if not item.startTime:
 | 
						|
        return True
 | 
						|
 | 
						|
    if startTime == item.startTime and endTime > item.endTime:
 | 
						|
        return True
 | 
						|
    if startTime > item.startTime:
 | 
						|
        return True
 | 
						|
 | 
						|
    return False
 | 
						|
 | 
						|
 | 
						|
class Filter:
 | 
						|
    def __init__(self):
 | 
						|
        self.threshold = None
 | 
						|
        self.enabled = False
 | 
						|
        self.percentile = 99.9
 | 
						|
 | 
						|
 | 
						|
###############################################################################
 | 
						|
class App(client.Application):
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def __init__(self, argc, argv):
 | 
						|
        client.Application.__init__(self, argc, argv)
 | 
						|
 | 
						|
        self.setDatabaseEnabled(False, False)
 | 
						|
        self.setMessagingEnabled(False)
 | 
						|
        self.baseUrl = "https://gws-access.jasmin.ac.uk/public/nceo_geohazards/LiCSAR_products"
 | 
						|
        self.journalFile = "@ROOTDIR@/var/run/" + self.name() + "/journal"
 | 
						|
        self.pollInterval = 60
 | 
						|
        self.networks = None
 | 
						|
        self.dump = False
 | 
						|
        self.outputAddr = "caps://localhost:18003"
 | 
						|
        self.capsLog = False
 | 
						|
        self.output = None
 | 
						|
        self.journal = Journal()
 | 
						|
        self.print = False
 | 
						|
        self.products = {}
 | 
						|
        self.outputDirectory = None
 | 
						|
        self.strStartTime = None
 | 
						|
        self.reprocess = False
 | 
						|
        self.processLatestOnly = False
 | 
						|
        self.filter = Filter()
 | 
						|
 | 
						|
        dt = datetime.utcnow()
 | 
						|
        self.startTime = CAPS.Time()
 | 
						|
        self.startTime.set(dt.year, dt.month, dt.day, 0, 0, 0, 0)
 | 
						|
        self.streamsFile = None
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def initConfiguration(self):
 | 
						|
        if not client.Application.initConfiguration(self):
 | 
						|
            return False
 | 
						|
 | 
						|
        param = ""
 | 
						|
        try:
 | 
						|
            param = "input.baseUrl"
 | 
						|
            try:
 | 
						|
                self.baseUrl = self.configGetString(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.streamsFile"
 | 
						|
            try:
 | 
						|
                self.streamsFile = self.configGetString(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.startTime"
 | 
						|
            try:
 | 
						|
                self.strStartTime = self.configGetString(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.processLatestOnly"
 | 
						|
            try:
 | 
						|
                self.processLatestOnly = self.configGetBool(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.reprocess"
 | 
						|
            try:
 | 
						|
                self.reprocess = self.configGetBool(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.pollInterval"
 | 
						|
            try:
 | 
						|
                self.pollInterval = self.configGetInt(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.products"
 | 
						|
            try:
 | 
						|
                for item in self.configGetStrings(param):
 | 
						|
                    try:
 | 
						|
                        key, value = item.split(":")
 | 
						|
                        if not key or not value:
 | 
						|
                            logging.error(
 | 
						|
                                f"{param}: " "Key and value must not be empty"
 | 
						|
                            )
 | 
						|
                            return False
 | 
						|
 | 
						|
                        self.products[key.strip()] = value.strip()
 | 
						|
                    except ValueError:
 | 
						|
                        logging.error(
 | 
						|
                            f"{param}: Invalid entry: Expected: [KEY:VALUE]"
 | 
						|
                        )
 | 
						|
                        return False
 | 
						|
 | 
						|
            except RuntimeError:
 | 
						|
                self.products = {"geo.unw.tif": "UNW"}
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.filter.enabled"
 | 
						|
            try:
 | 
						|
                self.filter.enabled = self.configGetBool(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.filter.threshold"
 | 
						|
            try:
 | 
						|
                self.filter.threshold = self.configGetDouble(param)
 | 
						|
                param = "input.filter.percentile"
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "input.filter.percentile"
 | 
						|
            try:
 | 
						|
                self.filter.percentile = self.configGetDouble(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "output.addr"
 | 
						|
            try:
 | 
						|
                self.outputAddr = self.configGetString(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "output.directory"
 | 
						|
            try:
 | 
						|
                self.outputDirectory = self.configGetString(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
            param = "journal.file"
 | 
						|
            try:
 | 
						|
                self.journalFile = self.configGetString(param)
 | 
						|
            except RuntimeError:
 | 
						|
                pass
 | 
						|
 | 
						|
        except Exception as e:
 | 
						|
            logging.error(f"Invalid parameter {param}: {e}")
 | 
						|
            return False
 | 
						|
 | 
						|
        return True
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def createCommandLineDescription(self):
 | 
						|
        client.Application.createCommandLineDescription(self)
 | 
						|
 | 
						|
        self.commandline().addGroup("Input")
 | 
						|
        self.commandline().addStringOption(
 | 
						|
            "Input",
 | 
						|
            "base-url",
 | 
						|
            f"Base URL from which data is received (default={self.baseUrl})",
 | 
						|
        )
 | 
						|
        self.commandline().addStringOption(
 | 
						|
            "Input",
 | 
						|
            "interval,i",
 | 
						|
            "Poll mode interval in seconds (default={self.pollInterval})",
 | 
						|
        )
 | 
						|
        self.commandline().addOption(
 | 
						|
            "Input",
 | 
						|
            "reprocess",
 | 
						|
            "Force reprocessing of the last received grid at start"
 | 
						|
            f"(default={self.reprocess})",
 | 
						|
        )
 | 
						|
 | 
						|
        self.commandline().addGroup("Output")
 | 
						|
        self.commandline().addStringOption(
 | 
						|
            "Output",
 | 
						|
            "addr,a",
 | 
						|
            "Data output address [[caps|capss]://][user:pass@]host[:port]",
 | 
						|
        )
 | 
						|
        self.commandline().addStringOption(
 | 
						|
            "Output",
 | 
						|
            "dir,d",
 | 
						|
            "Output directory. Write grid files to this directory instead "
 | 
						|
            " of sending it to CAPS.",
 | 
						|
        )
 | 
						|
        self.commandline().addBoolOption(
 | 
						|
            "Output",
 | 
						|
            "caps-log",
 | 
						|
            f"Enable CAPS logging (default={self.capsLog})",
 | 
						|
        )
 | 
						|
        self.commandline().addOption(
 | 
						|
            "Output", "print-packets", "Print packets"
 | 
						|
        )
 | 
						|
 | 
						|
        self.commandline().addGroup("Journal")
 | 
						|
        self.commandline().addStringOption(
 | 
						|
            "Journal",
 | 
						|
            "journal,j",
 | 
						|
            "File to store stream states. Use an "
 | 
						|
            "empty string to log to standard out"
 | 
						|
            "[[caps|capss]://][user:pass@]host[:port]",
 | 
						|
        )
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def printUsage(self):
 | 
						|
        print(
 | 
						|
            """
 | 
						|
licsar2caps: Import licsar data from web page to CAPS.
 | 
						|
 | 
						|
"""
 | 
						|
        )
 | 
						|
        client.StreamApplication.printUsage(self)
 | 
						|
 | 
						|
        print("Examples")
 | 
						|
        print("Processing with informative debug output.")
 | 
						|
        print(f"  {self.name()} --debug")
 | 
						|
        print("Write output grids to directory")
 | 
						|
        print(f"  {self.name()} -d /tmp")
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def getProduct(self, fullUrl, streamID, filename):
 | 
						|
        logging.info(f"  + {streamID}: Downloading data product {filename}")
 | 
						|
        try:
 | 
						|
            res = requests.get(fullUrl, timeout=10, allow_redirects=True)
 | 
						|
        except Exception:
 | 
						|
            logging.info(
 | 
						|
                f"+ {streamID}: Download failed. Read operation timed out. "
 | 
						|
                f"URL: {fullUrl}"
 | 
						|
            )
 | 
						|
            return None
 | 
						|
 | 
						|
        if res.status_code != 200:
 | 
						|
            logging.info(
 | 
						|
                f"  + {streamID}: Download failed. HTTP status code "
 | 
						|
                f"{res.status_code}: {res.reason}. URL: {fullUrl}"
 | 
						|
            )
 | 
						|
            return None
 | 
						|
 | 
						|
        logging.info(f"  + {streamID}: Downloaded data product {filename}")
 | 
						|
        with tempfile.NamedTemporaryFile() as tmp:
 | 
						|
            tmp.write(res.content)
 | 
						|
            tmp.flush()
 | 
						|
            try:
 | 
						|
                filename = "/vsimem/in_memory_output.grd"
 | 
						|
                ds = gdal.Translate(
 | 
						|
                    filename,
 | 
						|
                    tmp.name,
 | 
						|
                    format="GSBG",
 | 
						|
                    outputType=gdal.GDT_Float32,
 | 
						|
                )
 | 
						|
 | 
						|
                if not ds:
 | 
						|
                    logging.info(
 | 
						|
                        f"  + {streamID}: Could not convert data product "
 | 
						|
                        f"{filename} to Surfer6 format"
 | 
						|
                    )
 | 
						|
                    return None
 | 
						|
 | 
						|
                if self.filter.enabled:
 | 
						|
                    value = utils.calculateAbsPerc(
 | 
						|
                        ds.GetRasterBand(1).ReadAsArray(),
 | 
						|
                        self.filter.percentile,
 | 
						|
                    )
 | 
						|
 | 
						|
                    if value <= self.filter.threshold:
 | 
						|
                        logging.info(
 | 
						|
                            f"  + {streamID}: Computed grid displacement is "
 | 
						|
                            "less or equal the configured threshold "
 | 
						|
                            f"{value} <= {self.filter.threshold}. Skipping "
 | 
						|
                            "grid"
 | 
						|
                        )
 | 
						|
                        return None
 | 
						|
 | 
						|
                f = gdal.VSIFOpenL(filename, "rb")
 | 
						|
                gdal.VSIFSeekL(f, 0, 2)
 | 
						|
                filesize = gdal.VSIFTellL(f)
 | 
						|
                gdal.VSIFSeekL(f, 0, 0)
 | 
						|
 | 
						|
                content = gdal.VSIFReadL(1, filesize, f)
 | 
						|
                gdal.VSIFCloseL(f)
 | 
						|
                gdal.Unlink("/vsimem/in_memory_output.grd")
 | 
						|
 | 
						|
                data = content
 | 
						|
                return data
 | 
						|
            except AttributeError:
 | 
						|
                # Fallback for RHEL 7
 | 
						|
                import subprocess
 | 
						|
 | 
						|
                with tempfile.NamedTemporaryFile() as outFile:
 | 
						|
                    filename = outFile.name
 | 
						|
                    subprocess.call(
 | 
						|
                        ["gdal_translate", "-of", "GSBG", tmp.name, filename]
 | 
						|
                    )
 | 
						|
 | 
						|
                    with open(filename, "rb") as f:
 | 
						|
                        return f.read()
 | 
						|
 | 
						|
                return None
 | 
						|
 | 
						|
        return None
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def storeProduct(self, data, streamID, item):
 | 
						|
        if not os.path.exists(self.outputDirectory):
 | 
						|
            try:
 | 
						|
                os.makedirs(self.outputDirectory)
 | 
						|
            except OSError as err:
 | 
						|
                logging.error(
 | 
						|
                    "Could not create output directory "
 | 
						|
                    f"'{self.outputDirectory}': {err}"
 | 
						|
                )
 | 
						|
                return False
 | 
						|
 | 
						|
        filename = os.path.join(self.outputDirectory, streamID + ".grd")
 | 
						|
        try:
 | 
						|
            with open(filename, "wb") as f:
 | 
						|
                f.write(data)
 | 
						|
        except IOError as err:
 | 
						|
            logging.error(f"Failed to write data to file {filename}: {err}")
 | 
						|
            return False
 | 
						|
 | 
						|
        logging.info(f"  + {streamID}: Stored data in '{filename}'")
 | 
						|
 | 
						|
        return True
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def sendProduct(self, data, item, channelCode, startTime, endTime):
 | 
						|
        streamID = item.stationID + "." + channelCode
 | 
						|
        if self.print:
 | 
						|
            print(
 | 
						|
                f"{streamID} - {startTime.iso()} ~ "
 | 
						|
                f"{endTime.iso()} Size: {len(data)} bytes"
 | 
						|
            )
 | 
						|
            return True
 | 
						|
 | 
						|
        ret = self.output.pushAny(
 | 
						|
            item.networkCode,
 | 
						|
            item.stationCode,
 | 
						|
            item.locationCode,
 | 
						|
            channelCode,
 | 
						|
            startTime,
 | 
						|
            endTime,
 | 
						|
            1,
 | 
						|
            0,
 | 
						|
            "grd",
 | 
						|
            "N/A",
 | 
						|
            bytes(data),
 | 
						|
        )
 | 
						|
        if ret != CAPS.Plugin.Success:
 | 
						|
            logging.error(
 | 
						|
                f"{streamID} - Data {startTime.iso()} ~ {endTime.iso()} "
 | 
						|
                "could not be sent to CAPS: Error code: {ret}"
 | 
						|
            )
 | 
						|
            return False
 | 
						|
 | 
						|
        logging.info(
 | 
						|
            f"  + {streamID}: Sent packet {startTime.iso()} ~ "
 | 
						|
            f"{endTime.iso()} {len(data)} bytes"
 | 
						|
        )
 | 
						|
 | 
						|
        return True
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def done(self):
 | 
						|
        if self.output:
 | 
						|
            self.output.close()
 | 
						|
 | 
						|
        self.journal.write(self.journalFile)
 | 
						|
 | 
						|
        client.Application.done(self)
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def init(self):
 | 
						|
        if not client.Application.init(self):
 | 
						|
            return False
 | 
						|
 | 
						|
        if os.path.isfile(self.journalFile) and not self.journal.read(
 | 
						|
            self.journalFile
 | 
						|
        ):
 | 
						|
            return False
 | 
						|
 | 
						|
        self.streamMap = StreamMap()
 | 
						|
        if not self.streamMap.read(self.streamsFile):
 | 
						|
            return False
 | 
						|
 | 
						|
        for key, item in self.streamMap.items.items():
 | 
						|
            j = self.journal.get(key)
 | 
						|
            if j:
 | 
						|
                item.startTime = j.startTime
 | 
						|
                item.endTime = j.endTime
 | 
						|
            else:
 | 
						|
                if self.startTime:
 | 
						|
                    item.startTime = self.startTime
 | 
						|
                    item.endTime = self.startTime
 | 
						|
 | 
						|
        url = urllib.parse.urlparse(self.outputAddr)
 | 
						|
        if not url:
 | 
						|
            logging.error("Could not parse data output address")
 | 
						|
            return False
 | 
						|
 | 
						|
        self.output = CAPS.Plugin("licsar2caps")
 | 
						|
        self.output.setHost(url.hostname)
 | 
						|
        self.output.setPort(url.port)
 | 
						|
        self.output.setBufferSize(1 << 29)
 | 
						|
        if self.capsLog:
 | 
						|
            self.output.enablelogging()
 | 
						|
 | 
						|
        protocol = url.scheme.lower()
 | 
						|
        if protocol == "capss":
 | 
						|
            self.output.setSSLEnabled(True)
 | 
						|
 | 
						|
        return True
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def run(self):
 | 
						|
        if self.outputDirectory:
 | 
						|
            outputStr = f"\n    Output Directory: {self.outputDirectory}"
 | 
						|
        else:
 | 
						|
            outputStr = f"\n    Output address  : {self.outputAddr}"
 | 
						|
 | 
						|
        logging.info(
 | 
						|
            "\nConfiguration:"
 | 
						|
            f"\n    Base URL        : {self.baseUrl}"
 | 
						|
            f"\n    Poll interval   : {self.pollInterval} s"
 | 
						|
            f"\n    Streams         : {self.streamsFile}"
 | 
						|
            f"\n    Journal         : {self.journalFile}"
 | 
						|
            f"{outputStr}"
 | 
						|
            f"\n    Start time      : {self.startTime.iso()}"
 | 
						|
            f"\n    Products        : {self.products}"
 | 
						|
            f"\n    Filter          : {'Enabled' if self.filter.enabled else 'Disabled'}"
 | 
						|
            f"\n      Threshold     : {self.filter.threshold}"
 | 
						|
            f"\n      Percentile    : {self.filter.percentile}"
 | 
						|
            f"\n    Force update    : {self.reprocess}"
 | 
						|
            f"\n    Process latest  : {self.processLatestOnly}"
 | 
						|
        )
 | 
						|
 | 
						|
        self.runWatch()
 | 
						|
 | 
						|
        return True
 | 
						|
 | 
						|
    def getEpochs(self, url, item):
 | 
						|
        url = url + "/interferograms/"
 | 
						|
        logging.info(f"  + Checking station {item.stationID}")
 | 
						|
        res = requests.get(url, timeout=10, allow_redirects=True)
 | 
						|
 | 
						|
        if res.status_code != 200:
 | 
						|
            logging.error(f"HTTP status code {res.status_code}: {res.reason}")
 | 
						|
            logging.info("  + End")
 | 
						|
            return None
 | 
						|
 | 
						|
        soup = BeautifulSoup(res.text, "html.parser")
 | 
						|
        data = [
 | 
						|
            node.get("href").replace("/", "")
 | 
						|
            for node in soup.find_all("a")
 | 
						|
            if node.get("href").endswith("/")
 | 
						|
            and node.text != "Parent Directory"
 | 
						|
        ]
 | 
						|
 | 
						|
        logging.info(f"    + Found {len(data)} epochs")
 | 
						|
        epochs = []
 | 
						|
        for epoch in data:
 | 
						|
            try:
 | 
						|
                start, end = epoch.split("_")
 | 
						|
            except Exception:
 | 
						|
                logging.error(
 | 
						|
                    f"{item.stationID}: Invalid epoch {epoch}: "
 | 
						|
                    "Expected [START_END]"
 | 
						|
                )
 | 
						|
                continue
 | 
						|
 | 
						|
            startTime = CAPS.Time.FromString(start, "%Y%m%d")
 | 
						|
            if not startTime.valid():
 | 
						|
                logging.error(f"{item.stationID}: Invalid start time {start}")
 | 
						|
                continue
 | 
						|
 | 
						|
            endTime = CAPS.Time.FromString(end, "%Y%m%d")
 | 
						|
            if not endTime.valid():
 | 
						|
                logging.error(f"{item.stationID}: Invalid end time {end}")
 | 
						|
                continue
 | 
						|
 | 
						|
            if needsUpdate(item, startTime, endTime):
 | 
						|
                epochs.append([epoch, startTime, endTime])
 | 
						|
            elif self.reprocess:
 | 
						|
                if item.startTime == startTime and item.endTime == endTime:
 | 
						|
                    epochs.append([epoch, startTime, endTime])
 | 
						|
 | 
						|
        if not epochs:
 | 
						|
            logging.info("    + No new data available. Nothing todo")
 | 
						|
        else:
 | 
						|
            logging.info(f"    + {len(epochs)} epoch(s) must be processed")
 | 
						|
 | 
						|
        logging.info("  + End")
 | 
						|
 | 
						|
        return epochs
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def runWatch(self):
 | 
						|
        while not self.isExitRequested():
 | 
						|
            logging.info("Looking for new data")
 | 
						|
            for stationID, item in self.streamMap.items.items():
 | 
						|
                if self.isExitRequested():
 | 
						|
                    break
 | 
						|
 | 
						|
                url = self.baseUrl + "/" + item.baseCode + "/" + item.folder
 | 
						|
                epochs = self.getEpochs(url, item)
 | 
						|
                if not epochs:
 | 
						|
                    continue
 | 
						|
 | 
						|
                if self.processLatestOnly:
 | 
						|
                    epochs = [epochs[-1]]
 | 
						|
 | 
						|
                for epoch, startTime, endTime in epochs:
 | 
						|
                    if self.isExitRequested():
 | 
						|
                        break
 | 
						|
 | 
						|
                    for k, cha in self.products.items():
 | 
						|
                        streamID = (
 | 
						|
                            item.networkCode
 | 
						|
                            + "."
 | 
						|
                            + item.stationCode
 | 
						|
                            + "."
 | 
						|
                            + item.locationCode
 | 
						|
                            + "."
 | 
						|
                            + cha
 | 
						|
                        )
 | 
						|
 | 
						|
                        filename = epoch + "." + k
 | 
						|
                        fullUrl = (
 | 
						|
                            url + "/interferograms/" + epoch + "/" + filename
 | 
						|
                        )
 | 
						|
 | 
						|
                        data = self.getProduct(fullUrl, streamID, filename)
 | 
						|
                        if data:
 | 
						|
                            if self.outputDirectory:
 | 
						|
                                self.storeProduct(data, streamID, item)
 | 
						|
                            else:
 | 
						|
                                self.sendProduct(
 | 
						|
                                    data, item, cha, startTime, endTime
 | 
						|
                                )
 | 
						|
 | 
						|
                        if self.isExitRequested():
 | 
						|
                            break
 | 
						|
 | 
						|
                    # Update start and end time of the station
 | 
						|
                    item.startTime = startTime
 | 
						|
                    item.endTime = endTime
 | 
						|
 | 
						|
                    # Update Journal
 | 
						|
                    self.journal.items[item.stationID] = JournalItem(
 | 
						|
                        startTime, endTime
 | 
						|
                    )
 | 
						|
 | 
						|
            if self.isExitRequested():
 | 
						|
                break
 | 
						|
 | 
						|
            # Do reprocessing only once
 | 
						|
            if self.reprocess:
 | 
						|
                self.reprocess = False
 | 
						|
 | 
						|
            logging.info("End")
 | 
						|
            logging.info(f"Next run in {self.pollInterval} seconds")
 | 
						|
            self.wait(self.pollInterval)
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def wait(self, seconds):
 | 
						|
        for _i in range(0, seconds):
 | 
						|
            time.sleep(1)
 | 
						|
            if self.isExitRequested():
 | 
						|
                break
 | 
						|
 | 
						|
    # -------------------------------------------------------------------------
 | 
						|
    def validateParameters(self):
 | 
						|
        if not client.Application.validateParameters(self):
 | 
						|
            return False
 | 
						|
 | 
						|
        self.dump = self.commandline().hasOption("dump")
 | 
						|
        if self.dump:
 | 
						|
            self.setMessagingEnabled(False)
 | 
						|
 | 
						|
        try:
 | 
						|
            self.baseUrl = self.commandline().optionString("base-url")
 | 
						|
        except Exception:
 | 
						|
            pass
 | 
						|
 | 
						|
        try:
 | 
						|
            self.strStartTime = self.commandline().optionString("start-time")
 | 
						|
        except Exception:
 | 
						|
            pass
 | 
						|
 | 
						|
        if self.strStartTime:
 | 
						|
            self.startTime = utils.parseTime(self.strStartTime)
 | 
						|
            if not self.startTime.valid():
 | 
						|
                logging.error(f"Invalid start time '{self.strStartTime}")
 | 
						|
                return False
 | 
						|
 | 
						|
        try:
 | 
						|
            self.journalFile = self.commandline().optionString("journal")
 | 
						|
        except Exception:
 | 
						|
            pass
 | 
						|
 | 
						|
        try:
 | 
						|
            self.pollInterval = int(
 | 
						|
                self.commandline().optionString("interval")
 | 
						|
            )
 | 
						|
        except Exception:
 | 
						|
            pass
 | 
						|
 | 
						|
        try:
 | 
						|
            self.outputAddr = self.commandline().optionString("addr")
 | 
						|
        except Exception:
 | 
						|
            pass
 | 
						|
 | 
						|
        try:
 | 
						|
            self.outputDirectory = self.commandline().optionString("dir")
 | 
						|
        except Exception:
 | 
						|
            pass
 | 
						|
 | 
						|
        try:
 | 
						|
            self.journalFile = self.commandline().optionString("journal")
 | 
						|
        except Exception:
 | 
						|
            pass
 | 
						|
 | 
						|
        if self.journalFile:
 | 
						|
            self.journalFile = system.Environment.Instance().absolutePath(
 | 
						|
                self.journalFile
 | 
						|
            )
 | 
						|
 | 
						|
        if not self.streamsFile:
 | 
						|
            logging.error("Option 'input.streamsFile' is mandatory")
 | 
						|
            return False
 | 
						|
 | 
						|
        self.streamsFile = system.Environment.Instance().absolutePath(
 | 
						|
            self.streamsFile
 | 
						|
        )
 | 
						|
 | 
						|
        if self.outputDirectory:
 | 
						|
            self.outputDirectory = system.Environment.Instance().absolutePath(
 | 
						|
                self.outputDirectory
 | 
						|
            )
 | 
						|
 | 
						|
        if self.commandline().hasOption("reprocess"):
 | 
						|
            self.reprocess = True
 | 
						|
 | 
						|
        self.print = self.commandline().hasOption("print-packets")
 | 
						|
 | 
						|
        return True
 | 
						|
 | 
						|
 | 
						|
# -----------------------------------------------------------------------------
 | 
						|
app = App(len(sys.argv), sys.argv)
 | 
						|
sys.exit(app())
 |