Files
2025/lib/python/seiscomp/fdsnws/availability.py

1443 lines
51 KiB
Python

###############################################################################
# Copyright (C) 2013-2014 by gempa GmbH
#
# FDSNStation -- Implements the fdsnws-availability Web service, see
# http://www.fdsn.org/webservices/
#
# Feature notes:
# - additional request parameters:
# - excludetoolarge: boolean, default: true
#
# Author: Stephan Herrnkind
# Email: herrnkind@gempa.de
###############################################################################
from functools import cmp_to_key
from collections import OrderedDict
from twisted.cred import portal
from twisted.internet.threads import deferToThread
from twisted.web import http, resource, server
from zope.interface import implementer
from seiscomp import datamodel, io, logging
from seiscomp.client import Application
from seiscomp.core import Time
from .http import BaseResource
from .request import RequestOptions
from . import utils
# Largest value accepted for the row 'limit' parameter (unsigned 64 bit range)
DBMaxUInt = 18446744073709551615 # 2^64 - 1
# Implementation version of this service, reported via BaseResource
# (see _Availability.__init__)
VERSION = "1.0.3"
###############################################################################
class _AvailabilityRequestOptions(RequestOptions):
    """Request options common to the availability extent and query methods.

    Parses and validates the shared parameters (quality, merge, orderby,
    limit, includerestricted) and provides the extent iteration honoring
    stream filters and access restrictions.
    """

    # merge options; the overlap option is only available in the query method
    VMergeSampleRate = "samplerate"
    VMergeQuality = "quality"
    VMergeOverlap = "overlap"
    VMerge = [VMergeSampleRate, VMergeQuality]

    # orderby options
    VOrderByNSLC = "nslc_time_quality_samplerate"
    VOrderByCount = "timespancount"
    VOrderByCountDesc = "timespancount_desc"
    VOrderByUpdate = "latestupdate"
    VOrderByUpdateDesc = "latestupdate_desc"
    VOrderBy = [
        VOrderByNSLC,
        VOrderByCount,
        VOrderByCountDesc,
        VOrderByUpdate,
        VOrderByUpdateDesc,
    ]

    # format options
    VFormatText = "text"
    VFormatGeoCSV = "geocsv"
    VFormatJSON = "json"
    VFormatRequest = "request"
    OutputFormats = [VFormatText, VFormatGeoCSV, VFormatJSON, VFormatRequest]

    # request parameters
    PQuality = ["quality"]
    PMerge = ["merge"]
    POrderBy = ["orderby"]
    PLimit = ["limit"]
    PIncludeRestricted = ["includerestricted"]

    POSTParams = (
        RequestOptions.POSTParams
        + PQuality
        + PMerge
        + POrderBy
        + PLimit
        + PIncludeRestricted
    )
    GETParams = RequestOptions.GETParams + POSTParams

    # --------------------------------------------------------------------------
    def __init__(self):
        super().__init__()
        self.service = "availability-base"

        self.quality = None
        self.mergeSampleRate = None
        self.mergeQuality = None
        self.mergeOverlap = None
        self.orderBy = self.VOrderBy[0]
        self.limit = None
        self.includeRestricted = None
        self.showLatestUpdate = None

    # --------------------------------------------------------------------------
    def parse(self):
        """Parse and validate the common availability request parameters.

        Raises:
            ValueError: on any invalid parameter value.
        """
        self.parseTime()
        self.parseChannel()
        self.parseOutput()

        # quality: single upper-case characters or * wildcard (optional)
        for v in self.getListValues(self.PQuality):
            if len(v) == 1:
                if v[0] == "*":
                    # wildcard disables quality filtering entirely
                    self.quality = None
                    break
                if v[0].isupper():
                    if self.quality is None:
                        self.quality = [v]
                    else:
                        self.quality.append(v)
                    continue
            self.raiseValueError(self.PQuality[0])

        # merge (optional)
        for v in self.getListValues(self.PMerge, True):
            if v not in self.VMerge:
                self.raiseValueError(self.PMerge[0])

            if v == self.VMergeSampleRate:
                self.mergeSampleRate = True
            elif v == self.VMergeQuality:
                self.mergeQuality = True
            elif v == self.VMergeOverlap:
                self.mergeOverlap = True

        # orderby (optional)
        key, value = self.getFirstValue(self.POrderBy)
        if value is not None:
            if value in self.VOrderBy:
                self.orderBy = value
            else:
                self.raiseValueError(key)

        # limit (optional)
        self.limit = self.parseInt(self.PLimit, 1, DBMaxUInt)

        # includeRestricted (optional)
        self.includeRestricted = self.parseBool(self.PIncludeRestricted)

    # --------------------------------------------------------------------------
    def extentIter(self, dac, user=None, access=None):
        """Yield data extent tuples matching the request.

        Args:
            dac: data availability cache providing extentsSorted()
            user: user attributes used for restricted extents, or None
            access: access authorization object, or None

        Yields:
            Tuples of (extent, oid, restricted) passing the stream, time
            and access filters.
        """
        # tupel: extent, oid, restricted
        for e in dac.extentsSorted():
            ext = e[0]
            restricted = e[2]
            wid = ext.waveformID()
            net = wid.networkCode()
            sta = wid.stationCode()
            loc = wid.locationCode()
            cha = wid.channelCode()

            if restricted:
                if not user:
                    continue
                if access:
                    # restrict the authorization window to the requested
                    # time frame
                    startTime = ext.start()
                    endTime = ext.end()
                    if self.time:
                        # Bug fix: Time.start/Time.end are attributes, not
                        # callables (cf. their usage in _writeFormatRequest
                        # and _lineIter)
                        if self.time.start and self.time.start > startTime:
                            startTime = self.time.start
                        if self.time.end and self.time.end < endTime:
                            endTime = self.time.end
                    if not access.authorize(
                        user, net, sta, loc, cha, startTime, endTime
                    ):
                        continue

            for ro in self.streams:
                if ro.channel:
                    if (
                        not ro.channel.matchNet(net)
                        or not ro.channel.matchSta(sta)
                        or not ro.channel.matchLoc(loc)
                        or not ro.channel.matchCha(cha)
                    ):
                        continue

                if ro.time and not ro.time.match(ext.start(), ext.end()):
                    continue

                if not self.includeRestricted and restricted:
                    continue

                yield e
