-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathread_db_cache_test.py
163 lines (147 loc) · 8.02 KB
/
read_db_cache_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from pymysqlreplication import BinLogStreamReader
import time
import multiprocessing
import logging
logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
import datetime
from settings import MYSQL_SETTINGS
from pymysqlreplication.tests import base
from pymysqlreplication.row_event import (
# DeleteRowsEvent,
# UpdateRowsEvent,
WriteRowsEvent,
)
def convert_to_second_int(datetimein):
    """Convert a naive local-time datetime into an integer Unix timestamp.

    Uses time.mktime, so the input is interpreted in the machine's local
    timezone (not UTC).
    """
    local_struct = datetimein.timetuple()
    epoch_seconds = time.mktime(local_struct)
    return int(epoch_seconds)
class DB_fetcher(multiprocessing.Process):
    """Background process that tails the MySQL binlog and pushes freshly
    inserted rows of the target table into a shared queue.

    Only WriteRowsEvent (INSERT) events are consumed; rows are forwarded
    when their ``detect_flag`` is 0 and ``insert_time`` is populated.
    """

    def __init__(self, share_image_queue, MYSQL_SETTINGS, target_schema="capstone",
                 target_table="crack_detection_result",
                 start_time=datetime.datetime(1970, 1, 2, 0, 1, 0),
                 server_id=3):
        # Default start_time of 1970-01-02 keeps time.mktime() positive in
        # every timezone (1970-01-01 can go negative west of UTC).
        multiprocessing.Process.__init__(self)
        self.share_image_queue = share_image_queue
        # BUG FIX: this argument was previously accepted but never stored,
        # so run() silently used the module-level MYSQL_SETTINGS global and
        # caller-supplied settings were ignored.
        self.mysql_settings = MYSQL_SETTINGS
        self.target_schema = target_schema
        self.target_table = target_table
        # server_id was hard-coded to 3 in run(); now configurable with the
        # same default, so existing callers are unaffected.
        self.server_id = server_id
        self.skip_to_timestamp = convert_to_second_int(start_time)
        print(" >>>>> start time is {}".format(self.skip_to_timestamp))

    def run(self):
        """Poll the binlog forever, re-opening the stream every cycle from
        the last-seen timestamp and sleeping 2 s between cycles."""
        target_prefix = self.target_schema + ":" + self.target_table + ":"
        while True:
            self.stream = BinLogStreamReader(
                connection_settings=self.mysql_settings,
                only_events=[WriteRowsEvent],  # inserts only
                server_id=self.server_id,
                slave_heartbeat=1,
                skip_to_timestamp=self.skip_to_timestamp)
            for binlogevent in self.stream:
                prefix = "%s:%s:" % (binlogevent.schema, binlogevent.table)
                if prefix == target_prefix:
                    for new_update_row in binlogevent.rows:
                        logging.info(" >>> find new row {} time: {}".format(new_update_row, binlogevent.timestamp))
                        # Row shape (per pymysqlreplication WriteRowsEvent):
                        # {'values': {'video_id': ..., 'frame_id': ...,
                        #  'insert_time': datetime, 'frame_loc': str,
                        #  'detect_flag': ..., 'result_loc': ...}}
                        values = new_update_row["values"]
                        # BUG FIX: the original used `or` between the two
                        # insert_time checks, which made the condition a
                        # tautology (always True); `and` matches the clear
                        # intent of "insert_time is populated".
                        if values["detect_flag"] == 0 and \
                                values['insert_time'] is not None and values['insert_time'] != '':
                            try:
                                # Block up to 1 s; a persistently full queue
                                # drops this row (best effort, logged below).
                                self.share_image_queue.put(values, True, 1)
                                logging.info(" >>> adding 1 image to queue {}".format(new_update_row))
                            except Exception as e:
                                logging.error(e)
            # Stream exhausted (non-blocking reader): resume from "now" on
            # the next cycle so already-seen events are skipped.
            self.skip_to_timestamp = convert_to_second_int(datetime.datetime.now())
            time.sleep(2)

    def close_stream(self):
        """Close the binlog stream opened by run() (if any)."""
        self.stream.close()
