-
Notifications
You must be signed in to change notification settings - Fork 15
/
spock_worker.h
182 lines (141 loc) · 4.91 KB
/
spock_worker.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*-------------------------------------------------------------------------
*
* spock_worker.h
* spock worker helper functions
*
* Copyright (c) 2022-2023, pgEdge, Inc.
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, The Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef SPOCK_WORKER_H
#define SPOCK_WORKER_H
#include "storage/lock.h"
#include "spock.h"
#include "spock_output_plugin.h" /* for SpockOutputSlotGroup */
typedef enum {
SPOCK_WORKER_NONE, /* Unused slot. */
SPOCK_WORKER_MANAGER, /* Manager. */
SPOCK_WORKER_APPLY, /* Apply. */
SPOCK_WORKER_SYNC /* Special type of Apply that synchronizes
* one table. */
} SpockWorkerType;
typedef struct SpockApplyWorker
{
Oid subid; /* Subscription id for apply worker. */
bool sync_pending; /* Is there new synchronization info pending?. */
XLogRecPtr replay_stop_lsn; /* Replay should stop here if defined. */
RepOriginId replorigin; /* Remote origin id of apply worker. */
TimestampTz last_ts; /* Last remote commit timestamp. */
} SpockApplyWorker;
typedef struct SpockSyncWorker
{
SpockApplyWorker apply; /* Apply worker info, must be first. */
NameData nspname; /* Name of the schema of table to copy if any. */
NameData relname; /* Name of the table to copy if any. */
} SpockSyncWorker;
typedef struct SpockWorker {
SpockWorkerType worker_type;
/* Generation counter incremented at each registration */
uint16 generation;
/* Pointer to proc array. NULL if not running. */
PGPROC *proc;
/* Time at which worker crashed (normally 0). */
TimestampTz crashed_at;
/* Database id to connect to. */
Oid dboid;
/* Type-specific worker info */
union
{
SpockApplyWorker apply;
SpockSyncWorker sync;
} worker;
} SpockWorker;
typedef struct SpockContext {
/* Write lock for the entire context. */
LWLock *lock;
/* Access lock for Lag Tracking Hash. */
LWLock *lag_lock;
/* Interval for pruning the conflict_tracker table */
int ctt_prune_interval;
Datum ctt_last_prune;
/* Supervisor process. */
PGPROC *supervisor;
/* Signal that subscription info have changed. */
bool subscriptions_changed;
/* cluster read-only global flag */
bool cluster_is_readonly;
/* Spock slot-group data */
LWLock *slot_group_master_lock;
int slot_ngroups;
SpockOutputSlotGroup *slot_groups;
/* Background workers. */
int total_workers;
SpockWorker workers[FLEXIBLE_ARRAY_MEMBER];
} SpockContext;
typedef enum spockStatsType
{
SPOCK_STATS_INSERT_COUNT = 0,
SPOCK_STATS_UPDATE_COUNT,
SPOCK_STATS_DELETE_COUNT,
SPOCK_STATS_CONFLICT_COUNT,
SPOCK_STATS_DCA_COUNT,
SPOCK_STATS_NUM_COUNTERS = SPOCK_STATS_DCA_COUNT + 1
} spockStatsType;
typedef struct spockStatsKey
{
/* hash key */
Oid dboid; /* Database Oid */
Oid subid; /* Subscription (InvalidOid for sender) */
Oid relid; /* Table Oid */
} spockStatsKey;
typedef struct spockStatsEntry
{
spockStatsKey key; /* hash key */
int64 counter[SPOCK_STATS_NUM_COUNTERS];
slock_t mutex;
} spockStatsEntry;
/* A sample associating a WAL location with the time it was written. */
typedef struct
{
XLogRecPtr lsn;
TimestampTz time;
} WalTimeSample;
typedef struct LagTrackerEntry
{
char slotname[NAMEDATALEN];
WalTimeSample commit_sample;
} LagTrackerEntry;
extern HTAB *LagTrackerHash;
extern HTAB *SpockHash;
extern SpockContext *SpockCtx;
extern SpockWorker *MySpockWorker;
extern SpockApplyWorker *MyApplyWorker;
extern SpockSubscription *MySubscription;
extern int spock_stats_max_entries_conf;
extern int spock_stats_max_entries;
extern bool spock_stats_hash_full;
#define SPOCK_STATS_MAX_ENTRIES(_nworkers) \
(spock_stats_max_entries_conf < 0 ? (1000 * _nworkers) \
: spock_stats_max_entries_conf)
extern volatile sig_atomic_t got_SIGTERM;
extern void handle_sigterm(SIGNAL_ARGS);
extern void spock_subscription_changed(Oid subid, bool kill);
extern void spock_worker_shmem_init(void);
extern int spock_worker_register(SpockWorker *worker);
extern void spock_worker_attach(int slot, SpockWorkerType type);
extern SpockWorker *spock_manager_find(Oid dboid);
extern SpockWorker *spock_apply_find(Oid dboid, Oid subscriberid);
extern List *spock_apply_find_all(Oid dboid);
extern SpockWorker *spock_sync_find(Oid dboid, Oid subid,
const char *nspname, const char *relname);
extern List *spock_sync_find_all(Oid dboid, Oid subscriberid);
extern SpockWorker *spock_get_worker(int slot);
extern bool spock_worker_running(SpockWorker *w);
extern void spock_worker_kill(SpockWorker *worker);
extern const char * spock_worker_type_name(SpockWorkerType type);
extern void handle_stats_counter(Relation relation, Oid subid,
spockStatsType typ, int ntup);
extern LagTrackerEntry *lag_tracker_entry(char *slotname, XLogRecPtr lsn, TimestampTz ts);
#endif /* SPOCK_WORKER_H */