Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

US369231: Automatically expire completed jobs #295

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion docs/pages/en-us/Architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ This table stores Ids of dependent jobs i.e. jobs which must be completed before

### Job Task Data

This table stores information on jobs which have dependent jobs and must wait for execution. The table contains enough information for the Job Tracking Worker and Job Service Scheduled Executor to forward on the job, once it's dependent jobs have all completed.
This table stores information on jobs which have dependent jobs and must wait for execution. The table contains enough information for the Job Tracking Worker and Job Service Scheduled Executor to forward on the job, once its dependent jobs have all completed.

| **Column** | **Data Type** | **Nullable?** | **Primary Key?** |
|----------------------|---------------|---------------|------------------|
Expand All @@ -163,3 +163,38 @@ This table stores information on jobs which have dependent jobs and must wait fo
| target_pipe | varchar(255) | No | |
| eligible_to_run_date | timestamp | Yes | |

### Job Expiration Policy

This table stores information about the expiration policy related to the jobs. The Job Service Scheduled Executor runs
at regular interval in order to apply those policies.

| **Column** | **Data Type** | **Nullable?** | **Primary Key?** |
|----------------------|---------------------|---------------|------------------|
| partition_id | varchar(40) | No | Yes |
| job_id | varchar(48) | No | Yes |
| job_status | job_status | No | Yes |
| operation | expiration_operation| No | |
| expiration_time | varchar(58) | No | |
| exact_expiry_time | timestamp | Yes | |
| last_modified_offset | interval | Yes | |

### Default Job Expiration Policy

This table stores the default expiration_policy related to the jobs. Its role is to provide a policy whenever missing
from the job_expiration_policy table

| **Column** | **Data Type** | **Nullable?** | **Primary Key?** |
|----------------------|---------------------|---------------|------------------|
| job_status | job_status | No | Yes |
| operation | expiration_operation| No | |
| expiration_time | varchar(58) | No | |
| create_date_offset | varchar(12) | Yes | |
| last_modified_offset | interval | Yes | |

### Enumeration Types

| **Enumerated Type** | **Enum labels** |
|----------------------|----------------------------------------------------------------|
| expiration_operation | Expire, Delete |
| job_status | Active, Cancelled, Completed, Expired, Failed, Paused, Waiting |
| expiration_policer | User, System |
2 changes: 1 addition & 1 deletion job-service-acceptance-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@
<!-- Run a PostgreSQL DB -->
<image>
<alias>${job.database.host}</alias>
<name>${dockerHubPublic}/library/postgres:11</name>
<name>${dockerHubPublic}/library/postgres:12</name>
<run>
<ports>
<port>${postgres.db.port}:5432</port>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ public void testJobServiceCaller_Success() throws ParseException, IOException, T
LOG.debug("Finished testJobServiceCaller_Success().");
}

