1443 lines
51 KiB
Python
1443 lines
51 KiB
Python
###############################################################################
|
|
# Copyright (C) 2013-2014 by gempa GmbH
|
|
#
|
|
# FDSNStation -- Implements the fdsnws-availability Web service, see
|
|
# http://www.fdsn.org/webservices/
|
|
#
|
|
# Feature notes:
|
|
# - additional request parameters:
|
|
# - excludetoolarge: boolean, default: true
|
|
#
|
|
# Author: Stephan Herrnkind
|
|
# Email: herrnkind@gempa.de
|
|
###############################################################################
|
|
|
|
from functools import cmp_to_key
|
|
|
|
from collections import OrderedDict
|
|
|
|
from twisted.cred import portal
|
|
from twisted.internet.threads import deferToThread
|
|
from twisted.web import http, resource, server
|
|
|
|
from zope.interface import implementer
|
|
|
|
from seiscomp import datamodel, io, logging
|
|
from seiscomp.client import Application
|
|
from seiscomp.core import Time
|
|
|
|
from .http import BaseResource
|
|
from .request import RequestOptions
|
|
|
|
from . import utils
|
|
|
|
|
|
DBMaxUInt = 18446744073709551615 # 2^64 - 1
|
|
VERSION = "1.0.3"
|
|
|
|
|
|
###############################################################################
|
|
class _AvailabilityRequestOptions(RequestOptions):
|
|
# merge options
|
|
VMergeSampleRate = "samplerate"
|
|
VMergeQuality = "quality"
|
|
VMergeOverlap = "overlap"
|
|
VMerge = [VMergeSampleRate, VMergeQuality] # overlap option only available
|
|
# in query method
|
|
|
|
# orderby options
|
|
VOrderByNSLC = "nslc_time_quality_samplerate"
|
|
VOrderByCount = "timespancount"
|
|
VOrderByCountDesc = "timespancount_desc"
|
|
VOrderByUpdate = "latestupdate"
|
|
VOrderByUpdateDesc = "latestupdate_desc"
|
|
VOrderBy = [
|
|
VOrderByNSLC,
|
|
VOrderByCount,
|
|
VOrderByCountDesc,
|
|
VOrderByUpdate,
|
|
VOrderByUpdateDesc,
|
|
]
|
|
|
|
# format options
|
|
VFormatText = "text"
|
|
VFormatGeoCSV = "geocsv"
|
|
VFormatJSON = "json"
|
|
VFormatRequest = "request"
|
|
OutputFormats = [VFormatText, VFormatGeoCSV, VFormatJSON, VFormatRequest]
|
|
|
|
# request parameters
|
|
PQuality = ["quality"]
|
|
PMerge = ["merge"]
|
|
POrderBy = ["orderby"]
|
|
PLimit = ["limit"]
|
|
PIncludeRestricted = ["includerestricted"]
|
|
|
|
POSTParams = (
|
|
RequestOptions.POSTParams
|
|
+ PQuality
|
|
+ PMerge
|
|
+ POrderBy
|
|
+ PLimit
|
|
+ PIncludeRestricted
|
|
)
|
|
GETParams = RequestOptions.GETParams + POSTParams
|
|
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
self.service = "availability-base"
|
|
self.quality = None
|
|
self.mergeSampleRate = None
|
|
self.mergeQuality = None
|
|
self.mergeOverlap = None
|
|
self.orderBy = self.VOrderBy[0]
|
|
self.limit = None
|
|
self.includeRestricted = None
|
|
self.showLatestUpdate = None
|
|
|
|
# --------------------------------------------------------------------------
|
|
def parse(self):
|
|
self.parseTime()
|
|
self.parseChannel()
|
|
self.parseOutput()
|
|
|
|
# quality: D, M, Q, R, * (optional)
|
|
for v in self.getListValues(self.PQuality):
|
|
if len(v) == 1:
|
|
if v[0] == "*":
|
|
self.quality = None
|
|
break
|
|
if v[0].isupper():
|
|
if self.quality is None:
|
|
self.quality = [v]
|
|
else:
|
|
self.quality.append(v)
|
|
continue
|
|
self.raiseValueError(self.PQuality[0])
|
|
|
|
# merge (optional)
|
|
for v in self.getListValues(self.PMerge, True):
|
|
if v not in self.VMerge:
|
|
self.raiseValueError(self.PMerge[0])
|
|
|
|
if v == self.VMergeSampleRate:
|
|
self.mergeSampleRate = True
|
|
elif v == self.VMergeQuality:
|
|
self.mergeQuality = True
|
|
elif v == self.VMergeOverlap:
|
|
self.mergeOverlap = True
|
|
|
|
# orderby (optional)
|
|
key, value = self.getFirstValue(self.POrderBy)
|
|
if value is not None:
|
|
if value in self.VOrderBy:
|
|
self.orderBy = value
|
|
else:
|
|
self.raiseValueError(key)
|
|
|
|
# limit (optional)
|
|
self.limit = self.parseInt(self.PLimit, 1, DBMaxUInt)
|
|
|
|
# includeRestricted (optional)
|
|
self.includeRestricted = self.parseBool(self.PIncludeRestricted)
|
|
|
|
# --------------------------------------------------------------------------
|
|
def extentIter(self, dac, user=None, access=None):
|
|
# tupel: extent, oid, restricted
|
|
for e in dac.extentsSorted():
|
|
ext = e[0]
|
|
restricted = e[2]
|
|
wid = ext.waveformID()
|
|
net = wid.networkCode()
|
|
sta = wid.stationCode()
|
|
loc = wid.locationCode()
|
|
cha = wid.channelCode()
|
|
|
|
if restricted:
|
|
if not user:
|
|
continue
|
|
|
|
if access:
|
|
startTime = ext.start()
|
|
endTime = ext.end()
|
|
if self.time:
|
|
if self.time.start() and self.time.start() > startTime:
|
|
startTime = self.time.start()
|
|
if self.time.end() and self.time.end() < ext.end():
|
|
endTime = self.time.end()
|
|
|
|
if not access.authorize(
|
|
user, net, sta, loc, cha, startTime, endTime
|
|
):
|
|
continue
|
|
|
|
for ro in self.streams:
|
|
if ro.channel:
|
|
if (
|
|
not ro.channel.matchNet(net)
|
|
or not ro.channel.matchSta(sta)
|
|
or not ro.channel.matchLoc(loc)
|
|
or not ro.channel.matchCha(cha)
|
|
):
|
|
continue
|
|
|
|
if ro.time and not ro.time.match(ext.start(), ext.end()):
|
|
continue
|
|
|
|
if not self.includeRestricted and restricted:
|
|
continue
|
|
|
|
yield e
|
|
|
|
|
|
###############################################################################
|
|
class _AvailabilityExtentRequestOptions(_AvailabilityRequestOptions):
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.service = "availability-extent"
|
|
|
|
self.showLatestUpdate = True
|
|
|
|
# --------------------------------------------------------------------------
|
|
def attributeExtentIter(self, ext):
|
|
for i in range(ext.dataAttributeExtentCount()):
|
|
attExt = ext.dataAttributeExtent(i)
|
|
|
|
if self.time and not self.time.match(attExt.start(), attExt.end()):
|
|
continue
|
|
|
|
if self.quality and attExt.quality() not in self.quality:
|
|
continue
|
|
|
|
yield attExt
|
|
|
|
|
|
###############################################################################
|
|
class _AvailabilityQueryRequestOptions(_AvailabilityRequestOptions):
|
|
# additional merge options
|
|
VMerge = _AvailabilityRequestOptions.VMerge + [
|
|
_AvailabilityRequestOptions.VMergeOverlap
|
|
]
|
|
|
|
# show options
|
|
VShowLatestUpdate = "latestupdate"
|
|
VShow = [VShowLatestUpdate]
|
|
|
|
# additional query specific request parameters
|
|
PMergeGaps = ["mergegaps"]
|
|
PShow = ["show"]
|
|
PExcludeTooLarge = ["excludetoolarge"]
|
|
|
|
POSTParams = (
|
|
_AvailabilityRequestOptions.POSTParams + PMergeGaps + PShow + PExcludeTooLarge
|
|
)
|
|
GETParams = (
|
|
_AvailabilityRequestOptions.GETParams + PMergeGaps + PShow + PExcludeTooLarge
|
|
)
|
|
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.service = "availability-query"
|
|
|
|
self.orderBy = None
|
|
self.mergeGaps = None
|
|
self.excludeTooLarge = None
|
|
|
|
# --------------------------------------------------------------------------
|
|
def parse(self):
|
|
_AvailabilityRequestOptions.parse(self)
|
|
|
|
# merge gaps threshold (optional)
|
|
self.mergeGaps = self.parseFloat(self.PMergeGaps, 0)
|
|
|
|
# show (optional)
|
|
for v in self.getListValues(self.PShow, True):
|
|
if v not in self.VShow:
|
|
self.raiseValueError(v)
|
|
if v == self.VShowLatestUpdate:
|
|
self.showLatestUpdate = True
|
|
|
|
# exclude to large (optional)
|
|
self.excludeTooLarge = self.parseBool(self.PExcludeTooLarge)
|
|
if self.excludeTooLarge is None:
|
|
self.excludeTooLarge = True
|
|
|
|
if not self.channel:
|
|
raise ValueError("Request contains no selections")
|
|
|
|
if self.orderBy:
|
|
raise ValueError("orderby not supported for query request")
|
|
|
|
|
|
###############################################################################
|
|
class _Availability(BaseResource):
|
|
isLeaf = True
|
|
|
|
# --------------------------------------------------------------------------
|
|
@staticmethod
|
|
def _createRequestOptions():
|
|
pass
|
|
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self, access=None, user=None):
|
|
super().__init__(VERSION)
|
|
|
|
self.access = access
|
|
self.user = user
|
|
|
|
# --------------------------------------------------------------------------
|
|
def render_OPTIONS(self, req):
|
|
req.setHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
|
|
req.setHeader(
|
|
"Access-Control-Allow-Headers",
|
|
"Accept, Content-Type, X-Requested-With, Origin",
|
|
)
|
|
req.setHeader("Content-Type", "text/plain; charset=utf-8")
|
|
return ""
|
|
|
|
# --------------------------------------------------------------------------
|
|
def render_GET(self, req):
|
|
# Parse and validate GET parameters
|
|
ro = self._createRequestOptions()
|
|
try:
|
|
ro.parseGET(req.args)
|
|
ro.parse()
|
|
|
|
# the GET operation supports exactly one stream filter
|
|
ro.streams.append(ro)
|
|
except ValueError as e:
|
|
logging.warning(str(e))
|
|
return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)
|
|
|
|
return self._prepareRequest(req, ro)
|
|
|
|
# --------------------------------------------------------------------------
|
|
def render_POST(self, req):
|
|
# Parse and validate POST parameters
|
|
ro = self._createRequestOptions()
|
|
try:
|
|
ro.parsePOST(req.content)
|
|
ro.parse()
|
|
except ValueError as e:
|
|
logging.warning(str(e))
|
|
return self.renderErrorPage(req, http.BAD_REQUEST, str(e), ro)
|
|
|
|
return self._prepareRequest(req, ro)
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _processRequest(self, req, ro, dac):
|
|
pass
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeJSONChannels(self, req, header, footer, lines, ro):
|
|
pass
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _prepareRequest(self, req, ro):
|
|
dac = Application.Instance().getDACache()
|
|
|
|
if ro.format == ro.VFormatJSON:
|
|
contentType = "application/json; charset=utf-8"
|
|
extension = "json"
|
|
elif ro.format == ro.VFormatGeoCSV:
|
|
contentType = "text/csv; charset=utf-8"
|
|
extension = "csv"
|
|
else:
|
|
contentType = "text/plain; charset=utf-8"
|
|
extension = "txt"
|
|
|
|
req.setHeader("Content-Type", contentType)
|
|
req.setHeader(
|
|
"Content-Disposition",
|
|
f"inline; filename=fdsnws-ext-availability_{Time.GMT().iso()}.{extension}",
|
|
)
|
|
|
|
d = deferToThread(self._processRequest, req, ro, dac)
|
|
|
|
req.notifyFinish().addErrback(utils.onCancel, d)
|
|
d.addBoth(utils.onFinish, req)
|
|
|
|
# The request is handled by the deferred object
|
|
return server.NOT_DONE_YET
|
|
|
|
# --------------------------------------------------------------------------
|
|
@staticmethod
|
|
def _formatTime(time, ms=False):
|
|
if ms:
|
|
return f"{time.toString('%FT%T')}.{time.microseconds():06d}Z"
|
|
return time.toString("%FT%TZ")
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeLines(self, req, lines, ro):
|
|
if ro.format == ro.VFormatText:
|
|
return self._writeFormatText(req, lines, ro)
|
|
if ro.format == ro.VFormatGeoCSV:
|
|
return self._writeFormatGeoCSV(req, lines, ro)
|
|
if ro.format == ro.VFormatJSON:
|
|
return self._writeFormatJSON(req, lines, ro)
|
|
if ro.format == ro.VFormatRequest:
|
|
return self._writeFormatRequest(req, lines, ro)
|
|
|
|
raise ValueError(f"unknown reponse format: {ro.format}")
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeFormatText(self, req, lines, ro):
|
|
charCount = 0
|
|
lineCount = 0
|
|
|
|
# the extent service uses a different update column name and alignment
|
|
isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions
|
|
|
|
nslc = "{0: <2} {1: <5} {2: <2} {3: <3}"
|
|
quality = " {0: <1}"
|
|
sampleRate = " {0: >11}"
|
|
sampleRateF = " {0: >11.1f}"
|
|
time = " {0: <27} {1: <27}"
|
|
updated = " {0: <20}"
|
|
timeSpans = " {0: >10}"
|
|
restriction = " {0: <11}"
|
|
|
|
header = nslc.format("#N", "S", "L", "C")
|
|
if not ro.mergeQuality:
|
|
header += quality.format("Q")
|
|
if not ro.mergeSampleRate:
|
|
header += sampleRate.format("SampleRate")
|
|
header += time.format("Earliest", "Latest")
|
|
if ro.showLatestUpdate:
|
|
header += updated.format("Updated")
|
|
if isExtentReq:
|
|
header += timeSpans.format("TimeSpans")
|
|
header += restriction.format("Restriction")
|
|
header += "\n"
|
|
|
|
first = True
|
|
for line in lines:
|
|
if first:
|
|
first = False
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
|
|
wid = line[0].waveformID()
|
|
e = line[1]
|
|
loc = wid.locationCode() if wid.locationCode() else "--"
|
|
data = nslc.format(
|
|
wid.networkCode(), wid.stationCode(), loc, wid.channelCode()
|
|
)
|
|
if not ro.mergeQuality:
|
|
data += quality.format(e.quality())
|
|
if not ro.mergeSampleRate:
|
|
data += sampleRateF.format(e.sampleRate())
|
|
data += time.format(
|
|
self._formatTime(e.start(), True), self._formatTime(e.end(), True)
|
|
)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(e.updated()))
|
|
if isExtentReq:
|
|
data += timeSpans.format(e.segmentCount())
|
|
data += restriction.format("RESTRICTED" if line[2] else "OPEN")
|
|
data += "\n"
|
|
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
lineCount += 1
|
|
|
|
return charCount, lineCount
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeFormatRequest(self, req, lines, ro):
|
|
charCount = 0
|
|
lineCount = 0
|
|
|
|
for line in lines:
|
|
wid = line[0].waveformID()
|
|
e = line[1]
|
|
loc = wid.locationCode() if wid.locationCode() else "--"
|
|
start = e.start()
|
|
end = e.end()
|
|
|
|
# truncate start and end time to requested time frame
|
|
if ro.time:
|
|
if ro.time.start and ro.time.start > start:
|
|
start = ro.time.start
|
|
if ro.time.end and ro.time.end < end:
|
|
end = ro.time.end
|
|
|
|
data = (
|
|
f"{wid.networkCode()} {wid.stationCode()} {loc} {wid.channelCode()} "
|
|
f"{self._formatTime(start, True)} {self._formatTime(end, True)}\n"
|
|
)
|
|
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
lineCount += 1
|
|
|
|
return charCount, lineCount
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeFormatGeoCSV(self, req, lines, ro):
|
|
charCount = 0
|
|
lineCount = 0
|
|
|
|
# the extent service uses a different update column name and alignment
|
|
isExtentReq = ro.__class__ == _AvailabilityExtentRequestOptions
|
|
|
|
nslc = "{0}|{1}|{2}|{3}"
|
|
time = "|{0}|{1}"
|
|
|
|
# header
|
|
fieldUnit = "#field_unit: unitless|unitless|unitless|unitless"
|
|
fieldType = "#field_type: string|string|string|string"
|
|
fieldName = "Network|Station|Location|Channel"
|
|
|
|
if not ro.mergeQuality:
|
|
fieldUnit += "|unitless"
|
|
fieldType += "|string"
|
|
fieldName += "|Quality"
|
|
|
|
if not ro.mergeSampleRate:
|
|
fieldUnit += " |hertz"
|
|
fieldType += " |float"
|
|
fieldName += "|SampleRate"
|
|
|
|
fieldUnit += " |ISO_8601|ISO_8601"
|
|
fieldType += " |datetime|datetime"
|
|
fieldName += "|Earliest|Latest"
|
|
|
|
if ro.showLatestUpdate:
|
|
fieldUnit += "|ISO_8601"
|
|
fieldType += "|datetime"
|
|
fieldName += "|Updated"
|
|
|
|
if isExtentReq:
|
|
fieldUnit += "|unitless|unitless"
|
|
fieldType += "|integer|string"
|
|
fieldName += "|TimeSpans|Restriction"
|
|
|
|
header = (
|
|
"#dataset: GeoCSV 2.0\n#delimiter: |\n"
|
|
f"{fieldUnit}\n{fieldType}\n{fieldName}\n"
|
|
)
|
|
|
|
first = True
|
|
for line in lines:
|
|
if first:
|
|
first = False
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
|
|
wid = line[0].waveformID()
|
|
e = line[1]
|
|
data = nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
if not ro.mergeQuality:
|
|
data += f"|{e.quality()}"
|
|
if not ro.mergeSampleRate:
|
|
data += f"|{e.sampleRate():.1f}"
|
|
data += time.format(
|
|
self._formatTime(e.start(), True), self._formatTime(e.end(), True)
|
|
)
|
|
if ro.showLatestUpdate:
|
|
data += f"|{self._formatTime(e.updated())}"
|
|
if isExtentReq:
|
|
data += f"|{e.segmentCount():d}"
|
|
data += "|RESTRICTED" if line[2] else "|OPEN"
|
|
data += "\n"
|
|
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
lineCount += 1
|
|
|
|
return charCount, lineCount
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeFormatJSON(self, req, lines, ro):
|
|
header = (
|
|
f'{{"created":"{self._formatTime(Time.GMT())}",'
|
|
'"version": 1.0,'
|
|
'"datasources":['
|
|
)
|
|
footer = "]}"
|
|
|
|
return self._writeJSONChannels(req, header, footer, lines, ro)
|
|
|
|
|
|
###############################################################################
|
|
@implementer(portal.IRealm)
|
|
class FDSNAvailabilityExtentRealm:
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self, access):
|
|
self.__access = access
|
|
|
|
# --------------------------------------------------------------------------
|
|
def requestAvatar(self, avatarId, _mind, *interfaces):
|
|
if resource.IResource in interfaces:
|
|
user = {"mail": utils.u_str(avatarId), "blacklisted": False}
|
|
return (
|
|
resource.IResource,
|
|
FDSNAvailabilityExtent(self.__access, user),
|
|
lambda: None,
|
|
)
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
###############################################################################
|
|
@implementer(portal.IRealm)
|
|
class FDSNAvailabilityExtentAuthRealm:
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self, access, userdb):
|
|
self.__access = access
|
|
self.__userdb = userdb
|
|
|
|
# --------------------------------------------------------------------------
|
|
def requestAvatar(self, avatarId, _mind, *interfaces):
|
|
if resource.IResource in interfaces:
|
|
user = self.__userdb.getAttributes(utils.u_str(avatarId))
|
|
return (
|
|
resource.IResource,
|
|
FDSNAvailabilityExtent(self.__access, user),
|
|
lambda: None,
|
|
)
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
###############################################################################
|
|
class FDSNAvailabilityExtent(_Availability):
|
|
isLeaf = True
|
|
|
|
# --------------------------------------------------------------------------
|
|
@staticmethod
|
|
def _createRequestOptions():
|
|
return _AvailabilityExtentRequestOptions()
|
|
|
|
# --------------------------------------------------------------------------
|
|
@staticmethod
|
|
def _mergeExtents(attributeExtents):
|
|
merged = None
|
|
cloned = False
|
|
|
|
# Create a copy of the extent only if more than 1 element is found. The number
|
|
# of elements is not known in advance since attributeExtents might only be an
|
|
# iterator.
|
|
for attExt in attributeExtents:
|
|
if merged is None:
|
|
merged = attExt
|
|
continue
|
|
|
|
if not cloned:
|
|
merged = datamodel.DataAttributeExtent(merged)
|
|
cloned = True
|
|
|
|
if attExt.start() < merged.start():
|
|
merged.setStart(attExt.start())
|
|
if attExt.end() > merged.end():
|
|
merged.setEnd(attExt.end())
|
|
if attExt.updated() > merged.updated():
|
|
merged.setUpdated(attExt.updated())
|
|
merged.setSegmentCount(merged.segmentCount() + attExt.segmentCount())
|
|
|
|
return merged
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeJSONChannels(self, req, header, footer, lines, ro):
|
|
charCount = 0
|
|
|
|
nslc = (
|
|
"{{"
|
|
'"network":"{0}",'
|
|
'"station":"{1}",'
|
|
'"location":"{2}",'
|
|
'"channel":"{3}"'
|
|
)
|
|
quality = ',"quality":"{0}"'
|
|
sampleRate = ',"samplerate":{0}'
|
|
time = (
|
|
',"earliest":"{0}","latest":"{1}","timespanCount":{2}'
|
|
',"updated":"{3}","restriction":"{4}"}}'
|
|
)
|
|
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
|
|
data = None
|
|
for line in lines:
|
|
wid = line[0].waveformID()
|
|
e = line[1]
|
|
data = "," if data is not None else ""
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
if not ro.mergeQuality:
|
|
data += quality.format(e.quality())
|
|
if not ro.mergeSampleRate:
|
|
data += sampleRate.format(e.sampleRate())
|
|
data += time.format(
|
|
self._formatTime(e.start(), True),
|
|
self._formatTime(e.end(), True),
|
|
e.segmentCount(),
|
|
self._formatTime(e.updated()),
|
|
format("RESTRICTED" if line[2] else "OPEN"),
|
|
)
|
|
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
|
|
utils.writeTS(req, footer)
|
|
charCount += len(footer)
|
|
|
|
return charCount, len(lines)
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _processRequest(self, req, ro, dac):
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
# tuples: extent, attribute extent, restricted status
|
|
lines = []
|
|
|
|
mergeAll = ro.mergeQuality and ro.mergeSampleRate
|
|
mergeNone = not ro.mergeQuality and not ro.mergeSampleRate
|
|
|
|
# iterate extents
|
|
for ext, _, restricted in ro.extentIter(dac, self.user, self.access):
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
# iterate attribute extents and merge them if requested
|
|
if mergeNone:
|
|
for attExt in ro.attributeExtentIter(ext):
|
|
lines.append((ext, attExt, restricted))
|
|
continue
|
|
|
|
if mergeAll:
|
|
attExt = self._mergeExtents(ro.attributeExtentIter(ext))
|
|
if attExt is not None:
|
|
lines.append((ext, attExt, restricted))
|
|
continue
|
|
|
|
attExtDict = {}
|
|
if ro.mergeQuality:
|
|
# key=sampleRate
|
|
for attExt in ro.attributeExtentIter(ext):
|
|
if attExt.sampleRate() in attExtDict:
|
|
attExtDict[attExt.sampleRate()].append(attExt)
|
|
else:
|
|
attExtDict[attExt.sampleRate()] = [attExt]
|
|
# ro.mergeSampleRate
|
|
else:
|
|
# key=quality
|
|
for attExt in ro.attributeExtentIter(ext):
|
|
if attExt.quality() in attExtDict:
|
|
attExtDict[attExt.quality()].append(attExt)
|
|
else:
|
|
attExtDict[attExt.quality()] = [attExt]
|
|
|
|
for attExtents in attExtDict.values():
|
|
lines.append((ext, self._mergeExtents(attExtents), restricted))
|
|
|
|
# Return 204 if no matching availability information was found
|
|
if not lines:
|
|
msg = "no matching availabilty information found"
|
|
self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
|
|
return True
|
|
|
|
# sort lines
|
|
self._sortLines(lines, ro)
|
|
|
|
# truncate lines to requested row limit
|
|
if ro.limit:
|
|
del lines[ro.limit :]
|
|
|
|
charCount, extCount = self._writeLines(req, lines, ro)
|
|
|
|
logging.debug(
|
|
f"{ro.service}: returned {extCount} extents (characters: {charCount})"
|
|
)
|
|
utils.accessLog(req, ro, http.OK, charCount, None)
|
|
return True
|
|
|
|
# --------------------------------------------------------------------------
|
|
@staticmethod
|
|
def _sortLines(lines, ro):
|
|
|
|
# Sort by segment count followed by NSLC
|
|
if ro.orderBy == ro.VOrderByCount:
|
|
lines.sort(
|
|
key=lambda x: (
|
|
x[1].segmentCount(),
|
|
x[0].waveformID().networkCode(),
|
|
x[0].waveformID().stationCode(),
|
|
x[0].waveformID().locationCode(),
|
|
x[0].waveformID().channelCode(),
|
|
x[1].start(),
|
|
x[1].end(),
|
|
x[1].quality(),
|
|
x[1].sampleRate(),
|
|
)
|
|
)
|
|
# Sort by segment count in descending order followed by NSLC
|
|
elif ro.orderBy == ro.VOrderByCountDesc:
|
|
lines.sort(
|
|
key=lambda x: (
|
|
-x[1].segmentCount(),
|
|
x[0].waveformID().networkCode(),
|
|
x[0].waveformID().stationCode(),
|
|
x[0].waveformID().locationCode(),
|
|
x[0].waveformID().channelCode(),
|
|
x[1].start(),
|
|
x[1].end(),
|
|
x[1].quality(),
|
|
x[1].sampleRate(),
|
|
)
|
|
)
|
|
# Sort by update time followed by NSLC
|
|
elif ro.orderBy == ro.VOrderByUpdate:
|
|
lines.sort(
|
|
key=lambda x: (
|
|
x[1].updated().seconds(),
|
|
x[0].waveformID().networkCode(),
|
|
x[0].waveformID().stationCode(),
|
|
x[0].waveformID().locationCode(),
|
|
x[0].waveformID().channelCode(),
|
|
x[1].start(),
|
|
x[1].end(),
|
|
x[1].quality(),
|
|
x[1].sampleRate(),
|
|
)
|
|
)
|
|
# Sort by update time in descending order followed by NSLC
|
|
elif ro.orderBy == ro.VOrderByUpdateDesc:
|
|
lines.sort(
|
|
key=lambda x: (
|
|
-x[1].updated().seconds(),
|
|
x[0].waveformID().networkCode(),
|
|
x[0].waveformID().stationCode(),
|
|
x[0].waveformID().locationCode(),
|
|
x[0].waveformID().channelCode(),
|
|
x[1].start(),
|
|
x[1].end(),
|
|
x[1].quality(),
|
|
x[1].sampleRate(),
|
|
)
|
|
)
|
|
# Sort by NSLC
|
|
else:
|
|
lines.sort(
|
|
key=lambda x: (
|
|
x[0].waveformID().networkCode(),
|
|
x[0].waveformID().stationCode(),
|
|
x[0].waveformID().locationCode(),
|
|
x[0].waveformID().channelCode(),
|
|
x[1].start(),
|
|
x[1].end(),
|
|
x[1].quality(),
|
|
x[1].sampleRate(),
|
|
)
|
|
)
|
|
|
|
|
|
###############################################################################
|
|
@implementer(portal.IRealm)
|
|
class FDSNAvailabilityQueryRealm:
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self, access):
|
|
self.__access = access
|
|
|
|
# --------------------------------------------------------------------------
|
|
def requestAvatar(self, avatarId, _mind, *interfaces):
|
|
if resource.IResource in interfaces:
|
|
user = {"mail": utils.u_str(avatarId), "blacklisted": False}
|
|
return (
|
|
resource.IResource,
|
|
FDSNAvailabilityQuery(self.__access, user),
|
|
lambda: None,
|
|
)
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
###############################################################################
|
|
@implementer(portal.IRealm)
|
|
class FDSNAvailabilityQueryAuthRealm:
|
|
# --------------------------------------------------------------------------
|
|
def __init__(self, access, userdb):
|
|
self.__access = access
|
|
self.__userdb = userdb
|
|
|
|
# --------------------------------------------------------------------------
|
|
def requestAvatar(self, avatarId, _mind, *interfaces):
|
|
if resource.IResource in interfaces:
|
|
user = self.__userdb.getAttributes(utils.u_str(avatarId))
|
|
return (
|
|
resource.IResource,
|
|
FDSNAvailabilityQuery(self.__access, user),
|
|
lambda: None,
|
|
)
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
###############################################################################
|
|
class FDSNAvailabilityQuery(_Availability):
|
|
isLeaf = True
|
|
|
|
# --------------------------------------------------------------------------
|
|
@staticmethod
|
|
def _createRequestOptions():
|
|
return _AvailabilityQueryRequestOptions()
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _writeJSONChannels(self, req, header, footer, lines, ro):
|
|
charCount = 0
|
|
lineCount = 0
|
|
|
|
nslc = (
|
|
"{{"
|
|
'"network":"{0}",'
|
|
'"station":"{1}",'
|
|
'"location":"{2}",'
|
|
'"channel":"{3}"'
|
|
)
|
|
quality = ',"quality":"{0}"'
|
|
sampleRate = ',"samplerate":{0}'
|
|
updated = ',"updated":"{0}"'
|
|
timeSpans = ',"timespans":[['
|
|
seg = '"{0}","{1}"]'
|
|
|
|
class SegGroup:
|
|
def __init__(self, segments, updated):
|
|
self.segments = segments
|
|
self.updated = updated
|
|
|
|
def writeSegments(segments):
|
|
first = True
|
|
charCount = 0
|
|
|
|
for s in segments:
|
|
if first:
|
|
first = False
|
|
data = timeSpans
|
|
else:
|
|
data = ",["
|
|
|
|
data += seg.format(
|
|
self._formatTime(s.start(), True), self._formatTime(s.end(), True)
|
|
)
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
|
|
return charCount
|
|
|
|
prevExt = None
|
|
|
|
# merge of quality and sample rate: all timespans of one stream are
|
|
# grouped into one channel
|
|
if ro.mergeQuality and ro.mergeSampleRate:
|
|
lastUpdate = None
|
|
segments = None
|
|
for ext, attExt, _restricted in lines:
|
|
lineCount += 1
|
|
|
|
if ext is prevExt:
|
|
lastUpdate = max(lastUpdate, attExt.updated())
|
|
segments.append(attExt)
|
|
continue
|
|
|
|
if prevExt is not None:
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
wid = prevExt.waveformID()
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(lastUpdate))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(segments)
|
|
|
|
prevExt = ext
|
|
lastUpdate = attExt.updated()
|
|
segments = [attExt]
|
|
|
|
# handle last extent
|
|
if prevExt is not None:
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
wid = prevExt.waveformID()
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(lastUpdate))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(segments)
|
|
|
|
data = "]}"
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
utils.writeTS(req, footer)
|
|
charCount += len(footer)
|
|
|
|
# merge of quality: all timespans of one stream are grouped by sample rate
|
|
elif ro.mergeQuality:
|
|
segGroups = OrderedDict()
|
|
for ext, attExt, _restricted in lines:
|
|
lineCount += 1
|
|
|
|
if ext is prevExt:
|
|
if attExt.sampleRate() in segGroups:
|
|
segGroup = segGroups[attExt.sampleRate()]
|
|
segGroup.updated = max(segGroup.updated, attExt.updated())
|
|
segGroup.segments.append(attExt)
|
|
continue
|
|
|
|
else:
|
|
if prevExt is not None:
|
|
wid = prevExt.waveformID()
|
|
for sr, sg in segGroups.items():
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
data += sampleRate.format(sr)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(sg.updated))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(sg.segments)
|
|
|
|
prevExt = ext
|
|
segGroups = OrderedDict()
|
|
|
|
segGroups[attExt.sampleRate()] = SegGroup([attExt], attExt.updated())
|
|
|
|
# handle last extent
|
|
if prevExt is not None:
|
|
wid = prevExt.waveformID()
|
|
for sr, sg in segGroups.items():
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
data += sampleRate.format(sr)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(sg.updated))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(sg.segments)
|
|
|
|
data = "]}"
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
utils.writeTS(req, footer)
|
|
charCount += len(footer)
|
|
|
|
# merge of sample rate: all timespans of one stream are grouped by
|
|
# quality
|
|
elif ro.mergeSampleRate:
|
|
segGroups = OrderedDict()
|
|
for ext, attExt, _restricted in lines:
|
|
lineCount += 1
|
|
|
|
if ext is prevExt:
|
|
if attExt.quality() in segGroups:
|
|
segGroup = segGroups[attExt.quality()]
|
|
segGroup.updated = max(segGroup.updated, attExt.updated())
|
|
segGroup.segments.append(attExt)
|
|
continue
|
|
|
|
else:
|
|
if prevExt is not None:
|
|
wid = prevExt.waveformID()
|
|
for q, sg in segGroups.items():
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
data += quality.format(q)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(sg.updated))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(sg.segments)
|
|
|
|
prevExt = ext
|
|
segGroups = OrderedDict()
|
|
|
|
segGroups[attExt.quality()] = SegGroup([attExt], attExt.updated())
|
|
|
|
# handle last extent
|
|
if prevExt is not None:
|
|
wid = prevExt.waveformID()
|
|
for q, sg in segGroups.items():
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
data += quality.format(q)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(sg.updated))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(sg.segments)
|
|
|
|
data = "]}"
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
utils.writeTS(req, footer)
|
|
charCount += len(footer)
|
|
|
|
# no merge: all timespans of one stream are grouped by tuple of quality and
|
|
# sampleRate
|
|
else:
|
|
segGroups = OrderedDict()
|
|
for ext, attExt, _restricted in lines:
|
|
lineCount += 1
|
|
t = (attExt.quality(), attExt.sampleRate())
|
|
|
|
if ext is prevExt:
|
|
if t in segGroups:
|
|
segGroup = segGroups[t]
|
|
segGroup.updated = max(segGroup.updated, attExt.updated())
|
|
segGroup.segments.append(attExt)
|
|
continue
|
|
|
|
else:
|
|
if prevExt is not None:
|
|
wid = prevExt.waveformID()
|
|
for (q, sr), sg in segGroups.items():
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
data += quality.format(q)
|
|
data += sampleRate.format(sr)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(sg.updated))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(sg.segments)
|
|
|
|
prevExt = ext
|
|
segGroups = OrderedDict()
|
|
|
|
segGroups[t] = SegGroup([attExt], attExt.updated())
|
|
|
|
# handle last extent
|
|
if prevExt is not None:
|
|
wid = prevExt.waveformID()
|
|
for (q, sr), sg in segGroups.items():
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
if charCount == 0:
|
|
utils.writeTS(req, header)
|
|
charCount += len(header)
|
|
data = ""
|
|
else:
|
|
data = "]},"
|
|
|
|
data += nslc.format(
|
|
wid.networkCode(),
|
|
wid.stationCode(),
|
|
wid.locationCode(),
|
|
wid.channelCode(),
|
|
)
|
|
data += quality.format(q)
|
|
data += sampleRate.format(sr)
|
|
if ro.showLatestUpdate:
|
|
data += updated.format(self._formatTime(sg.updated))
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
charCount += writeSegments(sg.segments)
|
|
|
|
data = "]}"
|
|
utils.writeTS(req, data)
|
|
charCount += len(data)
|
|
utils.writeTS(req, footer)
|
|
charCount += len(footer)
|
|
|
|
return charCount, lineCount
|
|
|
|
# --------------------------------------------------------------------------
|
|
def _processRequest(self, req, ro, dac):
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
# tuples: wid, segment, restricted status
|
|
lines = []
|
|
|
|
charCount = 0
|
|
|
|
# iterate extents and create IN clauses of parent_oids in bunches
|
|
# of 1000 because the query size is limited
|
|
parentOIDs, idList, tooLarge = [], [], []
|
|
i = 0
|
|
for ext, objID, _ in ro.extentIter(dac, self.user, self.access):
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return False
|
|
|
|
if ro.excludeTooLarge:
|
|
if ext.segmentOverflow():
|
|
continue
|
|
elif ext.segmentOverflow():
|
|
tooLarge.append(ext)
|
|
continue
|
|
elif tooLarge:
|
|
continue
|
|
|
|
if i < 1000:
|
|
idList.append(objID)
|
|
i += 1
|
|
else:
|
|
parentOIDs.append(idList)
|
|
idList = [objID]
|
|
i = 1
|
|
|
|
if not ro.excludeTooLarge and tooLarge:
|
|
extents = ", ".join(
|
|
f"{e.waveformID().networkCode()}.{e.waveformID().stationCode()}."
|
|
f"{e.waveformID().locationCode()}.{e.waveformID().channelCode()}"
|
|
for e in tooLarge
|
|
)
|
|
|
|
msg = (
|
|
"Unable to process request due to database limitations. Some "
|
|
"selections have too many segments to process. Rejected extents: "
|
|
f"{{{extents}}}. This limitation may be resolved in a future version "
|
|
"of this webservice."
|
|
)
|
|
self.writeErrorPage(req, http.REQUEST_ENTITY_TOO_LARGE, msg, ro)
|
|
return False
|
|
|
|
if len(idList) > 0:
|
|
parentOIDs.append(idList)
|
|
else:
|
|
msg = "no matching availabilty information found"
|
|
self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
|
|
return False
|
|
|
|
db = io.DatabaseInterface.Open(Application.Instance().databaseURI())
|
|
if db is None:
|
|
msg = "could not connect to database"
|
|
return self.renderErrorPage(req, http.SERVICE_UNAVAILABLE, msg, ro)
|
|
|
|
lines = self._lineIter(db, parentOIDs, req, ro, dac.extentsOID())
|
|
|
|
charCount, segCount = self._writeLines(req, lines, ro)
|
|
|
|
# Return 204 if no matching availability information was found
|
|
if segCount <= 0:
|
|
msg = "no matching availabilty information found"
|
|
self.writeErrorPage(req, http.NO_CONTENT, msg, ro)
|
|
return True
|
|
|
|
logging.debug(
|
|
f"{ro.service}: returned {segCount} segments (characters: {charCount})"
|
|
)
|
|
utils.accessLog(req, ro, http.OK, charCount, None)
|
|
|
|
return True
|
|
|
|
# --------------------------------------------------------------------------
|
|
@staticmethod
|
|
def _lineIter(db, parentOIDs, req, ro, oIDs):
|
|
def _T(name):
|
|
return db.convertColumnName(name)
|
|
|
|
dba = datamodel.DatabaseArchive(db)
|
|
|
|
for idList in parentOIDs:
|
|
if req._disconnected: # pylint: disable=W0212
|
|
return
|
|
|
|
# build SQL query
|
|
idStr = ",".join(str(x) for x in idList)
|
|
q = f"SELECT * from DataSegment WHERE _parent_oid IN ({idStr}) "
|
|
if ro.time:
|
|
if ro.time.start is not None:
|
|
if ro.time.start.microseconds() == 0:
|
|
q += f"AND {_T('end')} >= '{db.timeToString(ro.time.start)}' "
|
|
else:
|
|
q += (
|
|
"AND ({0} > '{1}' OR ("
|
|
f"{_T('end')} = '{db.timeToString(ro.time.start)}' AND "
|
|
f"end_ms >= {ro.time.start.microseconds()})) "
|
|
)
|
|
if ro.time.end is not None:
|
|
if ro.time.end.microseconds() == 0:
|
|
q += f"AND {_T('start')} < '{db.timeToString(ro.time.end)}' "
|
|
else:
|
|
q += (
|
|
"AND ({0} < '{1}' OR ("
|
|
f"{_T('start')} = '{db.timeToString(ro.time.end)}' AND "
|
|
"start_ms < {ro.time.end.microseconds()})) "
|
|
)
|
|
if ro.quality:
|
|
qualities = "', '".join(ro.quality)
|
|
q += f"AND {_T('quality')} IN ('{qualities}') "
|
|
q += f"ORDER BY _parent_oid, {_T('start')}, {_T('start_ms')}"
|
|
|
|
segIt = dba.getObjectIterator(q, datamodel.DataSegment.TypeInfo())
|
|
if segIt is None or not segIt.valid():
|
|
return
|
|
|
|
# Iterate and optionally merge segments.
|
|
# A segment will be yielded if
|
|
# - the extent changed
|
|
# - quality changed and merging of quality was not requested
|
|
# - sample rate changed and merging of sample rate was not
|
|
# requested
|
|
# - an overlap was detected and merging of overlaps was not
|
|
# requested
|
|
# - an gap was detected and the gaps exceeds the mergeGaps
|
|
# threshold
|
|
seg = None
|
|
ext = None
|
|
lines = 0
|
|
jitter = 0.0
|
|
while (
|
|
not req._disconnected # pylint: disable=W0212
|
|
and (seg is None or segIt.next())
|
|
and (not ro.limit or lines < ro.limit)
|
|
):
|
|
s = datamodel.DataSegment.Cast(segIt.get())
|
|
if s is None:
|
|
break
|
|
|
|
# get extent for current segment
|
|
try:
|
|
e, restricted = oIDs[segIt.parentOid()]
|
|
except KeyError:
|
|
logging.warning(f"parent object id not found: {segIt.parentOid()}")
|
|
continue
|
|
|
|
# first segment, no merge test required
|
|
if seg is None:
|
|
seg = s
|
|
ext = e
|
|
jitter = 1 / (2 * s.sampleRate())
|
|
continue
|
|
|
|
# merge test
|
|
diff = float(s.start() - seg.end())
|
|
mQuality = ro.mergeQuality or s.quality() == seg.quality()
|
|
mSampleRate = ro.mergeSampleRate or s.sampleRate() == seg.sampleRate()
|
|
mGaps = (ro.mergeGaps is None and diff <= jitter) or (
|
|
ro.mergeGaps is not None and diff <= ro.mergeGaps
|
|
)
|
|
mOverlap = -diff <= jitter or ro.mergeOverlap
|
|
if e is ext and mQuality and mSampleRate and mGaps and mOverlap:
|
|
seg.setEnd(s.end())
|
|
if s.updated() > seg.updated():
|
|
seg.setUpdated(s.updated())
|
|
continue
|
|
|
|
# merge was not possible, yield previous segment
|
|
yield (ext, seg, restricted)
|
|
lines += 1
|
|
seg = s
|
|
ext = e
|
|
if seg.sampleRate() != s.sampleRate():
|
|
jitter = 1 / (2 * s.sampleRate())
|
|
|
|
if seg is not None and (not ro.limit or lines < ro.limit):
|
|
yield (ext, seg, restricted)
|
|
|
|
# close database iterator if iteration was stopped because of
|
|
# row limit
|
|
return
|
|
|
|
|
|
# vim: ts=4 et tw=79
|