#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import urllib
import urlparse
import struct
import time
import datetime
from wsgiref.handlers import format_date_time
import unicodedata
CONTRIB = os.path.join(os.path.dirname(__file__), '../../contrib')
MANY_SLASHES_PATTERN = r'[\/]+'
MANY_SLASHES_REGEX = re.compile(MANY_SLASHES_PATTERN)
PATTERN_ITEM_OR_KEY_ACCESS = r'^(?P<attr_name>[a-zA-Z][\w\d]*)' \
r'\[((?P<index>\d+)|' \
r'[\'\"](?P<key>[\s\w\d]+)[\'\"])\]$'
REGEX_ITEM_OR_KEY_ACCESS = re.compile(PATTERN_ITEM_OR_KEY_ACCESS)
SERVICEREFERENCE_PORTION_PATTERN = r'([\da-fA-F]+)[^\w]?'
SERVICEREFERENCE_PORTION_REGEX = re.compile(SERVICEREFERENCE_PORTION_PATTERN)
#: regular expression pattern for strings containing something resembling
#: a hostname/port combination (``<hostname>:<port>``)
PATTERN_HOST_HEADER = r'(\[(?P<ipv6_addr>.+?)\]|(?P<hostname>.+?))' \
r'(\:(?P<port>\d+))?$'
REGEX_HOST_HEADER = re.compile(PATTERN_HOST_HEADER)
# stolen from enigma2_http_api ...
# https://wiki.neutrino-hd.de/wiki/Enigma:Services:Formatbeschreibung
# Dezimalwert: 1=TV, 2=Radio, 4=NVod, andere=Daten
SERVICE_TYPE_TV = 0x01
SERVICE_TYPE_RADIO = 0x02
SERVICE_TYPE_SD4 = 0x10
SERVICE_TYPE_HDTV = 0x19
SERVICE_TYPE_UHD = 0x1f
SERVICE_TYPE_OPT = 0xd3
SERVICE_KIND_COMMENT = 0x64
# type 1 = digital television service
# type 2 = digital radio sound service
# type 4 = nvod reference service (NYI)
# type 10 = advanced codec digital radio sound service
# type 17 = MPEG-2 HD digital television service
# type 22 = advanced codec SD digital television
# type 24 = advanced codec SD NVOD reference service (NYI)
# type 25 = advanced codec HD digital television
# type 27 = advanced codec HD NVOD reference service (NYI)
SERVICE_TYPE = {
'TV': SERVICE_TYPE_TV,
'HDTV': SERVICE_TYPE_HDTV,
'RADIO': SERVICE_TYPE_RADIO,
'UHD': SERVICE_TYPE_UHD,
'SD4': SERVICE_TYPE_SD4,
'OPT': SERVICE_TYPE_OPT,
}
SERVICE_TYPE_LOOKUP = {v: k for k, v in SERVICE_TYPE.iteritems()}
#: Namespace - DVB-C services
NS_DVB_C = 0xffff0000
#: Namespace - DVB-S services
NS_DVB_S = 0x00c00000
#: Namespace - DVB-T services
NS_DVB_T = 0xeeee0000
#: Label:Namespace map
NS = {
'DVB-C': NS_DVB_C,
'DVB-S': NS_DVB_S,
'DVB-T': NS_DVB_T,
}
#: Namespace:Label lookup map
NS_LOOKUP = {v: k for k, v in NS.iteritems()}
CUTS_IN = 0
CUTS_OUT = 1
CUTS_MARK = 2
CUTS_WATCHMARK = 3
[docs]def lenient_decode(value, encoding=None):
"""
Decode an encoded string and convert it to an unicode string.
Args:
value: input value
encoding: string encoding, defaults to utf-8
Returns:
:obj:`unicode`: decoded value
>>> lenient_decode("Hallo")
u'Hallo'
>>> lenient_decode(u"Hallo")
u'Hallo'
>>> lenient_decode("HällöÜ")
u'H\\xe4ll\\xf6\\xdc'
"""
if isinstance(value, unicode):
return value
if encoding is None:
encoding = 'utf_8'
return value.decode(encoding, 'ignore')
[docs]def lenient_force_utf_8(value):
"""
Args:
value: input value
Returns:
:obj:`basestring` utf-8 encoded value
>>> isinstance(lenient_force_utf_8(''), basestring)
True
>>> lenient_force_utf_8(u"Hallo")
'Hallo'
>>> lenient_force_utf_8("HällöÜ")
'H\\xc3\\xa4ll\\xc3\\xb6\\xc3\\x9c'
"""
return lenient_decode(value).encode('utf_8')
[docs]def sanitise_filename_slashes(value):
"""
Args:
value(basestring): input value
Returns:
value w/o multiple slashes
>>> in_value = "///tmp/x/y/z"
>>> expected = re.sub("^/+", "/", "///tmp/x/y/z")
>>> sanitise_filename_slashes(in_value) == expected
True
"""
return re.sub(MANY_SLASHES_REGEX, '/', value)
[docs]def get_config_attribute(path, root_obj, head=None):
"""
Determine attribute of *root_obj* to be accessed by *path* in a
(somewhat) safe manner.
This implementation will allow key and index based accessing too
(e.g. ``config.some_list[0]`` or ``config.some_dict['some_key']``)
The *path* value needs to start with *head* (default='config').
Args:
path: character string specifying which attribute is to be accessed
root_obj: An object whose attributes are to be accessed.
head: Value of the first portion of *path*
Returns:
Attribute of *root_obj*
Raises:
ValueError: If *path* is invalid.
AttributeError: If attribute cannot be accessed
"""
if head is None:
head = 'config'
portions = path.split('.')
if len(portions) < 2:
raise ValueError('Invalid path length')
if portions[0] != head:
raise ValueError(
'Head is {!r}, expected {!r}'.format(portions[0], head))
current_obj = root_obj
for attr_name in portions[1:]:
if not attr_name:
raise ValueError("empty attr_name")
if attr_name.startswith('_'):
raise ValueError('private member')
matcher = REGEX_ITEM_OR_KEY_ACCESS.match(attr_name)
if matcher:
gdict = matcher.groupdict()
attr_name = gdict.get('attr_name')
next_obj = getattr(current_obj, attr_name)
if gdict.get("index"):
index = int(gdict.get("index"))
current_obj = next_obj[index]
else:
key = gdict["key"]
current_obj = next_obj[key]
else:
current_obj = getattr(current_obj, attr_name)
return current_obj
[docs]def parse_servicereference(serviceref, separators=None, extended=False):
"""
Parse a Enigma2 style service reference string representation.
Args:
serviceref: Enigma2 style service reference
separators: Allowed separators
Returns:
dict containing parsed values
Raises:
ValueError: If *serviceref* is invalid.
>>> sref = '1:0:1:300:7:85:00c00000:0:0:0:'
>>> result = parse_servicereference(sref)
>>> result
{'service_type': 1, 'oid': 133, 'tsid': 7, 'ns': 12582912, 'sid': 768}
>>> sref_dashes = '1-0-1-300-7-85-00c00000-0-0-0-'
>>> result_dashes = parse_servicereference(sref_dashes, ':-')
>>> result == result_dashes
True
>>> sref_g = create_servicereference(**result)
>>> sref_g
'1:0:1:300:7:85:00c00000:0:0:0:'
>>> sref_g2 = create_servicereference(result)
>>> sref_g2
'1:0:1:300:7:85:00c00000:0:0:0:'
>>> sref == sref_g
True
>>> sref2 = '1:64:A:0:0:0:0:0:0:0::SKY Sport'
>>> result2 = parse_servicereference(sref2)
>>> result2
{'service_type': 10, 'oid': 0, 'tsid': 0, 'ns': 0, 'sid': 0}
>>> result2e = parse_servicereference(sref2, extended=True)
>>> result2e['kind']
100
>>> sref3 = '1:0:0:0:0:0:0:0:0:0:/media/hdd/movie/20170921 2055 - DASDING - DASDING Sprechstunde - .ts' # NOQA
>>> result3 = parse_servicereference(sref3)
>>> result3
{'service_type': 0, 'oid': 0, 'tsid': 0, 'ns': 0, 'sid': 0}
"""
if separators is None:
separators = (':',)
elif isinstance(separators, basestring):
separators = list(separators)
for separator in separators:
parts = serviceref.split(separator)
try:
sref_data = {
'service_type': int(parts[2], 16),
'sid': int(parts[3], 16),
'tsid': int(parts[4], 16),
'oid': int(parts[5], 16),
'ns': int(parts[6], 16)
}
if extended:
sref_data['kind'] = int(parts[1], 16)
return sref_data
except IndexError:
continue
raise ValueError(separators)
[docs]def create_servicereference(*args, **kwargs):
"""
Generate a (Enigma2 style) service reference string representation.
:param args[0]: Service Reference Parameter as dict
:type args[0]: :class:`dict`
:param service_type: Service Type
:type service_type: int
:param sid: SID
:type sid: int
:param tsid: TSID
:type tsid: int
:param oid: OID
:type oid: int
:param ns: Enigma2 Namespace
:type ns: int
"""
if len(args) == 1 and isinstance(args[0], dict):
kwargs = args[0]
service_type = kwargs.get('service_type', 0)
sid = kwargs.get('sid', 0)
tsid = kwargs.get('tsid', 0)
oid = kwargs.get('oid', 0)
ns = kwargs.get('ns', 0)
return '{:x}:0:{:x}:{:x}:{:x}:{:x}:{:08x}:0:0:0:'.format(
1,
service_type,
sid,
tsid,
oid,
ns)
[docs]def get_servicereference_portions(value, raise_on_empty=False):
"""
Try to match possible portions of a servicereference in *value*.
Args:
value (basestring): a servicereference-like string
raise_on_empty (boolean): If a ValueError should be raised
Returns:
list: matched portions
Raises:
ValueError: If result is empty list and *raise_on_empty* is True
>>> deadbeef = ['de', 'ad', 'be', 'ef']
>>> get_servicereference_portions(None)
[]
>>> get_servicereference_portions(True)
[]
>>> get_servicereference_portions(False)
[]
>>> get_servicereference_portions('de:ad:be:ef') == deadbeef
True
>>> get_servicereference_portions('de,ad$be_ef??') == deadbeef
True
>>> get_servicereference_portions('-1:ad:be:ef:')
['1', 'ad', 'be', 'ef']
>>> get_servicereference_portions('-^ghi', raise_on_empty=True)
Traceback (most recent call last):
...
ValueError: -^ghi
>>> get_servicereference_portions('1:0:19:7C:6:85:FFFF0000:0:0:0:')
['1', '0', '19', '7C', '6', '85', 'FFFF0000', '0', '0', '0']
"""
rv = []
try:
rv = re.findall(SERVICEREFERENCE_PORTION_REGEX, value)
except TypeError:
pass
if not rv and raise_on_empty:
raise ValueError(value)
return rv
[docs]def mangle_snp(value):
"""
Mangle service_name as suggested by SNP (Service Name Picons)
.. seealso::
* https://github.com/picons/picons-source
* https://github.com/OpenViX/enigma2/blob/master/lib/python/Components/Renderer/Picon.py#L88-L89
Args:
value (basestring): service name
Returns:
str: normalised service name
>>> mangle_snp('?ANTENNE? BAYERN')
'antennebayern'
>>> mangle_snp('?Sky? ?Cine?ma +?24?')
'skycinemaplus24'
"""
unicode_value = lenient_decode(value, 'utf_8')
name = unicodedata.normalize('NFKD', unicode_value).encode(
'ASCII', 'ignore')
normalised = name.replace(
'&', 'and').replace('+', 'plus').replace('*', 'star').lower()
return re.sub('[^a-z0-9]', '', normalised)
[docs]def require_valid_file_parameter(request, parameter_key):
"""
Args:
request (twisted.web.server.Request): HTTP request object
parameter_key: filename parameter key
Returns:
basestring: existing filename
Raises:
ValueError: If *parameter_key* is missing.
IOError: If filename does not point to an existing file path
"""
if parameter_key not in request.args:
raise ValueError("Missing parameter: {!r}".format(parameter_key))
filename = lenient_force_utf_8(
urllib.unquote_plus(request.args[parameter_key][0]))
filename = sanitise_filename_slashes(os.path.realpath(filename))
if not os.path.exists(filename):
raise IOError("Not a file: {!r}".format(filename))
return filename
[docs]def build_url(hostname, path=None, args=None, scheme="http", port=None):
"""
Create an URL based on parameters.
Args:
hostname: hostname portion
path: path portion
args: query parameters
scheme: scheme portion
port: port portion
Returns:
basestring: Generated URL
>>> build_url("some.host", "/", {})
'http://some.host/'
>>> build_url("some.host", "/")
'http://some.host/'
>>> build_url("some.host")
'http://some.host'
>>> build_url("some.host", port=27080)
'http://some.host:27080'
>>> build_url("", port=27080)
Traceback (most recent call last):
...
ValueError: empty hostname!
>>> build_url("some.host", "x")
'http://some.host/x'
>>> build_url("some.host", "/x")
'http://some.host/x'
>>> build_url("some.host", "/x/")
'http://some.host/x/'
>>> build_url("some.host", "x/")
'http://some.host/x/'
>>> build_url("some.host", "x/../")
'http://some.host/x/../'
>>> build_url("some.host", "/:x/äöü-blabla/")
'http://some.host/%3Ax/%C3%A4%C3%B6%C3%BC-blabla/'
>>> build_url("some.host", u'/:x/\xe4\xf6\xfc-blabla/'.encode('utf-8'))
'http://some.host/%3Ax/%C3%A4%C3%B6%C3%BC-blabla/'
"""
if not hostname:
raise ValueError("empty hostname!")
netloc = hostname
if port:
netloc = '{:s}:{!s}'.format(hostname, port)
if path:
path_q = urllib.quote(path)
else:
path_q = ''
if args:
args_e = urllib.urlencode(args)
else:
args_e = None
return urlparse.urlunparse((scheme, netloc, path_q, None, args_e, None))
def parse_cuts(cutfile):
marks = {
"watched": 0,
"maximum": 0,
"marks": []
}
with open(cutfile, "rb") as source:
chunk = source.read(12)
while chunk:
(pts_value, cue_kind) = struct.unpack('>QI', chunk)
seconds = pts_value / 90000
if cue_kind == CUTS_WATCHMARK:
marks['watched'] = seconds
else:
marks['marks'].append([seconds, cue_kind])
chunk = source.read(12)
if marks['marks']:
marks['maximum'] = marks['marks'][-1][0]
return marks
[docs]def parse_simple_index(source):
"""
>>> snp_index = os.path.join(CONTRIB, 'picon-source/snp.index')
>>> snp = parse_simple_index(snp_index)
>>> len(snp.keys()) > 1
True
>>> snp['wdrduesseldorf']
'wdr'
>>> snp['wdrdusseldorf']
'wdr'
"""
lookup = dict()
with open(source, "rb") as src:
for line in src:
(key, sep, value) = line.strip().partition('=')
if key == value:
continue
lookup[key] = value
return lookup
def gen_reverse_proxy_configuration(configuration=None, template=None):
if template is None:
template = os.path.abspath(os.path.join(os.path.dirname(__file__),
'reverse_proxy.conf.template'))
if configuration is None:
configuration = dict()
with open(template, "rb") as src:
template_content = src.read()
fallback_values = {
"REVERSE_PROXY_PORT": 8000,
"ENIGMA2_HOST": "localhost",
"ENIGMA2_PORT": 80,
"OSCAM_PORT": 83,
"STREAM_PORT": 8001,
"STREAM_TRANSCODED_PORT": 8002,
"PUBLIC_ROOT": '/tmp/public',
"PICON_ROOT": '/tmp/picon',
}
for key in fallback_values:
value = configuration.get(key, fallback_values[key])
if key in ('PUBLIC_ROOT', 'PICON_ROOT'):
value = re.sub(r'\/+$', '', value)
search_key = '{{{:s}}}'.format(key)
template_content = template_content.replace(search_key, str(value))
return template_content
[docs]def mangle_service_type_arg(item):
"""
Translate 'tv' or 'radio' to a set containing the needed service_type IDs.
Other values of *item* are expected to be an integer value.
Args:
item (str or int): service_type value
Returns:
set: service_type IDs
Raises:
ValueError: If a non-integer value was provided
>>> mangle_service_type_arg("tv") == set([1, 195, 134, 17, 22, 25, 31])
True
>>> mangle_service_type_arg("radio") == set([2, 10])
True
>>> mangle_service_type_arg(0x10) == { 16 }
True
>>> mangle_service_type_arg(1) == {1}
True
>>> mangle_service_type_arg("1") == {1}
True
>>> mangle_service_type_arg("") == {1}
Traceback (most recent call last):
...
ValueError: invalid literal for int() with base 10: ''
"""
try:
if item.lower() == 'tv':
# service_types_tv = 1:7:1:0:0:0:0:0:0:0:(type == 1) || (type == 17) || (type == 22) || (type == 25) || (type == 31) || (type == 134) || (type == 195) # NOQA
return {1, 17, 22, 25, 31, 134, 195}
elif item.lower() == 'radio':
# service_types_radio = 1:7:2:0:0:0:0:0:0:0:(type == 2) || (type == 10)
return {2, 10}
except AttributeError:
pass
return { int(item) }
if __name__ == '__main__':
import doctest
(FAILED, SUCCEEDED) = doctest.testmod()
print("[doctest] SUCCEEDED/FAILED: {:d}/{:d}".format(SUCCEEDED, FAILED))