-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
153 lines (132 loc) · 4.15 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# encoding: utf-8
from flask import url_for
from flask import redirect as flask_redirect
from functools import wraps
from flask import request, session, jsonify
from json import loads
from pprint import pprint
from werkzeug.exceptions import HTTPException
class HTTPExceptionWithResponse(HTTPException):
    """HTTP exception that answers with a response object built in advance.

    The wrapped response is returned as-is from ``get_response`` and its
    ``status_code`` is exposed as the exception's ``code``.

    Args:
        response: the response object to serve; it must expose a
            ``status_code`` attribute.
    """

    def __init__(self, response):
        # Store the response before calling the base initializer so that
        # ``code`` is already usable if the base class reads it.
        self._response = response
        super(HTTPExceptionWithResponse, self).__init__()

    @property
    def code(self):
        """Status code of the wrapped response."""
        # The former debug ``print`` was removed: it wrote to stdout on every
        # access of this property.
        return self._response.status_code

    def get_response(self, environ=None):
        """Return the wrapped response unchanged; ``environ`` is ignored."""
        return self._response
class RedirectToException(HTTPExceptionWithResponse):
    """Exception whose response is the redirect produced by ``redirect_to``.

    All positional and keyword arguments are forwarded verbatim to
    ``redirect_to``.
    """

    def __init__(self, *args, **kws):
        redirect_response = redirect_to(*args, **kws)
        super(RedirectToException, self).__init__(response=redirect_response)
def redirect_to(*args, **kws):
    """Build a redirect response to the URL that ``url_for`` resolves.

    Keyword arguments consumed here (not passed on to ``url_for``):
        code: HTTP status code for the redirect (default 302).
        _anchor: optional fragment appended to the URL after a ``#``.

    Everything else is forwarded to ``url_for``. Returns whatever the
    module's ``redirect`` helper returns.
    """
    status = kws.pop("code", 302)
    fragment = kws.pop('_anchor', None)
    target = url_for(*args, **kws)
    if fragment is None:
        return redirect(target, code=status)
    # The fragment is appended verbatim, without any escaping.
    return redirect(target + '#' + fragment, code=status)
def redirect(*args, **kws):
    """Return a Flask redirect, upgrading the target to https when required.

    Args:
        *args: the first positional argument, if given and truthy, is the
            target URL.
        **kws:
            to: target URL used when no truthy positional target is given.
            code: HTTP status code for the redirect (default 302).

    The target falls back to ``"/"`` when neither form supplies one.

    When ``current_app.is_ssl_request()`` is true and the target does not
    already start with ``https``, the target is matched against the app's URL
    map; if ``current_app.in_ssl_required_endpoint(endpoint)`` is true for the
    matched endpoint, the target is rewritten to ``https://<request.host>``
    plus the original target. A target that matches no route is left
    unchanged.

    Returns:
        The response produced by ``flask.redirect``.
    """
    from werkzeug import exceptions
    from flask import current_app, request

    code = kws.pop("code", 302)
    # Previously ``args[0]`` raised IndexError before the ``to`` keyword was
    # consulted, so ``redirect(to="/x")`` always went to "/". Check for a
    # positional argument first, then fall back to the keyword.
    to = (args[0] if args else None) or kws.pop("to", "/")

    if current_app.is_ssl_request() and not to.startswith('https'):
        try:
            # NOTE(review): the second positional argument of Map.bind looks
            # like it is the script name rather than the path to match --
            # confirm against the Werkzeug version in use.
            endpoint = current_app.url_map.bind(request.host, to).match()[0]
            if current_app.in_ssl_required_endpoint(endpoint):
                to = "https://%s%s" % (request.host, to)
        except exceptions.NotFound:
            # Target is not one of this app's routes: leave it untouched.
            pass
    return flask_redirect(to, code=code)
def receive_json():
    """Decorator factory that feeds JSON request data to a view as kwargs.

    For POST requests the JSON document is read from the request body stream;
    for any other method it is taken from the ``data`` query parameter. The
    decoded object is merged into the view's keyword arguments (overriding
    keys that are already present) before the view is called.
    """
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kws):
            is_post = request.method == "POST"
            raw = request.stream.read() if is_post else request.args['data']
            kws.update(loads(raw))
            return f(*args, **kws)
        return decorated
    return decorator
# Default multiplier that encode_id/decode_id use to derive their one-byte
# check value (n * k % 256).
CC = 31
from struct import pack,unpack
from base64 import urlsafe_b64encode,urlsafe_b64decode
import struct
from datetime import datetime
# Naive datetime for 1970-01-01 00:00:00; dt2int measures seconds from it.
epoch = datetime(1970, 1, 1)
def dt2int(dt):
    """Return the whole seconds elapsed between ``epoch`` and ``dt``.

    ``dt`` must be subtractable from the module-level ``epoch`` datetime.
    The fractional part is dropped by ``int()`` (truncation toward zero).
    """
    return int((dt - epoch).total_seconds())
def urlsafe_pack_ulong(n):
    """Pack ``n`` as a native unsigned long and return it URL-safe base64
    encoded with the last two characters removed.

    NOTE(review): the native "L" format is 4 bytes on some platforms and 8 on
    others; dropping exactly two characters only equals "strip the padding"
    for the 4-byte case -- confirm the intended width.
    """
    encoded = urlsafe_b64encode(pack("L", n))
    return encoded[:len(encoded) - 2]