# def main(target_schema = "capstone", target_table = "crack_detection_result", last_read_id = 0):
# """
# details of binLogStreamRead: https://github.com/noplay/python-mysql-replication/blob/master/pymysqlreplication/binlogstream.py
# :return:
# """
# current = datetime.datetime(2018,7,29, 13, 10,0)
# print(" 7, 29 13.10 convert_current_time_to{}".format(convert_to_second_int(current)))
# stream = BinLogStreamReader(
# connection_settings=MYSQL_SETTINGS,
# only_events= [WriteRowsEvent], #[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
# server_id = 3,
# slave_heartbeat = 1,
# skip_to_timestamp = convert_to_second_int(current))
#
# print(stream)
# print(">>>>>>>>>> getting update")
# for binlogevent in stream:
# # print(">>> binlogevent {}".format(binlogevent))
#
# prefix = "%s:%s:" % (binlogevent.schema, binlogevent.table)
# # print(prefix)
# if prefix == target_schema+":"+target_table+":":
# new_update_rows = binlogevent.rows[last_read_id:]
# for vals in new_update_rows:
# # format: {'values':
# # {'map_id': 1, 'timestamp': datetime.datetime(2008, 6, 19, 0, 0),
# # 'image_location': 'RANDOM_LL', 'crack_detect': None, 'result_locatoin': None}}
# if vals["values"]["detect_flag"] is None:
# # settings.image_processing_queue.put(vals["values"])
# print(" >>> adding 1 image to queue: {}".format(vals["values"]))
# last_read_id += 1
# time.sleep(1)
# stream.close()
#
#
# class DB_fetcher( base.PyMySQLReplicationTestCase ):
# def __init__(self, share_image_queue, MYSQL_SETTINGS ,target_schema = "capstone",
# target_table = "new_table", last_read_id = 0,
# skip_to_timestamp = datetime.datetime(1900,1,1,0,0,0)):
# # multiprocessing.Process.__init__(self)
# self.share_image_queue = share_image_queue
# self.target_schema = target_schema
# self.target_table = target_table
# self.last_read_id = last_read_id
# # self.idx = 0
#
# timestamp = self.execute('SELECT UNIX_TIMESTAMP()').fetchone()[0]
# self.skip_to_timestamp = skip_to_timestamp.strftime('%Y-%m-%d %H:%M:%S')
#
# def run(self):
# while True:
# self.stream = BinLogStreamReader(
# connection_settings=MYSQL_SETTINGS,
# only_events=[WriteRowsEvent], # [DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
# server_id=3,
# slave_heartbeat=1
# ,
# skip_to_timestamp = self.skip_to_timestamp)
#
# # self.idx = 0
# # self.show_slave_status()
# for binlogevent in self.stream:
# prefix = "%s:%s:" % (binlogevent.schema, binlogevent.table)
# if prefix == self.target_schema + ":" + self.target_table + ":":
# # print(binlogevent.rows)
# for new_update_row in binlogevent.rows:
# logging.info(" >>> find new row {}".format(new_update_row))
# # format for a row: {'values':
# # {'map_id': 1, 'timestamp': datetime.datetime(2008, 6, 19, 0, 0),
# # 'image_location': 'RANDOM_LL', 'crack_detect': None, 'result_locatoin': None}}
# if new_update_row["values"]["crack_detect"] is None:
# self.share_image_queue.put(new_update_row["values"])
# logging.info(" >>> adding 1 image to queue")
# self.skip_to_timestamp = datetime.datetime.now().time()
# time.sleep(3)
#
# def close_stream(self):
# self.stream.close()
# if __name__ == "__main__":
# main()
# images_details = multiprocessing.Queue()
# server_fetcher = DB_fetcher(images_details, MYSQL_SETTINGS)
# server_fetcher.start()