###############################################################################
class _AvailabilityExtentRequestOptions(_AvailabilityRequestOptions):
    """Request options for the fdsnws-availability extent method."""

    # --------------------------------------------------------------------------
    def __init__(self):
        super().__init__()
        self.service = "availability-extent"
        self.showLatestUpdate = True

    # --------------------------------------------------------------------------
    def attributeExtentIter(self, ext):
        """Yield the attribute extents of ext matching the requested time
        window and quality filter."""
        for idx in range(ext.dataAttributeExtentCount()):
            attributeExtent = ext.dataAttributeExtent(idx)

            outsideTimeWindow = self.time and not self.time.match(
                attributeExtent.start(), attributeExtent.end()
            )
            if outsideTimeWindow:
                continue

            if self.quality and attributeExtent.quality() not in self.quality:
                continue

            yield attributeExtent
###############################################################################
class _AvailabilityQueryRequestOptions(_AvailabilityRequestOptions):
    """Request options for the fdsnws-availability query method."""

    # additional merge options
    VMerge = _AvailabilityRequestOptions.VMerge + [
        _AvailabilityRequestOptions.VMergeOverlap
    ]

    # show options
    VShowLatestUpdate = "latestupdate"
    VShow = [VShowLatestUpdate]

    # additional query specific request parameters
    PMergeGaps = ["mergegaps"]
    PShow = ["show"]
    PExcludeTooLarge = ["excludetoolarge"]

    POSTParams = (
        _AvailabilityRequestOptions.POSTParams + PMergeGaps + PShow + PExcludeTooLarge
    )
    GETParams = (
        _AvailabilityRequestOptions.GETParams + PMergeGaps + PShow + PExcludeTooLarge
    )

    # --------------------------------------------------------------------------
    def __init__(self):
        super().__init__()
        self.service = "availability-query"

        self.orderBy = None
        self.mergeGaps = None
        self.excludeTooLarge = None

    # --------------------------------------------------------------------------
    def parse(self):
        """Parse the query specific parameters on top of the common ones.

        Raises:
            ValueError: on invalid parameter values, missing stream
                selections, or use of the unsupported orderby parameter.
        """
        _AvailabilityRequestOptions.parse(self)

        # merge gaps threshold (optional)
        self.mergeGaps = self.parseFloat(self.PMergeGaps, 0)

        # show (optional)
        for v in self.getListValues(self.PShow, True):
            if v not in self.VShow:
                # Bug fix: report the parameter name, consistent with the
                # other validations (previously the raw value was passed)
                self.raiseValueError(self.PShow[0])

            if v == self.VShowLatestUpdate:
                self.showLatestUpdate = True

        # exclude too large (optional), defaults to True
        self.excludeTooLarge = self.parseBool(self.PExcludeTooLarge)
        if self.excludeTooLarge is None:
            self.excludeTooLarge = True

        if not self.channel:
            raise ValueError("Request contains no selections")

        if self.orderBy:
            raise ValueError("orderby not supported for query request")