def urlsafe_pack_ulonglong(n):
    """Pack ``n`` as a native unsigned long long and return it URL-safe
    base64 encoded with the final character removed.

    For an 8-byte payload the removed character is the single ``=`` padding
    character.
    """
    encoded = urlsafe_b64encode(pack("Q", n))
    return encoded[:len(encoded) - 1]
def encode_id(n, k=CC):
    """Encode an integer id as a URL-safe base64 token with a check byte.

    A check value ``c = n * k % 256`` is computed, every byte of ``n`` is
    XOR-ed with a multiple of ``c`` (byte ``x`` with ``c * (x + 1) & 255``),
    and the masked 8-byte value followed by ``c`` is base64 encoded.
    ``decode_id`` reverses the process.

    Args:
        n: integer id; after masking it must fit in an unsigned 64-bit value.
        k: multiplier used for the check value (defaults to ``CC``).

    Returns:
        The URL-safe base64 encoding of the 9 packed bytes, as returned by
        ``base64.urlsafe_b64encode``.

    Raises:
        struct.error: if the masked value cannot be packed as "Q".
    """
    c = n * k % 256
    mask = sum((c * (x + 1) & 255) << 8 * x for x in range(8))
    # Pack the check byte with struct instead of appending ``chr(c)``: the
    # bytes are identical, but it no longer mixes bytes and text, which fails
    # on Python 3. Native "QB" is 8 + 1 bytes with no padding in between.
    return urlsafe_b64encode(pack("QB", n ^ mask, c))
def decode_id(s64, k=CC):
    """Decode a token produced by ``encode_id`` back into its integer id.

    Args:
        s64: the token, as text or bytes. Text is encoded to UTF-8 first.
        k: multiplier used for the check value; must match the one used when
            encoding (defaults to ``CC``).

    Returns:
        The decoded integer, or ``None`` when the token is not valid base64,
        does not decode to exactly 9 bytes (8-byte value + check byte), or
        fails the check-byte verification.
    """
    # Function-scope import, as done elsewhere in this file.
    import logging

    log = logging.getLogger(__name__)

    try:
        if not isinstance(s64, bytes):
            s64 = s64.encode("utf-8")
        s = urlsafe_b64decode(s64)
    except (TypeError, ValueError):
        # Python 2 reports bad base64 as TypeError, Python 3 as
        # binascii.Error, which is a ValueError subclass.
        log.debug("decode_id: not valid base64: %r", s64)
        return None

    try:
        # One unpack yields both the masked value and the check byte, and
        # rejects any payload that is not exactly 9 bytes long -- including
        # the empty payload that used to raise IndexError on ``s[-1]``.
        n, c = unpack("QB", s)
    except struct.error:
        log.debug("decode_id: unexpected payload length: %r", s64)
        return None

    # Undo the per-byte XOR mask applied by encode_id.
    n ^= sum((c * (x + 1) & 255) << 8 * x for x in range(8))
    if c != n * k % 256:
        log.debug("decode_id: check byte mismatch: %r (decoded %r)", s64, n)
        return None
    return n
def drop_all_tables(metadata, engine):
    """Drop every existing table registered in ``metadata``.

    On PostgreSQL (``engine.name == "postgresql"``) each table is removed with
    ``DROP TABLE <name> CASCADE`` issued on a connection from ``engine``; on
    other backends the table's own ``drop(checkfirst=True)`` is used.

    Args:
        metadata: object exposing a ``tables`` mapping of table objects.
        engine: engine providing ``connect()`` and ``name``.
    """
    con = engine.connect()
    try:
        for v in metadata.tables.values():
            # NOTE(review): exists()/drop() are called without an explicit
            # bind, so they presumably rely on the metadata being bound to an
            # engine -- confirm with the callers.
            if v.exists():
                if engine.name == "postgresql":
                    # NOTE(review): the table name is interpolated unquoted;
                    # names that need quoting would break this statement.
                    con.execute("""DROP TABLE %s CASCADE""" % v.name)
                else:
                    v.drop(checkfirst=True)
    finally:
        # The connection used to be left open; always release it.
        con.close()
def create_obj(model, argnames, data):
    """Instantiate and save one ``model`` per row of ``data``.

    Each row is zipped with ``argnames`` to form the keyword arguments for
    ``model``; ``save()`` is called on the new instance.

    Returns:
        A list with the return value of each ``save()`` call, in row order.
    """
    saved = []
    for row in data:
        fields = dict(zip(argnames, row))
        instance = model(**fields)
        saved.append(instance.save())
    return saved
def reset_tables():
    """Drop and recreate all tables known to the current app's database.

    Binds ``current_app.db.metadata`` to ``current_app.db.engine``, then calls
    ``drop_all()`` followed by ``create_all()`` on that metadata.
    """
    from flask import current_app

    database = current_app.db
    meta = database.metadata
    meta.bind = database.engine
    meta.drop_all()
    meta.create_all()
if __name__ == "__main__":
    # Manual smoke test for encode_id/decode_id.
    # The former first statement called get_username(), which is neither
    # defined nor imported in this module and raised NameError before any of
    # the checks below could run; it has been removed.

    # Show the tokens generated for a few small ids.
    for x in range(1, 10):
        print("%s %s" % (x, encode_id(x)))

    # Round-trip ids that exercise each of the eight bytes.
    for x in range(8):
        n = 256 ** x + x ** 3 + x
        token = encode_id(n)
        decoded = decode_id(token)
        print("%20d %20d %s %s" % (n, decoded, decoded == n, token))

    # A malformed token must be rejected rather than raise.
    assert decode_id("2") is None