###############################################################################
# Copyright (C) 2013-2014 by gempa GmbH
#
# FDSNAvailability -- Implements the fdsnws-availability Web service, see
#   http://www.fdsn.org/webservices/
#
# Feature notes:
#   - additional request parameters:
#     - excludetoolarge:       boolean, default: true
#
# Author:  Stephan Herrnkind
# Email:   herrnkind@gempa.de
###############################################################################

from collections import OrderedDict

from twisted.cred import portal
from twisted.internet.threads import deferToThread
from twisted.web import http, resource, server

from zope.interface import implementer

from seiscomp import datamodel, io, logging
from seiscomp.client import Application
from seiscomp.core import Time

from .http import BaseResource
from .request import RequestOptions

from . import utils


DBMaxUInt = 18446744073709551615  # 2^64 - 1
VERSION = "1.0.4"


###############################################################################
class _AvailabilityRequestOptions(RequestOptions):
    # merge options
    VMergeSampleRate = "samplerate"
    VMergeQuality = "quality"
    VMergeOverlap = "overlap"
    VMerge = [VMergeSampleRate, VMergeQuality]  # overlap option only available
    # in query method

    # orderby options
    VOrderByNSLC = "nslc_time_quality_samplerate"
    VOrderByCount = "timespancount"
    VOrderByCountDesc = "timespancount_desc"
    VOrderByUpdate = "latestupdate"
    VOrderByUpdateDesc = "latestupdate_desc"
    VOrderBy = [
        VOrderByNSLC,
        VOrderByCount,
        VOrderByCountDesc,
        VOrderByUpdate,
        VOrderByUpdateDesc,
    ]

    # format options
    VFormatText = "text"
    VFormatGeoCSV = "geocsv"
    VFormatJSON = "json"
    VFormatRequest = "request"
    OutputFormats = [VFormatText, VFormatGeoCSV, VFormatJSON, VFormatRequest]

    # request parameters
    PQuality = ["quality"]
    PMerge = ["merge"]
    POrderBy = ["orderby"]
    PLimit = ["limit"]
    PIncludeRestricted = ["includerestricted"]

    POSTParams = (
        RequestOptions.POSTParams
        + PQuality
        + PMerge
        + POrderBy
        + PLimit
        + PIncludeRestricted
    )
    GETParams = RequestOptions.GETParams + POSTParams

    # --------------------------------------------------------------------------
    def __init__(self):
        super().__init__()

        self.service = "availability-base"
        self.quality = None
        self.mergeSampleRate = None
        self.mergeQuality = None
        self.mergeOverlap = None
        self.orderBy = self.VOrderBy[0]
        self.limit = None
        self.includeRestricted = None
        self.showLatestUpdate = None

    # --------------------------------------------------------------------------
    def parse(self):
        self.parseTime()
        self.parseChannel()
        self.parseOutput()

        # quality: D, M, Q, R, * (optional)
        for v in self.getListValues(self.PQuality):
            if len(v) == 1:
                if v[0] == "*":
                    self.quality = None
                    break
                if v[0].isupper():
                    if self.quality is None:
                        self.quality = [v]
                    else:
                        self.quality.append(v)
                    continue
            self.raiseValueError(self.PQuality[0])

        # merge (optional)
        for v in self.getListValues(self.PMerge, True):
            if v not in self.VMerge:
                self.raiseValueError(self.PMerge[0])

            if v == self.VMergeSampleRate:
                self.mergeSampleRate = True
            elif v == self.VMergeQuality:
                self.mergeQuality = True
            elif v == self.VMergeOverlap:
                self.mergeOverlap = True

        # orderby (optional)
        key, value = self.getFirstValue(self.POrderBy)
        if value is not None:
            if value in self.VOrderBy:
                self.orderBy = value
            else:
                self.raiseValueError(key)

        # limit (optional)
        self.limit = self.parseInt(self.PLimit, 1, DBMaxUInt)

        # includeRestricted (optional)
        self.includeRestricted = self.parseBool(self.PIncludeRestricted)

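    # Example GET request combining the parameters parsed above (illustrative,
    # shown for the extent service; channel and time parameters are handled by
    # the base RequestOptions class):
    #   .../extent?network=GE&station=APE&channel=BHZ
    #       &starttime=2023-01-01T00:00:00&endtime=2023-02-01T00:00:00
    #       &quality=D,R&merge=samplerate&orderby=latestupdate
    #       &limit=100&includerestricted=false
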
    # --------------------------------------------------------------------------
    def extentIter(self, dac, user=None, access=None):
        # tuple: extent, oid, restricted
        for e in dac.extentsSorted():
            ext = e[0]
            restricted = e[2]
            wid = ext.waveformID()
            net = wid.networkCode()
            sta = wid.stationCode()
            loc = wid.locationCode()
            cha = wid.channelCode()

            if restricted:
                if not user:
                    continue

                if access:
                    startTime = ext.start()
                    endTime = ext.end()
                    if self.time:
                        if self.time.start() and self.time.start() > startTime:
                            startTime = self.time.start()
                        if self.time.end() and self.time.end() < ext.end():
                            endTime = self.time.end()

                    if not access.authorize(
                        user, net, sta, loc, cha, startTime, endTime
                    ):
                        continue

            for ro in self.streams:
                if ro.channel:
                    if (
                        not ro.channel.matchNet(net)
                        or not ro.channel.matchSta(sta)
                        or not ro.channel.matchLoc(loc)
                        or not ro.channel.matchCha(cha)
                    ):
                        continue

                if ro.time and not ro.time.match(ext.start(), ext.end()):
                    continue

                if not self.includeRestricted and restricted:
                    continue

                yield e


###############################################################################
class _AvailabilityExtentRequestOptions(_AvailabilityRequestOptions):
    # --------------------------------------------------------------------------
    def __init__(self):
        super().__init__()
        self.service = "availability-extent"

        self.showLatestUpdate = True

    # --------------------------------------------------------------------------
    def attributeExtentIter(self, ext):
        for i in range(ext.dataAttributeExtentCount()):
            attExt = ext.dataAttributeExtent(i)

            if self.time and not self.time.match(attExt.start(), attExt.end()):
                continue

            if self.quality and attExt.quality() not in self.quality:
                continue

            yield attExt


###############################################################################
class _AvailabilityQueryRequestOptions(_AvailabilityRequestOptions):
    # additional merge options
    VMerge = _AvailabilityRequestOptions.VMerge + [
        _AvailabilityRequestOptions.VMergeOverlap
    ]

    # show options
    VShowLatestUpdate = "latestupdate"
    VShow = [VShowLatestUpdate]

    # additional query specific request parameters
    PMergeGaps = ["mergegaps"]
    PShow = ["show"]
    PExcludeTooLarge = ["excludetoolarge"]

    POSTParams = (
        _AvailabilityRequestOptions.POSTParams + PMergeGaps + PShow + PExcludeTooLarge
    )
    GETParams = (
        _AvailabilityRequestOptions.GETParams + PMergeGaps + PShow + PExcludeTooLarge
    )

    # --------------------------------------------------------------------------
    def __init__(self):
        super().__init__()
        self.service = "availability-query"

        self.orderBy = None
        self.mergeGaps = None
        self.excludeTooLarge = None

    # --------------------------------------------------------------------------
    def parse(self):
        _AvailabilityRequestOptions.parse(self)

        # merge gaps threshold (optional)
        self.mergeGaps = self.parseFloat(self.PMergeGaps, 0)

        # show (optional)
        for v in self.getListValues(self.PShow, True):
            if v not in self.VShow:
                self.raiseValueError(v)
            if v == self.VShowLatestUpdate:
                self.showLatestUpdate = True

        # excludetoolarge (optional)
        self.excludeTooLarge = self.parseBool(self.PExcludeTooLarge)
        if self.excludeTooLarge is None:
            self.excludeTooLarge = True

        if not self.channel:
            raise ValueError("Request contains no selections")

        if self.orderBy:
            raise ValueError("orderby not supported for query request")


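# Example POST body for the availability-query service (illustrative; option
# lines first, then one stream selection per line as NET STA LOC CHA START END,
# parsed by the base RequestOptions.parsePOST):
#
#   quality=D,R
#   mergegaps=10.0
#   show=latestupdate
#   excludetoolarge=false
#   GE APE -- BHZ 2023-01-01T00:00:00 2023-01-31T00:00:00
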
###############################################################################
class _Availability(BaseResource):
    isLeaf = True

    # --------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        pass

    # --------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        super().__init__(VERSION)

        self.access = access
        self.user = user

    # --------------------------------------------------------------------------
    def render_OPTIONS(self, req):
        req.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        req.setHeader(
            "Access-Control-Allow-Headers",
            "Accept, Content-Type, X-Requested-With, Origin",
        )
        req.setHeader("Content-Type", "text/plain; charset=utf-8")
        return ""

    # --------------------------------------------------------------------------
    def render_GET(self, req):
        # Parse and validate GET parameters
        ro = self._createRequestOptions()
        try:
            ro.parseGET(req.args)
            ro.parse()

            # the GET operation supports exactly one stream filter
            ro.streams.append(ro)
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    # --------------------------------------------------------------------------
    def render_POST(self, req):
        # Parse and validate POST parameters
        ro = self._createRequestOptions()
        try:
            ro.parsePOST(req.content)
            ro.parse()
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    # --------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        pass

    # --------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        pass

    # --------------------------------------------------------------------------
    def _prepareRequest(self, req, ro):
        dac = Application.Instance().getDACache()

        if ro.format == ro.VFormatJSON:
            contentType = "application/json; charset=utf-8"
            extension = "json"
        elif ro.format == ro.VFormatGeoCSV:
            contentType = "text/csv; charset=utf-8"
            extension = "csv"
        else:
            contentType = "text/plain; charset=utf-8"
            extension = "txt"

        req.setHeader("Content-Type", contentType)
        req.setHeader(
            "Content-Disposition",
            f"inline; filename=fdsnws-ext-availability_{Time.GMT().iso()}.{extension}",
        )

        d = deferToThread(self._processRequest, req, ro, dac)

        req.notifyFinish().addErrback(utils.onCancel, d)
        d.addBoth(utils.onFinish, req)

        # The request is handled by the deferred object
        return server.NOT_DONE_YET

    # --------------------------------------------------------------------------
    @staticmethod
    def _formatTime(time, ms=False):
        if ms:
            return f"{time.toString('%FT%T')}.{time.microseconds():06d}Z"
        return time.toString("%FT%TZ")

    # --------------------------------------------------------------------------
    def _writeLines(self, req, lines, ro):
        if ro.format == ro.VFormatText:
            return self._writeFormatText(req, lines, ro)
        if ro.format == ro.VFormatGeoCSV:
            return self._writeFormatGeoCSV(req, lines, ro)
        if ro.format == ro.VFormatJSON:
            return self._writeFormatJSON(req, lines, ro)
        if ro.format == ro.VFormatRequest:
            return self._writeFormatRequest(req, lines, ro)

        raise ValueError(f"unknown response format: {ro.format}")

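    # Example _formatTime output (derived from the format strings above):
    #   ms=False: "2023-04-01T12:34:56Z"
    #   ms=True:  "2023-04-01T12:34:56.123456Z"
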
    # --------------------------------------------------------------------------
    def _writeFormatText(self, req, lines, ro):
        charCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        nslc = "{0: <2}  {1: <5}  {2: <2}  {3: <3}"
        quality = "  {0: <1}"
        sampleRate = "  {0: >11}"
        sampleRateF = "  {0: >11.1f}"
        time = "  {0: <27}  {1: <27}"
        updated = "  {0: <20}"
        timeSpans = "  {0: >10}"
        restriction = "  {0: <11}"

        header = nslc.format("#N", "S", "L", "C")
        if not ro.mergeQuality:
            header += quality.format("Q")
        if not ro.mergeSampleRate:
            header += sampleRate.format("SampleRate")
        header += time.format("Earliest", "Latest")
        if ro.showLatestUpdate:
            header += updated.format("Updated")
        if isExtentReq:
            header += timeSpans.format("TimeSpans")
            header += restriction.format("Restriction")
        header += "\n"

        first = True
        for line in lines:
            if first:
                first = False
                utils.writeTS(req, header)
                charCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            loc = wid.locationCode() if wid.locationCode() else "--"
            data = nslc.format(
                wid.networkCode(), wid.stationCode(), loc, wid.channelCode()
            )
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRateF.format(e.sampleRate())
            data += time.format(
                self._formatTime(e.start(), True), self._formatTime(e.end(), True)
            )
            if ro.showLatestUpdate:
                data += updated.format(self._formatTime(e.updated()))
            if isExtentReq:
                data += timeSpans.format(e.segmentCount())
                data += restriction.format("RESTRICTED" if line[2] else "OPEN")
            data += "\n"

            utils.writeTS(req, data)
            charCount += len(data)
            lineCount += 1

        return charCount, lineCount

    # --------------------------------------------------------------------------
    def _writeFormatRequest(self, req, lines, ro):
        charCount = 0
        lineCount = 0

        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            loc = wid.locationCode() if wid.locationCode() else "--"
            start = e.start()
            end = e.end()

            # truncate start and end time to requested time frame
            if ro.time:
                if ro.time.start and ro.time.start > start:
                    start = ro.time.start
                if ro.time.end and ro.time.end < end:
                    end = ro.time.end

            data = (
                f"{wid.networkCode()} {wid.stationCode()} {loc} {wid.channelCode()} "
                f"{self._formatTime(start, True)} {self._formatTime(end, True)}\n"
            )

            utils.writeTS(req, data)
            charCount += len(data)
            lineCount += 1

        return charCount, lineCount

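    # Example line in "request" format as assembled above (illustrative values):
    #   GE APE -- BHZ 2023-01-01T00:00:00.000000Z 2023-01-31T00:00:00.000000Z
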
    # --------------------------------------------------------------------------
    def _writeFormatGeoCSV(self, req, lines, ro):
        charCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        nslc = "{0}|{1}|{2}|{3}"
        time = "|{0}|{1}"

        # header
        fieldUnit = "#field_unit: unitless|unitless|unitless|unitless"
        fieldType = "#field_type: string|string|string|string"
        fieldName = "Network|Station|Location|Channel"

        if not ro.mergeQuality:
            fieldUnit += "|unitless"
            fieldType += "|string"
            fieldName += "|Quality"

        if not ro.mergeSampleRate:
            fieldUnit += " |hertz"
            fieldType += " |float"
            fieldName += "|SampleRate"

        fieldUnit += " |ISO_8601|ISO_8601"
        fieldType += " |datetime|datetime"
        fieldName += "|Earliest|Latest"

        if ro.showLatestUpdate:
            fieldUnit += "|ISO_8601"
            fieldType += "|datetime"
            fieldName += "|Updated"

        if isExtentReq:
            fieldUnit += "|unitless|unitless"
            fieldType += "|integer|string"
            fieldName += "|TimeSpans|Restriction"

        header = (
            "#dataset: GeoCSV 2.0\n#delimiter: |\n"
            f"{fieldUnit}\n{fieldType}\n{fieldName}\n"
        )

        first = True
        for line in lines:
            if first:
                first = False
                utils.writeTS(req, header)
                charCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            data = nslc.format(
                wid.networkCode(),
                wid.stationCode(),
                wid.locationCode(),
                wid.channelCode(),
            )
            if not ro.mergeQuality:
                data += f"|{e.quality()}"
            if not ro.mergeSampleRate:
                data += f"|{e.sampleRate():.1f}"
            data += time.format(
                self._formatTime(e.start(), True), self._formatTime(e.end(), True)
            )
            if ro.showLatestUpdate:
                data += f"|{self._formatTime(e.updated())}"
            if isExtentReq:
                data += f"|{e.segmentCount():d}"
                data += "|RESTRICTED" if line[2] else "|OPEN"
            data += "\n"

            utils.writeTS(req, data)
            charCount += len(data)
            lineCount += 1

        return charCount, lineCount

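    # Example GeoCSV header as assembled above (extent service, no merge
    # options, latest update shown):
    #   #dataset: GeoCSV 2.0
    #   #delimiter: |
    #   #field_unit: unitless|unitless|unitless|unitless|unitless |hertz |ISO_8601|ISO_8601|ISO_8601|unitless|unitless
    #   #field_type: string|string|string|string|string |float |datetime|datetime|datetime|integer|string
    #   Network|Station|Location|Channel|Quality|SampleRate|Earliest|Latest|Updated|TimeSpans|Restriction
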
    # --------------------------------------------------------------------------
    def _writeFormatJSON(self, req, lines, ro):
        header = (
            f'{{"created":"{self._formatTime(Time.GMT())}",'
            '"version": 1.0,'
            '"datasources":['
        )
        footer = "]}"

        return self._writeJSONChannels(req, header, footer, lines, ro)


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentRealm:
    # --------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource in interfaces:
            user = {"mail": utils.u_str(avatarId), "blacklisted": False}
            return (
                resource.IResource,
                FDSNAvailabilityExtent(self.__access, user),
                lambda: None,
            )

        raise NotImplementedError()


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentAuthRealm:
    # --------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource in interfaces:
            user = self.__userdb.getAttributes(utils.u_str(avatarId))
            return (
                resource.IResource,
                FDSNAvailabilityExtent(self.__access, user),
                lambda: None,
            )

        raise NotImplementedError()


###############################################################################
class FDSNAvailabilityExtent(_Availability):
    isLeaf = True

    # --------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        return _AvailabilityExtentRequestOptions()

    # --------------------------------------------------------------------------
    @staticmethod
    def _mergeExtents(attributeExtents):
        merged = None
        cloned = False

        # Create a copy of the extent only if more than 1 element is found. The number
        # of elements is not known in advance since attributeExtents might only be an
        # iterator.
        for attExt in attributeExtents:
            if merged is None:
                merged = attExt
                continue

            if not cloned:
                merged = datamodel.DataAttributeExtent(merged)
                cloned = True

            if attExt.start() < merged.start():
                merged.setStart(attExt.start())
            if attExt.end() > merged.end():
                merged.setEnd(attExt.end())
            if attExt.updated() > merged.updated():
                merged.setUpdated(attExt.updated())
            merged.setSegmentCount(merged.segmentCount() + attExt.segmentCount())

        return merged

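    # Note: the merged attribute extent returned above covers the earliest
    # start and latest end of all inputs, carries the most recent update time,
    # and sums the individual segment counts.
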
    # --------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        charCount = 0

        nslc = (
            "{{"
            '"network":"{0}",'
            '"station":"{1}",'
            '"location":"{2}",'
            '"channel":"{3}"'
        )
        quality = ',"quality":"{0}"'
        sampleRate = ',"samplerate":{0}'
        time = (
            ',"earliest":"{0}","latest":"{1}","timespanCount":{2}'
            ',"updated":"{3}","restriction":"{4}"}}'
        )

        utils.writeTS(req, header)
        charCount += len(header)

        data = None
        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            data = "," if data is not None else ""
            data += nslc.format(
                wid.networkCode(),
                wid.stationCode(),
                wid.locationCode(),
                wid.channelCode(),
            )
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRate.format(e.sampleRate())
            data += time.format(
                self._formatTime(e.start(), True),
                self._formatTime(e.end(), True),
                e.segmentCount(),
                self._formatTime(e.updated()),
                "RESTRICTED" if line[2] else "OPEN",
            )

            utils.writeTS(req, data)
            charCount += len(data)

        utils.writeTS(req, footer)
        charCount += len(footer)

        return charCount, len(lines)

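    # Example of a single "datasources" entry written above (illustrative
    # values, no merge options):
    #   {"network":"GE","station":"APE","location":"","channel":"BHZ",
    #    "quality":"D","samplerate":20.0,
    #    "earliest":"2023-01-01T00:00:00.000000Z",
    #    "latest":"2023-01-31T00:00:00.000000Z","timespanCount":12,
    #    "updated":"2023-02-01T10:00:00Z","restriction":"OPEN"}
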
    # --------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        if req._disconnected:  # pylint: disable=W0212
            return False

        # tuples: extent, attribute extent, restricted status
        lines = []

        mergeAll = ro.mergeQuality and ro.mergeSampleRate
        mergeNone = not ro.mergeQuality and not ro.mergeSampleRate

        # iterate extents
        for ext, _, restricted in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:  # pylint: disable=W0212
                return False

            # iterate attribute extents and merge them if requested
            if mergeNone:
                for attExt in ro.attributeExtentIter(ext):
                    lines.append((ext, attExt, restricted))
                continue

            if mergeAll:
                attExt = self._mergeExtents(ro.attributeExtentIter(ext))
                if attExt is not None:
                    lines.append((ext, attExt, restricted))
                continue

            attExtDict = {}
            if ro.mergeQuality:
                # key=sampleRate
                for attExt in ro.attributeExtentIter(ext):
                    if attExt.sampleRate() in attExtDict:
                        attExtDict[attExt.sampleRate()].append(attExt)
                    else:
                        attExtDict[attExt.sampleRate()] = [attExt]
            # ro.mergeSampleRate
            else:
                # key=quality
                for attExt in ro.attributeExtentIter(ext):
                    if attExt.quality() in attExtDict:
                        attExtDict[attExt.quality()].append(attExt)
                    else:
                        attExtDict[attExt.quality()] = [attExt]

            for attExtents in attExtDict.values():
                lines.append((ext, self._mergeExtents(attExtents), restricted))

        # Return 204 if no matching availability information was found
        if not lines:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        # sort lines
        self._sortLines(lines, ro)

        # truncate lines to requested row limit
        if ro.limit:
            del lines[ro.limit :]

        charCount, extCount = self._writeLines(req, lines, ro)

        logging.debug(
            f"{ro.service}: returned {extCount} extents (characters: {charCount})"
        )
        utils.accessLog(req, ro, http.OK, charCount, None)
        return True

    # --------------------------------------------------------------------------
    @staticmethod
    def _sortLines(lines, ro):

        # Sort by segment count followed by NSLC
        if ro.orderBy == ro.VOrderByCount:
            lines.sort(
                key=lambda x: (
                    x[1].segmentCount(),
                    x[0].waveformID().networkCode(),
                    x[0].waveformID().stationCode(),
                    x[0].waveformID().locationCode(),
                    x[0].waveformID().channelCode(),
                    x[1].start(),
                    x[1].end(),
                    x[1].quality(),
                    x[1].sampleRate(),
                )
            )
        # Sort by segment count in descending order followed by NSLC
        elif ro.orderBy == ro.VOrderByCountDesc:
            lines.sort(
                key=lambda x: (
                    -x[1].segmentCount(),
                    x[0].waveformID().networkCode(),
                    x[0].waveformID().stationCode(),
                    x[0].waveformID().locationCode(),
                    x[0].waveformID().channelCode(),
                    x[1].start(),
                    x[1].end(),
                    x[1].quality(),
                    x[1].sampleRate(),
                )
            )
        # Sort by update time followed by NSLC
        elif ro.orderBy == ro.VOrderByUpdate:
            lines.sort(
                key=lambda x: (
                    x[1].updated().seconds(),
                    x[0].waveformID().networkCode(),
                    x[0].waveformID().stationCode(),
                    x[0].waveformID().locationCode(),
                    x[0].waveformID().channelCode(),
                    x[1].start(),
                    x[1].end(),
                    x[1].quality(),
                    x[1].sampleRate(),
                )
            )
        # Sort by update time in descending order followed by NSLC
        elif ro.orderBy == ro.VOrderByUpdateDesc:
            lines.sort(
                key=lambda x: (
                    -x[1].updated().seconds(),
                    x[0].waveformID().networkCode(),
                    x[0].waveformID().stationCode(),
                    x[0].waveformID().locationCode(),
                    x[0].waveformID().channelCode(),
                    x[1].start(),
                    x[1].end(),
                    x[1].quality(),
                    x[1].sampleRate(),
                )
            )
        # Sort by NSLC
        else:
            lines.sort(
                key=lambda x: (
                    x[0].waveformID().networkCode(),
                    x[0].waveformID().stationCode(),
                    x[0].waveformID().locationCode(),
                    x[0].waveformID().channelCode(),
                    x[1].start(),
                    x[1].end(),
                    x[1].quality(),
                    x[1].sampleRate(),
                )
            )


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryRealm:
    # --------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource in interfaces:
            user = {"mail": utils.u_str(avatarId), "blacklisted": False}
            return (
                resource.IResource,
                FDSNAvailabilityQuery(self.__access, user),
                lambda: None,
            )

        raise NotImplementedError()


###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryAuthRealm:
    # --------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource in interfaces:
            user = self.__userdb.getAttributes(utils.u_str(avatarId))
            return (
                resource.IResource,
                FDSNAvailabilityQuery(self.__access, user),
                lambda: None,
            )

        raise NotImplementedError()


###############################################################################
 | 
						|
class FDSNAvailabilityQuery(_Availability):
 | 
						|
    isLeaf = True
 | 
						|
 | 
						|
    # --------------------------------------------------------------------------
 | 
						|
    @staticmethod
 | 
						|
    def _createRequestOptions():
 | 
						|
        return _AvailabilityQueryRequestOptions()
 | 
						|
 | 
						|
    # --------------------------------------------------------------------------
 | 
						|
    def _writeJSONChannels(self, req, header, footer, lines, ro):
 | 
						|
        charCount = 0
 | 
						|
        lineCount = 0
 | 
						|
 | 
						|
        nslc = (
 | 
						|
            "{{"
 | 
						|
            '"network":"{0}",'
 | 
						|
            '"station":"{1}",'
 | 
						|
            '"location":"{2}",'
 | 
						|
            '"channel":"{3}"'
 | 
						|
        )
 | 
						|
        quality = ',"quality":"{0}"'
 | 
						|
        sampleRate = ',"samplerate":{0}'
 | 
						|
        updated = ',"updated":"{0}"'
 | 
						|
        timeSpans = ',"timespans":[['
 | 
						|
        seg = '"{0}","{1}"]'
 | 
						|
 | 
						|
        class SegGroup:
 | 
						|
            def __init__(self, segments, updated):
 | 
						|
                self.segments = segments
 | 
						|
                self.updated = updated
 | 
						|
 | 
						|
        def writeSegments(segments):
 | 
						|
            first = True
 | 
						|
            charCount = 0
 | 
						|
 | 
						|
            for s in segments:
 | 
						|
                if first:
 | 
						|
                    first = False
 | 
						|
                    data = timeSpans
 | 
						|
                else:
 | 
						|
                    data = ",["
 | 
						|
 | 
						|
                data += seg.format(
 | 
						|
                    self._formatTime(s.start(), True), self._formatTime(s.end(), True)
 | 
						|
                )
 | 
						|
                utils.writeTS(req, data)
 | 
						|
                charCount += len(data)
 | 
						|
 | 
						|
            return charCount
 | 
						|
 | 
						|
        prevExt = None
 | 
						|
 | 
						|
        # merge of quality and sample rate: all timespans of one stream are
 | 
						|
        # grouped into one channel
 | 
						|
        if ro.mergeQuality and ro.mergeSampleRate:
 | 
						|
            lastUpdate = None
 | 
						|
            segments = None
 | 
						|
            for ext, attExt, _restricted in lines:
 | 
						|
                lineCount += 1
 | 
						|
 | 
						|
                if ext is prevExt:
 | 
						|
                    lastUpdate = max(lastUpdate, attExt.updated())
 | 
						|
                    segments.append(attExt)
 | 
						|
                    continue
 | 
						|
 | 
						|
                if prevExt is not None:
 | 
						|
                    if charCount == 0:
 | 
						|
                        utils.writeTS(req, header)
 | 
						|
                        charCount += len(header)
 | 
						|
                        data = ""
 | 
						|
                    else:
 | 
						|
                        data = "]},"
 | 
						|
 | 
						|
                    wid = prevExt.waveformID()
 | 
						|
                    data += nslc.format(
 | 
						|
                        wid.networkCode(),
 | 
						|
                        wid.stationCode(),
 | 
						|
                        wid.locationCode(),
 | 
						|
                        wid.channelCode(),
 | 
						|
                    )
 | 
						|
                    if ro.showLatestUpdate:
 | 
						|
                        data += updated.format(self._formatTime(lastUpdate))
 | 
						|
                    utils.writeTS(req, data)
 | 
						|
                    charCount += len(data)
 | 
						|
                    charCount += writeSegments(segments)
 | 
						|
 | 
						|
                prevExt = ext
 | 
						|
                lastUpdate = attExt.updated()
 | 
						|
                segments = [attExt]
 | 
						|
 | 
						|
            # handle last extent
 | 
						|
            if prevExt is not None:
 | 
						|
                if charCount == 0:
 | 
						|
                    utils.writeTS(req, header)
 | 
						|
                    charCount += len(header)
 | 
						|
                    data = ""
 | 
						|
                else:
 | 
						|
                    data = "]},"
 | 
						|
 | 
						|
                wid = prevExt.waveformID()
 | 
						|
                data += nslc.format(
 | 
						|
                    wid.networkCode(),
 | 
						|
                    wid.stationCode(),
 | 
						|
                    wid.locationCode(),
 | 
						|
                    wid.channelCode(),
 | 
						|
                )
 | 
						|
                if ro.showLatestUpdate:
 | 
						|
                    data += updated.format(self._formatTime(lastUpdate))
 | 
						|
                utils.writeTS(req, data)
 | 
						|
                charCount += len(data)
 | 
						|
                charCount += writeSegments(segments)
 | 
						|
 | 
						|
                data = "]}"
 | 
						|
                utils.writeTS(req, data)
 | 
						|
                charCount += len(data)
 | 
						|
                utils.writeTS(req, footer)
 | 
						|
                charCount += len(footer)
 | 
						|
 | 
						|
        # merge of quality: all timespans of one stream are grouped by sample rate
 | 
						|
        elif ro.mergeQuality:
 | 
						|
            segGroups = OrderedDict()
 | 
						|
            for ext, attExt, _restricted in lines:
 | 
						|
                lineCount += 1
 | 
						|
 | 
						|
                if ext is prevExt:
 | 
						|
                    if attExt.sampleRate() in segGroups:
 | 
						|
                        segGroup = segGroups[attExt.sampleRate()]
 | 
						|
                        segGroup.updated = max(segGroup.updated, attExt.updated())
 | 
						|
                        segGroup.segments.append(attExt)
 | 
						|
                        continue
 | 
						|
 | 
						|
                else:
 | 
						|
                    if prevExt is not None:
 | 
						|
                        wid = prevExt.waveformID()
 | 
						|
                        for sr, sg in segGroups.items():
 | 
						|
                            if req._disconnected:  # pylint: disable=W0212
 | 
						|
                                return False
 | 
						|
 | 
						|
                            if charCount == 0:
 | 
						|
                                utils.writeTS(req, header)
 | 
						|
                                charCount += len(header)
 | 
						|
                                data = ""
 | 
						|
                            else:
 | 
						|
                                data = "]},"
 | 
						|
 | 
						|
                            data += nslc.format(
 | 
						|
                                wid.networkCode(),
 | 
						|
                                wid.stationCode(),
 | 
						|
                                wid.locationCode(),
 | 
						|
                                wid.channelCode(),
 | 
						|
                            )
 | 
						|
                            data += sampleRate.format(sr)
 | 
						|
                            if ro.showLatestUpdate:
 | 
						|
                                data += updated.format(self._formatTime(sg.updated))
 | 
						|
                            utils.writeTS(req, data)
 | 
						|
                            charCount += len(data)
 | 
						|
                            charCount += writeSegments(sg.segments)
 | 
						|
 | 
						|
                    prevExt = ext
 | 
						|
                    segGroups = OrderedDict()
 | 
						|
 | 
						|
                segGroups[attExt.sampleRate()] = SegGroup([attExt], attExt.updated())
 | 
						|
 | 
						|
            # handle last extent
 | 
						|
            if prevExt is not None:
 | 
						|
                wid = prevExt.waveformID()
 | 
						|
                for sr, sg in segGroups.items():
 | 
						|
                    if req._disconnected:  # pylint: disable=W0212
 | 
						|
                        return False
 | 
						|
 | 
						|
                    if charCount == 0:
 | 
						|
                        utils.writeTS(req, header)
 | 
						|
                        charCount += len(header)
 | 
						|
                        data = ""
 | 
						|
                    else:
 | 
						|
                        data = "]},"
 | 
						|
 | 
						|
                    data += nslc.format(
 | 
						|
                        wid.networkCode(),
 | 
						|
                        wid.stationCode(),
 | 
						|
                        wid.locationCode(),
 | 
						|
                        wid.channelCode(),
 | 
						|
                    )
 | 
						|
                    data += sampleRate.format(sr)
 | 
						|
                    if ro.showLatestUpdate:
 | 
						|
                        data += updated.format(self._formatTime(sg.updated))
 | 
						|
                    utils.writeTS(req, data)
 | 
						|
                    charCount += len(data)
 | 
						|
                    charCount += writeSegments(sg.segments)
 | 
						|
 | 
						|
                data = "]}"
 | 
						|
                utils.writeTS(req, data)
 | 
						|
                charCount += len(data)
 | 
						|
                utils.writeTS(req, footer)
 | 
						|
                charCount += len(footer)
 | 
						|
 | 
						|
        # merge of sample rate: all timespans of one stream are grouped by
 | 
						|
        # quality
 | 
						|
        elif ro.mergeSampleRate:
 | 
						|
            segGroups = OrderedDict()
 | 
						|
            for ext, attExt, _restricted in lines:
 | 
						|
                lineCount += 1
 | 
						|
 | 
						|
                if ext is prevExt:
 | 
						|
                    if attExt.quality() in segGroups:
 | 
						|
                        segGroup = segGroups[attExt.quality()]
 | 
						|
                        segGroup.updated = max(segGroup.updated, attExt.updated())
 | 
						|
                        segGroup.segments.append(attExt)
 | 
						|
                        continue
 | 
						|
 | 
						|
                else:
 | 
						|
                    if prevExt is not None:
 | 
						|
                        wid = prevExt.waveformID()
 | 
						|
                        for q, sg in segGroups.items():
 | 
						|
                            if req._disconnected:  # pylint: disable=W0212
 | 
						|
                                return False
 | 
						|
 | 
						|
                            if charCount == 0:
 | 
						|
                                utils.writeTS(req, header)
 | 
						|
                                charCount += len(header)
 | 
						|
                                data = ""
 | 
						|
                            else:
 | 
						|
                                data = "]},"
 | 
						|
 | 
						|
                            data += nslc.format(
 | 
						|
                                wid.networkCode(),
 | 
						|
                                wid.stationCode(),
 | 
						|
                                wid.locationCode(),
 | 
						|
                                wid.channelCode(),
 | 
						|
                            )
 | 
						|
                            data += quality.format(q)
 | 
						|
                            if ro.showLatestUpdate:
 | 
						|
                                data += updated.format(self._formatTime(sg.updated))
 | 
						|
                            utils.writeTS(req, data)
 | 
						|
                            charCount += len(data)
 | 
						|
                            charCount += writeSegments(sg.segments)
 | 
						|
 | 
						|
                    prevExt = ext
 | 
						|
                    segGroups = OrderedDict()
 | 
						|
 | 
						|
                segGroups[attExt.quality()] = SegGroup([attExt], attExt.updated())
 | 
						|
 | 
						|
            # handle last extent
 | 
						|
            if prevExt is not None:
                wid = prevExt.waveformID()
                for q, sg in segGroups.items():
                    if req._disconnected:  # pylint: disable=W0212
                        return False

                    if charCount == 0:
                        utils.writeTS(req, header)
                        charCount += len(header)
                        data = ""
                    else:
                        data = "]},"

                    data += nslc.format(
                        wid.networkCode(),
                        wid.stationCode(),
                        wid.locationCode(),
                        wid.channelCode(),
                    )
                    data += quality.format(q)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    charCount += len(data)
                    charCount += writeSegments(sg.segments)

                data = "]}"
                utils.writeTS(req, data)
                charCount += len(data)
                utils.writeTS(req, footer)
                charCount += len(footer)

        # no merge: all timespans of one stream are grouped by tuple of quality and
        # sampleRate
        else:
            segGroups = OrderedDict()
            for ext, attExt, _restricted in lines:
                lineCount += 1
                t = (attExt.quality(), attExt.sampleRate())

                if ext is prevExt:
                    if t in segGroups:
                        segGroup = segGroups[t]
                        segGroup.updated = max(segGroup.updated, attExt.updated())
                        segGroup.segments.append(attExt)
                        continue

                else:
                    if prevExt is not None:
                        wid = prevExt.waveformID()
                        for (q, sr), sg in segGroups.items():
                            if req._disconnected:  # pylint: disable=W0212
                                return False

                            if charCount == 0:
                                utils.writeTS(req, header)
                                charCount += len(header)
                                data = ""
                            else:
                                data = "]},"

                            data += nslc.format(
                                wid.networkCode(),
                                wid.stationCode(),
                                wid.locationCode(),
                                wid.channelCode(),
                            )
                            data += quality.format(q)
                            data += sampleRate.format(sr)
                            if ro.showLatestUpdate:
                                data += updated.format(self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            charCount += len(data)
                            charCount += writeSegments(sg.segments)

                    prevExt = ext
                    segGroups = OrderedDict()

                segGroups[t] = SegGroup([attExt], attExt.updated())

            # handle last extent
            if prevExt is not None:
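                # flush the groups collected for the final stream, analogous to the
                # quality-merge branch above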
                wid = prevExt.waveformID()
                for (q, sr), sg in segGroups.items():
                    if req._disconnected:  # pylint: disable=W0212
                        return False

                    if charCount == 0:
                        utils.writeTS(req, header)
                        charCount += len(header)
                        data = ""
                    else:
                        data = "]},"

                    data += nslc.format(
                        wid.networkCode(),
                        wid.stationCode(),
                        wid.locationCode(),
                        wid.channelCode(),
                    )
                    data += quality.format(q)
                    data += sampleRate.format(sr)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    charCount += len(data)
                    charCount += writeSegments(sg.segments)

                data = "]}"
                utils.writeTS(req, data)
                charCount += len(data)
                utils.writeTS(req, footer)
                charCount += len(footer)

        return charCount, lineCount

    # --------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
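        """Write availability segments matching the request options to the client.

        The object IDs of all matching extents are collected first, the associated
        data segments are then queried in batches and streamed to the client via
        _writeLines().
        """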
        if req._disconnected:  # pylint: disable=W0212
            return False

        # tuples: extent, segment, restricted status
        lines = []

        charCount = 0

        # iterate extents and create IN clauses of parent_oids in bunches
        # of 1000 because the query size is limited
        parentOIDs, idList, tooLarge = [], [], []
        i = 0
        for ext, objID, _ in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:  # pylint: disable=W0212
                return False
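            # extents marked with segmentOverflow are skipped silently if
            # 'excludetoolarge' is set (the default), otherwise they are collected
            # and the request is rejected below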
            if ro.excludeTooLarge:
                if ext.segmentOverflow():
                    continue
            elif ext.segmentOverflow():
                tooLarge.append(ext)
                continue
            elif tooLarge:
                continue

            if i < 1000:
                idList.append(objID)
                i += 1
            else:
                parentOIDs.append(idList)
                idList = [objID]
                i = 1

        if not ro.excludeTooLarge and tooLarge:
            extents = ", ".join(
                f"{e.waveformID().networkCode()}.{e.waveformID().stationCode()}."
                f"{e.waveformID().locationCode()}.{e.waveformID().channelCode()}"
                for e in tooLarge
            )

            msg = (
                "Unable to process request due to database limitations. Some "
                "selections have too many segments to process. Rejected extents: "
                f"{{{extents}}}. This limitation may be resolved in a future version "
                "of this webservice."
            )
            self.writeErrorPage(req, http.REQUEST_ENTITY_TOO_LARGE, msg, ro)
            return False

        if len(idList) > 0:
            parentOIDs.append(idList)
        else:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return False

        db = io.DatabaseInterface.Open(Application.Instance().databaseURI())
        if db is None:
            msg = "could not connect to database"
            return self.renderErrorPage(req, http.SERVICE_UNAVAILABLE, msg, ro)
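        # _lineIter() is a generator: data segments are read from the database,
        # merged according to the request options and handed to _writeLines() one
        # at a time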
        lines = self._lineIter(db, parentOIDs, req, ro, dac.extentsOID())

        charCount, segCount = self._writeLines(req, lines, ro)

        # Return 204 if no matching availability information was found
        if segCount <= 0:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        logging.debug(
            f"{ro.service}: returned {segCount} segments (characters: {charCount})"
        )
        utils.accessLog(req, ro, http.OK, charCount, None)

        return True

    # --------------------------------------------------------------------------
    @staticmethod
    def _lineIter(db, parentOIDs, req, ro, oIDs):
        def _T(name):
            return db.convertColumnName(name)

        dba = datamodel.DatabaseArchive(db)

        for idList in parentOIDs:
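            # one SELECT statement is issued per batch of at most 1000 parent
            # object IDs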
            if req._disconnected:  # pylint: disable=W0212
                return

            # build SQL query
            idStr = ",".join(str(x) for x in idList)
            q = f"SELECT * from DataSegment WHERE _parent_oid IN ({idStr}) "
            if ro.time:
                if ro.time.start is not None:
                    if ro.time.start.microseconds() == 0:
                        q += f"AND {_T('end')} >= '{db.timeToString(ro.time.start)}' "
                    else:
                        startTimeStr = db.timeToString(ro.time.start)
                        q += (
                            f"AND ({_T('end')} > '{startTimeStr}' OR ("
                            f"{_T('end')} = '{startTimeStr}' AND "
                            f"end_ms >= {ro.time.start.microseconds()})) "
                        )
                if ro.time.end is not None:
                    if ro.time.end.microseconds() == 0:
                        q += f"AND {_T('start')} < '{db.timeToString(ro.time.end)}' "
                    else:
                        endTimeStr = db.timeToString(ro.time.end)
                        q += (
                            f"AND ({_T('start')} < '{endTimeStr}' OR ("
                            f"{_T('start')} = '{endTimeStr}' AND "
                            f"start_ms < {ro.time.end.microseconds()})) "
                        )
            if ro.quality:
                qualities = "', '".join(ro.quality)
                q += f"AND {_T('quality')} IN ('{qualities}') "
            q += f"ORDER BY _parent_oid, {_T('start')}, {_T('start_ms')}"
            segIt = dba.getObjectIterator(q, datamodel.DataSegment.TypeInfo())
            if segIt is None or not segIt.valid():
                return

            # Iterate and optionally merge segments.
            # A segment will be yielded if
            # - the extent changed
            # - quality changed and merging of quality was not requested
            # - sample rate changed and merging of sample rate was not
            #   requested
            # - an overlap was detected and merging of overlaps was not
            #   requested
            # - a gap was detected and the gap exceeds the mergeGaps
            #   threshold
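            # Neighbouring segments are considered contiguous if the gap or overlap
            # between them does not exceed half a sample interval ('jitter').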
            seg = None
            ext = None
            lines = 0
            jitter = 0.0
            while (
                not req._disconnected  # pylint: disable=W0212
                and (seg is None or segIt.next())
                and (not ro.limit or lines < ro.limit)
            ):
                s = datamodel.DataSegment.Cast(segIt.get())
                if s is None:
                    break

                # get extent for current segment
                try:
                    e, restricted = oIDs[segIt.parentOid()]
                except KeyError:
                    logging.warning(f"parent object id not found: {segIt.parentOid()}")
                    continue

                # first segment, no merge test required
                if seg is None:
                    seg = s
                    ext = e
                    jitter = 1 / (2 * s.sampleRate())
                    continue

                # merge test
                diff = float(s.start() - seg.end())
                mQuality = ro.mergeQuality or s.quality() == seg.quality()
                mSampleRate = ro.mergeSampleRate or s.sampleRate() == seg.sampleRate()
                mGaps = (ro.mergeGaps is None and diff <= jitter) or (
                    ro.mergeGaps is not None and diff <= ro.mergeGaps
                )
                mOverlap = -diff <= jitter or ro.mergeOverlap
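                # extend the previous segment if the stream and all merge criteria
                # match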
                if e is ext and mQuality and mSampleRate and mGaps and mOverlap:
                    seg.setEnd(s.end())
                    if s.updated() > seg.updated():
                        seg.setUpdated(s.updated())
                    continue

                # merge was not possible, yield previous segment
                yield (ext, seg, restricted)
                lines += 1
                if seg.sampleRate() != s.sampleRate():
                    jitter = 1 / (2 * s.sampleRate())
                seg = s
                ext = e

            if seg is not None and (not ro.limit or lines < ro.limit):
                yield (ext, seg, restricted)

            # close database iterator if iteration was stopped because of
            # row limit
            segIt.close()


# vim: ts=4 et tw=79