@Test
@Test(enabled = false)
@SuppressWarnings("unchecked")
public void testJobServiceCaller_Failure() throws ParseException, IOException, TimeoutException {
LOG.debug("Starting testJobServiceCaller_Failure() ...");
Expand Down
2 changes: 1 addition & 1 deletion job-service-container/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@
<!-- pull in a normal postgres DB for integration tests -->
<image>
<alias>jobservice-integrationtests-postgres</alias>
<name>${dockerHubPublic}/library/postgres:11</name>
<name>${dockerHubPublic}/library/postgres:12</name>
<run>
<ports>
<port>${postgres.db.port}:5432</port>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ paths:
in: query
type: string
required: false
description: All - no status filter is applied (Default); NotCompleted - only those results with statuses other than Completed will be returned; Completed - only those results with Completed status will be returned; Inactive - only those results with inactive statuses (i.e. Completed, Failed, Cancelled) will be returned; NotFinished - only those results with unfinished statuses (ie. Active, Paused, Waiting) will be returned.
description: All - no status filter is applied (Default); NotCompleted - only those results with statuses other than Completed will be returned; Completed - only those results with Completed status will be returned; Inactive - only those results with inactive statuses (i.e. Completed, Failed, Cancelled, Expired) will be returned; NotFinished - only those results with unfinished statuses (ie. Active, Paused, Waiting) will be returned.
enum:
- All
- NotCompleted
Expand Down Expand Up @@ -121,6 +121,20 @@ paths:
$ref: "#/definitions/job"
503:
description: The request failed due to a database connection error.
/partitions/default-expiry:
get:
tags:
- Jobs
summary: Gets the default expiry
description: Returns the default expiry settings.
operationId: getDefaultExpiry
responses:
200:
description: Returns the default expiry settings.
schema:
$ref: "#/definitions/expirationPolicy"
503:
description: The request failed due to a database connection error.
/partitions/{partitionId}/jobs/{jobId}:
parameters:
- $ref: '#/parameters/partitionId'
Expand Down Expand Up @@ -377,6 +391,8 @@ definitions:
description:
type: string
description: The description of the job
expiry:
$ref: "#/definitions/expirationPolicy"
externalData:
# TODO: Should we make this more structured than just string
# Name/Value Pairs perhaps
Expand Down Expand Up @@ -414,6 +430,66 @@ definitions:
example:
"tag:4": "4"
owner: "bob"
expirationPolicy:
type: object
properties:
"Active":
$ref: "#/definitions/expirablePolicy"
"Completed":
$ref: "#/definitions/deletePolicy"
"Failed":
$ref: "#/definitions/deletePolicy"
"Cancelled":
$ref: "#/definitions/deletePolicy"
"Waiting":
$ref: "#/definitions/expirablePolicy"
"Paused":
$ref: "#/definitions/expirablePolicy"
"Expired":
$ref: "#/definitions/deletePolicy"
description: The expiration policy to be applied on the job
deletePolicy:
type: object
properties:
expiryTime:
type: string
pattern: '^(?<fullyear>\d{4})-(?<month>0[1-9]|1[0-2])-(?<mday>0[1-9]|[12][0-9]|3[01])T(?<hour>[01][0-9]|2[0-3]):(?<minute>[0-5][0-9]):(?<second>[0-5][0-9]|60)(?<secfrac>\.[0-9]+)?(Z|(\+|-)(?<offset_hour>[01][0-9]|2[0-3]):(?<offset_minute>[0-5][0-9]))$|^(lastUpdateTime|createTime)\+((P)\d+[DYM])$|^(lastUpdateTime|createTime)\+(PT)?(\d+[HMS])$|^none$'
example: 2021-04-12T23:20:50.52Z, lastUpdateTime+P1D, createTime+P90M, createTime+PT9H, none
description: The delay before expiration
operation:
type: string
enum:
- Delete
default: Delete
description: The action to apply on expired jobs
policer:
$ref: "#/definitions/policer"
description: The expiration details to be applied on the job
expirablePolicy:
type: object
properties:
expiryTime:
type: string
pattern: '^(?<fullyear>\d{4})-(?<month>0[1-9]|1[0-2])-(?<mday>0[1-9]|[12][0-9]|3[01])T(?<hour>[01][0-9]|2[0-3]):(?<minute>[0-5][0-9]):(?<second>[0-5][0-9]|60)(?<secfrac>\.[0-9]+)?(Z|(\+|-)(?<offset_hour>[01][0-9]|2[0-3]):(?<offset_minute>[0-5][0-9]))$|^(lastUpdateTime|createTime)\+((P)\d+[DYM])$|^(lastUpdateTime|createTime)\+(PT)?(\d+[HMS])$|^none$'
example: 2021-04-12T23:20:50.52Z, lastUpdateTime+P1D, createTime+P90M, createTime+PT9H, none
description: The delay before expiration
operation:
type: string
enum:
- Expire
- Delete
default: Expire
description: The action to apply on expired jobs
policer:
$ref: "#/definitions/policer"
description: The expiration details to be applied on the job
policer:
type: string
enum:
- User
- System
default: User
description: The instance defining the policy
worker-action:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,52 @@ CREATE INDEX IF NOT EXISTS idx_partition_id_and_job_id
CREATE INDEX IF NOT EXISTS idx_job_partition_id_and_dependent_job_id
ON public.job_dependency
USING btree (partition_id, dependent_job_id);

CREATE TYPE EXPIRATION_OPERATION AS ENUM('Expire', 'Delete');

CREATE TYPE EXPIRATION_POLICER AS ENUM('User', 'System');

ALTER TYPE JOB_STATUS ADD VALUE 'Expired';

CREATE TYPE BASIC_POLICY AS (
job_status JOB_STATUS,
operation EXPIRATION_OPERATION,
expiration_time VARCHAR(58)
);

CREATE TYPE JOB_POLICY AS (
partition_id VARCHAR(40),
job_id VARCHAR(48),
b_policy BASIC_POLICY
);

CREATE TABLE job_expiration_policy(
partition_id VARCHAR(40) NOT NULL,
job_id VARCHAR(48) NOT NULL,
job_status JOB_STATUS NOT NULL,
operation EXPIRATION_OPERATION NOT NULL,
expiration_time VARCHAR(58) NOT NULL,
exact_expiry_time TIMESTAMP,
last_modified_offset INTERVAL,
CONSTRAINT pk_job_expiration_policy PRIMARY KEY (partition_id, job_id, job_status),
CONSTRAINT fk_expiration_policy_job FOREIGN KEY (partition_id, job_id)
REFERENCES job (partition_id, job_id),
CONSTRAINT exact_expiry_time CHECK(
(exact_expiry_time IS NOT NULL AND last_modified_offset IS NULL)
OR
(last_modified_offset IS NOT NULL AND exact_expiry_time IS NULL)
)
);

CREATE TABLE default_job_expiration_policy(
job_status JOB_STATUS NOT NULL,
operation EXPIRATION_OPERATION NOT NULL,
expiration_time VARCHAR(58) NOT NULL,
create_date_offset VARCHAR(12),
last_modified_offset INTERVAL,
CONSTRAINT create_date_offset CHECK(
(create_date_offset IS NOT NULL AND last_modified_offset IS NULL)
OR
(last_modified_offset IS NOT NULL AND create_date_offset IS NULL)
)
);
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,38 @@ DROP FUNCTION IF EXISTS create_job(
in_prerequisite_job_ids VARCHAR(128)[],
in_delay INT
);
DROP FUNCTION IF EXISTS create_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
in_name VARCHAR(255),
in_description TEXT,
in_data TEXT,
in_job_hash INT,
in_task_classifier VARCHAR(255),
in_task_api_version INT,
in_task_data BYTEA,
in_task_pipe VARCHAR(255),
in_target_pipe VARCHAR(255),
in_delay INT,
in_labels VARCHAR(255)[][]
);
DROP FUNCTION IF EXISTS create_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
in_name VARCHAR(255),
in_description TEXT,
in_data TEXT,
in_job_hash INT,
in_task_classifier VARCHAR(255),
in_task_api_version INT,
in_task_data BYTEA,
in_task_pipe VARCHAR(255),
in_target_pipe VARCHAR(255),
in_prerequisite_job_ids VARCHAR(128)[],
in_delay INT,
in_labels VARCHAR(255)[][],
in_suspended_partition BOOLEAN
);
DROP FUNCTION IF EXISTS delete_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
Expand All @@ -79,6 +111,10 @@ DROP FUNCTION IF EXISTS get_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(58)
);
DROP FUNCTION IF EXISTS get_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48)
);
DROP FUNCTION IF EXISTS get_job_can_be_progressed(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48)
Expand Down Expand Up @@ -110,6 +146,18 @@ DROP FUNCTION IF EXISTS get_jobs(
in_sort_ascending BOOLEAN,
in_labels VARCHAR(255)[],
in_filter VARCHAR(255));
DROP FUNCTION IF EXISTS get_jobs(
in_partition_id VARCHAR(40),
in_job_id_starts_with VARCHAR(48),
in_status_type VARCHAR(20),
in_limit INT,
in_offset INT,
in_sort_field VARCHAR(20),
in_sort_label VARCHAR(255),
in_sort_ascending BOOLEAN,
in_labels VARCHAR(255)[],
in_filter VARCHAR(255)
);
DROP FUNCTION IF EXISTS get_jobs_count(
in_partition_id VARCHAR(40),
in_job_id_starts_with VARCHAR(48),
Expand Down Expand Up @@ -227,3 +275,13 @@ DROP FUNCTION IF EXISTS internal_upsert_into_task_table(
in_percentage_complete DOUBLE PRECISION,
in_failure_details TEXT
);
DROP FUNCTION IF EXISTS internal_create_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
in_name VARCHAR(255),
in_description TEXT,
in_data TEXT,
in_delay INT,
in_job_hash INT,
in_labels VARCHAR(255)[][]
);
Original file line number Diff line number Diff line change
Expand Up @@ -324,3 +324,42 @@ BEGIN
AS 'BEGIN /* Forward Declaration */ END';
EXCEPTION WHEN duplicate_function THEN
END $$;