###############################################################################
class _Availability(BaseResource):
    """Abstract base resource of the availability extent and query methods.

    Implements request parsing, dispatch to a worker thread and the text,
    request, GeoCSV and JSON output formats. Derived classes provide
    _createRequestOptions(), _processRequest() and _writeJSONChannels().
    """

    isLeaf = True

    # --------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        # implemented by derived classes
        pass

    # --------------------------------------------------------------------------
    def __init__(self, access=None, user=None):
        super().__init__(VERSION)
        self.access = access
        self.user = user

    # --------------------------------------------------------------------------
    def render_OPTIONS(self, req):
        """Answer CORS pre-flight requests."""
        req.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        req.setHeader(
            "Access-Control-Allow-Headers",
            "Accept, Content-Type, X-Requested-With, Origin",
        )
        req.setHeader("Content-Type", "text/plain; charset=utf-8")
        return ""

    # --------------------------------------------------------------------------
    def render_GET(self, req):
        # Parse and validate GET parameters
        ro = self._createRequestOptions()
        try:
            ro.parseGET(req.args)
            ro.parse()
            # the GET operation supports exactly one stream filter
            ro.streams.append(ro)
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    # --------------------------------------------------------------------------
    def render_POST(self, req):
        # Parse and validate POST parameters
        ro = self._createRequestOptions()
        try:
            ro.parsePOST(req.content)
            ro.parse()
        except ValueError as e:
            logging.warning(str(e))
            return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)

        return self._prepareRequest(req, ro)

    # --------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        # implemented by derived classes
        pass

    # --------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        # implemented by derived classes
        pass

    # --------------------------------------------------------------------------
    def _prepareRequest(self, req, ro):
        """Set the response headers and defer the actual processing to a
        worker thread."""
        dac = Application.Instance().getDACache()

        if ro.format == ro.VFormatJSON:
            contentType = "application/json; charset=utf-8"
            extension = "json"
        elif ro.format == ro.VFormatGeoCSV:
            contentType = "text/csv; charset=utf-8"
            extension = "csv"
        else:
            contentType = "text/plain; charset=utf-8"
            extension = "txt"

        req.setHeader("Content-Type", contentType)
        req.setHeader(
            "Content-Disposition",
            f"inline; filename=fdsnws-ext-availability_{Time.GMT().iso()}.{extension}",
        )

        d = deferToThread(self._processRequest, req, ro, dac)
        req.notifyFinish().addErrback(utils.onCancel, d)
        d.addBoth(utils.onFinish, req)

        # The request is handled by the deferred object
        return server.NOT_DONE_YET

    # --------------------------------------------------------------------------
    @staticmethod
    def _formatTime(time, ms=False):
        """Return time as ISO 8601 string, optionally with microseconds."""
        if ms:
            return f"{time.toString('%FT%T')}.{time.microseconds():06d}Z"
        return time.toString("%FT%TZ")

    # --------------------------------------------------------------------------
    def _writeLines(self, req, lines, ro):
        """Dispatch to the writer matching the requested output format."""
        if ro.format == ro.VFormatText:
            return self._writeFormatText(req, lines, ro)
        if ro.format == ro.VFormatGeoCSV:
            return self._writeFormatGeoCSV(req, lines, ro)
        if ro.format == ro.VFormatJSON:
            return self._writeFormatJSON(req, lines, ro)
        if ro.format == ro.VFormatRequest:
            return self._writeFormatRequest(req, lines, ro)

        raise ValueError(f"unknown response format: {ro.format}")

    # --------------------------------------------------------------------------
    def _writeFormatText(self, req, lines, ro):
        """Write lines as fixed-width text table.

        Returns:
            Tuple of (characters written, lines written).
        """
        charCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        nslc = "{0: <2} {1: <5} {2: <2} {3: <3}"
        quality = " {0: <1}"
        sampleRate = " {0: >11}"
        sampleRateF = " {0: >11.1f}"
        time = " {0: <27} {1: <27}"
        updated = " {0: <20}"
        timeSpans = " {0: >10}"
        restriction = " {0: <11}"

        header = nslc.format("#N", "S", "L", "C")
        if not ro.mergeQuality:
            header += quality.format("Q")
        if not ro.mergeSampleRate:
            header += sampleRate.format("SampleRate")
        header += time.format("Earliest", "Latest")
        if ro.showLatestUpdate:
            header += updated.format("Updated")
        if isExtentReq:
            header += timeSpans.format("TimeSpans")
            header += restriction.format("Restriction")
        header += "\n"

        first = True
        for line in lines:
            # write the header only once a first line exists
            if first:
                first = False
                utils.writeTS(req, header)
                charCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            loc = wid.locationCode() if wid.locationCode() else "--"
            data = nslc.format(
                wid.networkCode(), wid.stationCode(), loc, wid.channelCode()
            )
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRateF.format(e.sampleRate())
            data += time.format(
                self._formatTime(e.start(), True), self._formatTime(e.end(), True)
            )
            if ro.showLatestUpdate:
                data += updated.format(self._formatTime(e.updated()))
            if isExtentReq:
                data += timeSpans.format(e.segmentCount())
                data += restriction.format("RESTRICTED" if line[2] else "OPEN")
            data += "\n"

            utils.writeTS(req, data)
            charCount += len(data)
            lineCount += 1

        return charCount, lineCount

    # --------------------------------------------------------------------------
    def _writeFormatRequest(self, req, lines, ro):
        """Write lines in fdsnws POST request format.

        Returns:
            Tuple of (characters written, lines written).
        """
        charCount = 0
        lineCount = 0

        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            loc = wid.locationCode() if wid.locationCode() else "--"
            start = e.start()
            end = e.end()

            # truncate start and end time to requested time frame
            if ro.time:
                if ro.time.start and ro.time.start > start:
                    start = ro.time.start
                if ro.time.end and ro.time.end < end:
                    end = ro.time.end

            data = (
                f"{wid.networkCode()} {wid.stationCode()} {loc} {wid.channelCode()} "
                f"{self._formatTime(start, True)} {self._formatTime(end, True)}\n"
            )

            utils.writeTS(req, data)
            charCount += len(data)
            lineCount += 1

        return charCount, lineCount

    # --------------------------------------------------------------------------
    def _writeFormatGeoCSV(self, req, lines, ro):
        """Write lines in GeoCSV 2.0 format.

        Returns:
            Tuple of (characters written, lines written).
        """
        charCount = 0
        lineCount = 0

        # the extent service uses a different update column name and alignment
        isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions

        nslc = "{0}|{1}|{2}|{3}"
        time = "|{0}|{1}"

        # header
        fieldUnit = "#field_unit: unitless|unitless|unitless|unitless"
        fieldType = "#field_type: string|string|string|string"
        fieldName = "Network|Station|Location|Channel"

        if not ro.mergeQuality:
            fieldUnit += "|unitless"
            fieldType += "|string"
            fieldName += "|Quality"

        if not ro.mergeSampleRate:
            fieldUnit += " |hertz"
            fieldType += " |float"
            fieldName += "|SampleRate"

        fieldUnit += " |ISO_8601|ISO_8601"
        fieldType += " |datetime|datetime"
        fieldName += "|Earliest|Latest"

        if ro.showLatestUpdate:
            fieldUnit += "|ISO_8601"
            fieldType += "|datetime"
            fieldName += "|Updated"

        if isExtentReq:
            fieldUnit += "|unitless|unitless"
            fieldType += "|integer|string"
            fieldName += "|TimeSpans|Restriction"

        header = (
            "#dataset: GeoCSV 2.0\n#delimiter: |\n"
            f"{fieldUnit}\n{fieldType}\n{fieldName}\n"
        )

        first = True
        for line in lines:
            # write the header only once a first line exists
            if first:
                first = False
                utils.writeTS(req, header)
                charCount += len(header)

            wid = line[0].waveformID()
            e = line[1]
            data = nslc.format(
                wid.networkCode(),
                wid.stationCode(),
                wid.locationCode(),
                wid.channelCode(),
            )
            if not ro.mergeQuality:
                data += f"|{e.quality()}"
            if not ro.mergeSampleRate:
                data += f"|{e.sampleRate():.1f}"
            data += time.format(
                self._formatTime(e.start(), True), self._formatTime(e.end(), True)
            )
            if ro.showLatestUpdate:
                data += f"|{self._formatTime(e.updated())}"
            if isExtentReq:
                data += f"|{e.segmentCount():d}"
                data += "|RESTRICTED" if line[2] else "|OPEN"
            data += "\n"

            utils.writeTS(req, data)
            charCount += len(data)
            lineCount += 1

        return charCount, lineCount

    # --------------------------------------------------------------------------
    def _writeFormatJSON(self, req, lines, ro):
        """Write lines as JSON, delegating the channel serialization to the
        derived class."""
        header = (
            f'{{"created":"{self._formatTime(Time.GMT())}",'
            '"version": 1.0,'
            '"datasources":['
        )
        footer = "]}"

        return self._writeJSONChannels(req, header, footer, lines, ro)
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentRealm:
    """Twisted realm creating availability extent resources for users
    identified by their e-mail address."""

    # --------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource not in interfaces:
            raise NotImplementedError()

        user = {"mail": utils.u_str(avatarId), "blacklisted": False}
        avatar = FDSNAvailabilityExtent(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityExtentAuthRealm:
    """Twisted realm creating availability extent resources for users whose
    attributes are looked up in a user database."""

    # --------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource not in interfaces:
            raise NotImplementedError()

        user = self.__userdb.getAttributes(utils.u_str(avatarId))
        avatar = FDSNAvailabilityExtent(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
class FDSNAvailabilityExtent(_Availability):
    """Implements the fdsnws-availability extent method."""

    isLeaf = True

    # --------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        return _AvailabilityExtentRequestOptions()

    # --------------------------------------------------------------------------
    @staticmethod
    def _mergeExtents(attributeExtents):
        """Merge attribute extents into one combining time span, latest
        update time and total segment count.

        Returns:
            The merged DataAttributeExtent or None on empty input.
        """
        merged = None
        cloned = False

        # Create a copy of the extent only if more than 1 element is found. The
        # number of elements is not known in advance since attributeExtents
        # might only be an iterator.
        for attExt in attributeExtents:
            if merged is None:
                merged = attExt
                continue

            if not cloned:
                merged = datamodel.DataAttributeExtent(merged)
                cloned = True

            if attExt.start() < merged.start():
                merged.setStart(attExt.start())
            if attExt.end() > merged.end():
                merged.setEnd(attExt.end())
            if attExt.updated() > merged.updated():
                merged.setUpdated(attExt.updated())
            merged.setSegmentCount(merged.segmentCount() + attExt.segmentCount())

        return merged

    # --------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        """Write extent lines as JSON.

        Returns:
            Tuple of (characters written, lines written).
        """
        charCount = 0

        nslc = (
            "{{"
            '"network":"{0}",'
            '"station":"{1}",'
            '"location":"{2}",'
            '"channel":"{3}"'
        )
        quality = ',"quality":"{0}"'
        sampleRate = ',"samplerate":{0}'
        time = (
            ',"earliest":"{0}","latest":"{1}","timespanCount":{2}'
            ',"updated":"{3}","restriction":"{4}"}}'
        )

        utils.writeTS(req, header)
        charCount += len(header)

        data = None
        for line in lines:
            wid = line[0].waveformID()
            e = line[1]
            # separate datasource objects by comma
            data = "," if data is not None else ""
            data += nslc.format(
                wid.networkCode(),
                wid.stationCode(),
                wid.locationCode(),
                wid.channelCode(),
            )
            if not ro.mergeQuality:
                data += quality.format(e.quality())
            if not ro.mergeSampleRate:
                data += sampleRate.format(e.sampleRate())
            data += time.format(
                self._formatTime(e.start(), True),
                self._formatTime(e.end(), True),
                e.segmentCount(),
                self._formatTime(e.updated()),
                # redundant builtin format() call removed
                "RESTRICTED" if line[2] else "OPEN",
            )

            utils.writeTS(req, data)
            charCount += len(data)

        utils.writeTS(req, footer)
        charCount += len(footer)

        return charCount, len(lines)

    # --------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        """Collect, merge, sort and write the matching extents.

        Returns:
            False on client disconnect, True otherwise.
        """
        if req._disconnected:  # pylint: disable=W0212
            return False

        # tuples: extent, attribute extent, restricted status
        lines = []

        mergeAll = ro.mergeQuality and ro.mergeSampleRate
        mergeNone = not ro.mergeQuality and not ro.mergeSampleRate

        # iterate extents
        for ext, _, restricted in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:  # pylint: disable=W0212
                return False

            # iterate attribute extents and merge them if requested
            if mergeNone:
                for attExt in ro.attributeExtentIter(ext):
                    lines.append((ext, attExt, restricted))
                continue

            if mergeAll:
                attExt = self._mergeExtents(ro.attributeExtentIter(ext))
                if attExt is not None:
                    lines.append((ext, attExt, restricted))
                continue

            attExtDict = {}
            if ro.mergeQuality:
                # key=sampleRate
                for attExt in ro.attributeExtentIter(ext):
                    if attExt.sampleRate() in attExtDict:
                        attExtDict[attExt.sampleRate()].append(attExt)
                    else:
                        attExtDict[attExt.sampleRate()] = [attExt]
            # ro.mergeSampleRate
            else:
                # key=quality
                for attExt in ro.attributeExtentIter(ext):
                    if attExt.quality() in attExtDict:
                        attExtDict[attExt.quality()].append(attExt)
                    else:
                        attExtDict[attExt.quality()] = [attExt]

            for attExtents in attExtDict.values():
                lines.append((ext, self._mergeExtents(attExtents), restricted))

        # Return 204 if no matching availability information was found
        if not lines:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        # sort lines
        self._sortLines(lines, ro)

        # truncate lines to requested row limit
        if ro.limit:
            del lines[ro.limit :]

        charCount, extCount = self._writeLines(req, lines, ro)

        logging.debug(
            f"{ro.service}: returned {extCount} extents (characters: {charCount})"
        )
        utils.accessLog(req, ro, http.OK, charCount, None)
        return True

    # --------------------------------------------------------------------------
    @staticmethod
    def _sortLines(lines, ro):
        """Sort lines in place according to the requested order.

        Each line is a tuple of (extent, attribute extent, restricted).
        All orders fall back to NSLC, time, quality and sample rate.
        """

        def nslcKey(x):
            # secondary sort key shared by all orderby options
            wid = x[0].waveformID()
            attExt = x[1]
            return (
                wid.networkCode(),
                wid.stationCode(),
                wid.locationCode(),
                wid.channelCode(),
                attExt.start(),
                attExt.end(),
                attExt.quality(),
                attExt.sampleRate(),
            )

        if ro.orderBy == ro.VOrderByCount:
            # sort by segment count followed by NSLC
            def key(x):
                return (x[1].segmentCount(),) + nslcKey(x)

        elif ro.orderBy == ro.VOrderByCountDesc:
            # sort by segment count in descending order followed by NSLC
            def key(x):
                return (-x[1].segmentCount(),) + nslcKey(x)

        elif ro.orderBy == ro.VOrderByUpdate:
            # sort by update time followed by NSLC
            def key(x):
                return (x[1].updated().seconds(),) + nslcKey(x)

        elif ro.orderBy == ro.VOrderByUpdateDesc:
            # sort by update time in descending order followed by NSLC
            def key(x):
                return (-x[1].updated().seconds(),) + nslcKey(x)

        else:
            # sort by NSLC
            key = nslcKey

        lines.sort(key=key)
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryRealm:
    """Twisted realm creating availability query resources for users
    identified by their e-mail address."""

    # --------------------------------------------------------------------------
    def __init__(self, access):
        self.__access = access

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource not in interfaces:
            raise NotImplementedError()

        user = {"mail": utils.u_str(avatarId), "blacklisted": False}
        avatar = FDSNAvailabilityQuery(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
@implementer(portal.IRealm)
class FDSNAvailabilityQueryAuthRealm:
    """Twisted realm creating availability query resources for users whose
    attributes are looked up in a user database."""

    # --------------------------------------------------------------------------
    def __init__(self, access, userdb):
        self.__access = access
        self.__userdb = userdb

    # --------------------------------------------------------------------------
    def requestAvatar(self, avatarId, _mind, *interfaces):
        if resource.IResource not in interfaces:
            raise NotImplementedError()

        user = self.__userdb.getAttributes(utils.u_str(avatarId))
        avatar = FDSNAvailabilityQuery(self.__access, user)
        return resource.IResource, avatar, lambda: None
###############################################################################
class FDSNAvailabilityQuery(_Availability):
    """Implements the fdsnws-availability query method returning the actual
    data segments of the matching streams."""

    isLeaf = True

    # --------------------------------------------------------------------------
    @staticmethod
    def _createRequestOptions():
        return _AvailabilityQueryRequestOptions()

    # --------------------------------------------------------------------------
    def _writeJSONChannels(self, req, header, footer, lines, ro):
        """Write segment lines as JSON grouped by channel.

        Grouping depends on the requested merge options: by stream only
        (quality and sample rate merged), by sample rate, by quality, or by
        (quality, sample rate) tuple.

        Returns:
            Tuple of (characters written, number of lines processed).

        NOTE(review): the early 'return False' on client disconnect does not
        match the tuple return contract — kept for backward compatibility.
        """
        charCount = 0
        lineCount = 0

        nslc = (
            "{{"
            '"network":"{0}",'
            '"station":"{1}",'
            '"location":"{2}",'
            '"channel":"{3}"'
        )
        quality = ',"quality":"{0}"'
        sampleRate = ',"samplerate":{0}'
        updated = ',"updated":"{0}"'
        timeSpans = ',"timespans":[['
        seg = '"{0}","{1}"]'

        class SegGroup:
            # binds a list of segments to their latest update time
            def __init__(self, segments, updated):
                self.segments = segments
                self.updated = updated

        def writeSegments(segments):
            # write the timespan array of one channel group
            first = True
            charCount = 0
            for s in segments:
                if first:
                    first = False
                    data = timeSpans
                else:
                    data = ",["
                data += seg.format(
                    self._formatTime(s.start(), True), self._formatTime(s.end(), True)
                )
                utils.writeTS(req, data)
                charCount += len(data)
            return charCount

        prevExt = None

        # merge of quality and sample rate: all timespans of one stream are
        # grouped into one channel
        if ro.mergeQuality and ro.mergeSampleRate:
            lastUpdate = None
            segments = None

            for ext, attExt, _restricted in lines:
                lineCount += 1
                if ext is prevExt:
                    lastUpdate = max(lastUpdate, attExt.updated())
                    segments.append(attExt)
                    continue

                if prevExt is not None:
                    # flush previous channel
                    if charCount == 0:
                        utils.writeTS(req, header)
                        charCount += len(header)
                        data = ""
                    else:
                        data = "]},"
                    wid = prevExt.waveformID()
                    data += nslc.format(
                        wid.networkCode(),
                        wid.stationCode(),
                        wid.locationCode(),
                        wid.channelCode(),
                    )
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(lastUpdate))
                    utils.writeTS(req, data)
                    charCount += len(data)
                    charCount += writeSegments(segments)

                prevExt = ext
                lastUpdate = attExt.updated()
                segments = [attExt]

            # handle last extent
            if prevExt is not None:
                if charCount == 0:
                    utils.writeTS(req, header)
                    charCount += len(header)
                    data = ""
                else:
                    data = "]},"
                wid = prevExt.waveformID()
                data += nslc.format(
                    wid.networkCode(),
                    wid.stationCode(),
                    wid.locationCode(),
                    wid.channelCode(),
                )
                if ro.showLatestUpdate:
                    data += updated.format(self._formatTime(lastUpdate))
                utils.writeTS(req, data)
                charCount += len(data)
                charCount += writeSegments(segments)

                data = "]}"
                utils.writeTS(req, data)
                charCount += len(data)
                utils.writeTS(req, footer)
                charCount += len(footer)

        # merge of quality: all timespans of one stream are grouped by
        # sample rate
        elif ro.mergeQuality:
            segGroups = OrderedDict()

            for ext, attExt, _restricted in lines:
                lineCount += 1
                if ext is prevExt:
                    if attExt.sampleRate() in segGroups:
                        segGroup = segGroups[attExt.sampleRate()]
                        segGroup.updated = max(segGroup.updated, attExt.updated())
                        segGroup.segments.append(attExt)
                        continue
                else:
                    if prevExt is not None:
                        # flush previous extent's groups
                        wid = prevExt.waveformID()
                        for sr, sg in segGroups.items():
                            if req._disconnected:  # pylint: disable=W0212
                                return False
                            if charCount == 0:
                                utils.writeTS(req, header)
                                charCount += len(header)
                                data = ""
                            else:
                                data = "]},"
                            data += nslc.format(
                                wid.networkCode(),
                                wid.stationCode(),
                                wid.locationCode(),
                                wid.channelCode(),
                            )
                            data += sampleRate.format(sr)
                            if ro.showLatestUpdate:
                                data += updated.format(self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            charCount += len(data)
                            charCount += writeSegments(sg.segments)
                    prevExt = ext
                    segGroups = OrderedDict()

                segGroups[attExt.sampleRate()] = SegGroup([attExt], attExt.updated())

            # handle last extent
            if prevExt is not None:
                wid = prevExt.waveformID()
                for sr, sg in segGroups.items():
                    if req._disconnected:  # pylint: disable=W0212
                        return False
                    if charCount == 0:
                        utils.writeTS(req, header)
                        charCount += len(header)
                        data = ""
                    else:
                        data = "]},"
                    data += nslc.format(
                        wid.networkCode(),
                        wid.stationCode(),
                        wid.locationCode(),
                        wid.channelCode(),
                    )
                    data += sampleRate.format(sr)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    charCount += len(data)
                    charCount += writeSegments(sg.segments)

                data = "]}"
                utils.writeTS(req, data)
                charCount += len(data)
                utils.writeTS(req, footer)
                charCount += len(footer)

        # merge of sample rate: all timespans of one stream are grouped by
        # quality
        elif ro.mergeSampleRate:
            segGroups = OrderedDict()

            for ext, attExt, _restricted in lines:
                lineCount += 1
                if ext is prevExt:
                    if attExt.quality() in segGroups:
                        segGroup = segGroups[attExt.quality()]
                        segGroup.updated = max(segGroup.updated, attExt.updated())
                        segGroup.segments.append(attExt)
                        continue
                else:
                    if prevExt is not None:
                        # flush previous extent's groups
                        wid = prevExt.waveformID()
                        for q, sg in segGroups.items():
                            if req._disconnected:  # pylint: disable=W0212
                                return False
                            if charCount == 0:
                                utils.writeTS(req, header)
                                charCount += len(header)
                                data = ""
                            else:
                                data = "]},"
                            data += nslc.format(
                                wid.networkCode(),
                                wid.stationCode(),
                                wid.locationCode(),
                                wid.channelCode(),
                            )
                            data += quality.format(q)
                            if ro.showLatestUpdate:
                                data += updated.format(self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            charCount += len(data)
                            charCount += writeSegments(sg.segments)
                    prevExt = ext
                    segGroups = OrderedDict()

                segGroups[attExt.quality()] = SegGroup([attExt], attExt.updated())

            # handle last extent
            if prevExt is not None:
                wid = prevExt.waveformID()
                for q, sg in segGroups.items():
                    if req._disconnected:  # pylint: disable=W0212
                        return False
                    if charCount == 0:
                        utils.writeTS(req, header)
                        charCount += len(header)
                        data = ""
                    else:
                        data = "]},"
                    data += nslc.format(
                        wid.networkCode(),
                        wid.stationCode(),
                        wid.locationCode(),
                        wid.channelCode(),
                    )
                    data += quality.format(q)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    charCount += len(data)
                    charCount += writeSegments(sg.segments)

                data = "]}"
                utils.writeTS(req, data)
                charCount += len(data)
                utils.writeTS(req, footer)
                charCount += len(footer)

        # no merge: all timespans of one stream are grouped by tuple of
        # quality and sampleRate
        else:
            segGroups = OrderedDict()

            for ext, attExt, _restricted in lines:
                lineCount += 1
                t = (attExt.quality(), attExt.sampleRate())
                if ext is prevExt:
                    if t in segGroups:
                        segGroup = segGroups[t]
                        segGroup.updated = max(segGroup.updated, attExt.updated())
                        segGroup.segments.append(attExt)
                        continue
                else:
                    if prevExt is not None:
                        # flush previous extent's groups
                        wid = prevExt.waveformID()
                        for (q, sr), sg in segGroups.items():
                            if req._disconnected:  # pylint: disable=W0212
                                return False
                            if charCount == 0:
                                utils.writeTS(req, header)
                                charCount += len(header)
                                data = ""
                            else:
                                data = "]},"
                            data += nslc.format(
                                wid.networkCode(),
                                wid.stationCode(),
                                wid.locationCode(),
                                wid.channelCode(),
                            )
                            data += quality.format(q)
                            data += sampleRate.format(sr)
                            if ro.showLatestUpdate:
                                data += updated.format(self._formatTime(sg.updated))
                            utils.writeTS(req, data)
                            charCount += len(data)
                            charCount += writeSegments(sg.segments)
                    prevExt = ext
                    segGroups = OrderedDict()

                segGroups[t] = SegGroup([attExt], attExt.updated())

            # handle last extent
            if prevExt is not None:
                wid = prevExt.waveformID()
                for (q, sr), sg in segGroups.items():
                    if req._disconnected:  # pylint: disable=W0212
                        return False
                    if charCount == 0:
                        utils.writeTS(req, header)
                        charCount += len(header)
                        data = ""
                    else:
                        data = "]},"
                    data += nslc.format(
                        wid.networkCode(),
                        wid.stationCode(),
                        wid.locationCode(),
                        wid.channelCode(),
                    )
                    data += quality.format(q)
                    data += sampleRate.format(sr)
                    if ro.showLatestUpdate:
                        data += updated.format(self._formatTime(sg.updated))
                    utils.writeTS(req, data)
                    charCount += len(data)
                    charCount += writeSegments(sg.segments)

                data = "]}"
                utils.writeTS(req, data)
                charCount += len(data)
                utils.writeTS(req, footer)
                charCount += len(footer)

        return charCount, lineCount

    # --------------------------------------------------------------------------
    def _processRequest(self, req, ro, dac):
        """Collect matching extents, query the segments and write the result.

        Returns:
            False on error or client disconnect, True otherwise.
        """
        if req._disconnected:  # pylint: disable=W0212
            return False

        # tuples: wid, segment, restricted status
        lines = []
        charCount = 0

        # iterate extents and create IN clauses of parent_oids in bunches
        # of 1000 because the query size is limited
        parentOIDs, idList, tooLarge = [], [], []
        i = 0
        for ext, objID, _ in ro.extentIter(dac, self.user, self.access):
            if req._disconnected:  # pylint: disable=W0212
                return False

            if ro.excludeTooLarge:
                if ext.segmentOverflow():
                    continue
            elif ext.segmentOverflow():
                tooLarge.append(ext)
                continue
            elif tooLarge:
                # request will be rejected anyway, only collect further
                # overflowing extents for the error message
                continue

            if i < 1000:
                idList.append(objID)
                i += 1
            else:
                parentOIDs.append(idList)
                idList = [objID]
                i = 1

        if not ro.excludeTooLarge and tooLarge:
            extents = ", ".join(
                f"{e.waveformID().networkCode()}.{e.waveformID().stationCode()}."
                f"{e.waveformID().locationCode()}.{e.waveformID().channelCode()}"
                for e in tooLarge
            )

            msg = (
                "Unable to process request due to database limitations. Some "
                "selections have too many segments to process. Rejected extents: "
                f"{{{extents}}}. This limitation may be resolved in a future version "
                "of this webservice."
            )
            self.writeErrorPage(req, http.REQUEST_ENTITY_TOO_LARGE, msg, ro)
            return False

        if len(idList) > 0:
            parentOIDs.append(idList)
        else:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return False

        db = io.DatabaseInterface.Open(Application.Instance().databaseURI())
        if db is None:
            msg = "could not connect to database"
            return self.renderErrorPage(req, http.SERVICE_UNAVAILABLE, msg, ro)

        lines = self._lineIter(db, parentOIDs, req, ro, dac.extentsOID())

        charCount, segCount = self._writeLines(req, lines, ro)

        # Return 204 if no matching availability information was found
        if segCount <= 0:
            msg = "no matching availability information found"
            self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
            return True

        logging.debug(
            f"{ro.service}: returned {segCount} segments (characters: {charCount})"
        )
        utils.accessLog(req, ro, http.OK, charCount, None)
        return True

    # --------------------------------------------------------------------------
    @staticmethod
    def _lineIter(db, parentOIDs, req, ro, oIDs):
        """Query data segments chunk-wise and yield merged segment lines.

        Yields:
            Tuples of (extent, segment, restricted status).
        """

        def _T(name):
            return db.convertColumnName(name)

        def _jitter(sampleRate):
            # half a sample interval; guard against zero sample rates
            return 1 / (2 * sampleRate) if sampleRate else 0.0

        dba = datamodel.DatabaseArchive(db)

        for idList in parentOIDs:
            if req._disconnected:  # pylint: disable=W0212
                return

            # build SQL query
            idStr = ",".join(str(x) for x in idList)
            q = f"SELECT * from DataSegment WHERE _parent_oid IN ({idStr}) "
            if ro.time:
                # Bug fix: the sub-second clauses previously contained
                # unsubstituted '{0}'/'{1}' placeholders and a missing
                # f-prefix, producing invalid SQL
                if ro.time.start is not None:
                    if ro.time.start.microseconds() == 0:
                        q += f"AND {_T('end')} >= '{db.timeToString(ro.time.start)}' "
                    else:
                        q += (
                            f"AND ({_T('end')} > '{db.timeToString(ro.time.start)}' "
                            f"OR ({_T('end')} = '{db.timeToString(ro.time.start)}' "
                            f"AND end_ms >= {ro.time.start.microseconds()})) "
                        )
                if ro.time.end is not None:
                    if ro.time.end.microseconds() == 0:
                        q += f"AND {_T('start')} < '{db.timeToString(ro.time.end)}' "
                    else:
                        q += (
                            f"AND ({_T('start')} < '{db.timeToString(ro.time.end)}' "
                            f"OR ({_T('start')} = '{db.timeToString(ro.time.end)}' "
                            f"AND start_ms < {ro.time.end.microseconds()})) "
                        )
            if ro.quality:
                qualities = "', '".join(ro.quality)
                q += f"AND {_T('quality')} IN ('{qualities}') "
            q += f"ORDER BY _parent_oid, {_T('start')}, {_T('start_ms')}"

            segIt = dba.getObjectIterator(q, datamodel.DataSegment.TypeInfo())
            if segIt is None or not segIt.valid():
                return

            # Iterate and optionally merge segments.
            # A segment will be yielded if
            #  - the extent changed
            #  - quality changed and merging of quality was not requested
            #  - sample rate changed and merging of sample rate was not
            #    requested
            #  - an overlap was detected and merging of overlaps was not
            #    requested
            #  - a gap was detected and the gap exceeds the mergeGaps
            #    threshold
            seg = None
            ext = None
            lines = 0
            jitter = 0.0

            while (
                not req._disconnected  # pylint: disable=W0212
                and (seg is None or segIt.next())
                and (not ro.limit or lines < ro.limit)
            ):
                s = datamodel.DataSegment.Cast(segIt.get())
                if s is None:
                    break

                # get extent for current segment
                try:
                    e, restricted = oIDs[segIt.parentOid()]
                except KeyError:
                    logging.warning(f"parent object id not found: {segIt.parentOid()}")
                    continue

                # first segment, no merge test required
                if seg is None:
                    seg = s
                    ext = e
                    jitter = _jitter(s.sampleRate())
                    continue

                # merge test
                diff = float(s.start() - seg.end())
                mQuality = ro.mergeQuality or s.quality() == seg.quality()
                mSampleRate = ro.mergeSampleRate or s.sampleRate() == seg.sampleRate()
                mGaps = (ro.mergeGaps is None and diff <= jitter) or (
                    ro.mergeGaps is not None and diff <= ro.mergeGaps
                )
                mOverlap = -diff <= jitter or ro.mergeOverlap

                if e is ext and mQuality and mSampleRate and mGaps and mOverlap:
                    # merge: extend previous segment
                    seg.setEnd(s.end())
                    if s.updated() > seg.updated():
                        seg.setUpdated(s.updated())
                    continue

                # merge was not possible, yield previous segment
                yield (ext, seg, restricted)
                lines += 1
                # Bug fix: recompute the jitter BEFORE replacing the previous
                # segment -- the comparison was previously performed after
                # the reassignment and hence never true
                if seg.sampleRate() != s.sampleRate():
                    jitter = _jitter(s.sampleRate())
                seg = s
                ext = e

            if seg is not None and (not ro.limit or lines < ro.limit):
                yield (ext, seg, restricted)

        # close database iterator if iteration was stopped because of
        # row limit
        return
# vim: ts=4 et tw=79