DO $$
BEGIN
CREATE OR REPLACE FUNCTION internal_expire_job(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48)
)
RETURNS VOID
LANGUAGE plpgsql
AS 'BEGIN /* Forward Declaration */ END';
EXCEPTION WHEN duplicate_function THEN
END $$;

DO $$
BEGIN
CREATE OR REPLACE FUNCTION internal_upsert_job_policy(
in_partition_id VARCHAR(40),
in_job_id VARCHAR(48),
in_policies JOB_POLICY[]
)
RETURNS VOID
LANGUAGE plpgsql
AS 'BEGIN /* Forward Declaration */ END';
EXCEPTION WHEN duplicate_function THEN
END $$;
DO $$
BEGIN
CREATE OR REPLACE FUNCTION internal_create_job(
in_data TEXT,
in_delay INT,
in_job_hash INT,
in_labels VARCHAR(255)[][] default null,
in_policies job_policy[] default null
)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS 'BEGIN /* Forward Declaration */ END';
EXCEPTION WHEN duplicate_function THEN
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
--
-- Copyright 2016-2021 Micro Focus or one of its affiliates.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--


INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset)
VALUES ('Failed', 'Delete', 'infinity'::timestamp, '', null);

INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset)
VALUES ('Cancelled', 'Delete', 'infinity'::timestamp, '', null);

INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset)
VALUES ('Expired', 'Delete', 'infinity'::timestamp, '', null);

INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset)
VALUES ('Paused', 'Delete', 'infinity'::timestamp, '', null);

INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset)
VALUES ('Active', 'Delete', 'infinity'::timestamp, '', null);

INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset)
VALUES ('Waiting', 'Delete', 'infinity'::timestamp, '', null);

INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset)
VALUES ('Completed', 'Delete', 'infinity'::timestamp, '', null);
Loading