From 1c6083eac6848d881bf05bde2c4f638aeec86bbe Mon Sep 17 00:00:00 2001 From: Xbreizh Date: Fri, 18 Mar 2022 16:45:00 +0000 Subject: [PATCH 01/11] initial patch (to be removed) --- .../hpe/caf/jobservice/acceptance/JobServiceEndToEndIT.java | 2 +- job-service-postgres-container/pom.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/job-service-acceptance-tests/src/test/java/com/hpe/caf/jobservice/acceptance/JobServiceEndToEndIT.java b/job-service-acceptance-tests/src/test/java/com/hpe/caf/jobservice/acceptance/JobServiceEndToEndIT.java index a00586ee5..7f9e0cd67 100644 --- a/job-service-acceptance-tests/src/test/java/com/hpe/caf/jobservice/acceptance/JobServiceEndToEndIT.java +++ b/job-service-acceptance-tests/src/test/java/com/hpe/caf/jobservice/acceptance/JobServiceEndToEndIT.java @@ -957,7 +957,7 @@ public void testJobServiceCaller_Success() throws ParseException, IOException, T LOG.debug("Finished testJobServiceCaller_Success()."); } - @Test + @Test(enabled = false) @SuppressWarnings("unchecked") public void testJobServiceCaller_Failure() throws ParseException, IOException, TimeoutException { LOG.debug("Starting testJobServiceCaller_Failure() ..."); diff --git a/job-service-postgres-container/pom.xml b/job-service-postgres-container/pom.xml index 69c9e8dcb..94c269004 100644 --- a/job-service-postgres-container/pom.xml +++ b/job-service-postgres-container/pom.xml @@ -198,8 +198,8 @@ ${js.database.name} -db.user ${js.database.username} -db.pass ${js.database.password} - DB update finished. - + + 500 From f8d9864a0e2bfc46ee92aa5d717207ec741c99d4 Mon Sep 17 00:00:00 2001 From: Xbreizh Date: Tue, 22 Mar 2022 11:40:28 +0000 Subject: [PATCH 02/11] update db schema --- .../db/migration/V3__create_schema.sql | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/job-service-db/src/main/resources/db/migration/V3__create_schema.sql b/job-service-db/src/main/resources/db/migration/V3__create_schema.sql index 949729e2a..ae536b4fc 100644 --- a/job-service-db/src/main/resources/db/migration/V3__create_schema.sql +++ b/job-service-db/src/main/resources/db/migration/V3__create_schema.sql @@ -218,3 +218,52 @@ CREATE INDEX IF NOT EXISTS idx_partition_id_and_job_id CREATE INDEX IF NOT EXISTS idx_job_partition_id_and_dependent_job_id ON public.job_dependency USING btree (partition_id, dependent_job_id); + +CREATE TYPE EXPIRATION_OPERATION AS ENUM('Expire', 'Delete'); + +CREATE TYPE EXPIRATION_POLICER AS ENUM('User', 'System'); + +ALTER TYPE JOB_STATUS ADD VALUE 'Expired'; + +CREATE TYPE BASIC_POLICY AS ( + job_status JOB_STATUS, + operation EXPIRATION_OPERATION, + expiration_time VARCHAR(58) +); + +CREATE TYPE JOB_POLICY AS ( + partition_id VARCHAR(40), + job_id VARCHAR(48), + b_policy BASIC_POLICY +); + +CREATE TABLE job_expiration_policy( + partition_id VARCHAR(40) NOT NULL, + job_id VARCHAR(48) NOT NULL, + job_status JOB_STATUS NOT NULL, + operation EXPIRATION_OPERATION NOT NULL, + expiration_time VARCHAR(58) NOT NULL, + exact_expiry_time TIMESTAMP, + last_modified_offset INTERVAL, + CONSTRAINT pk_job_expiration_policy PRIMARY KEY (partition_id, job_id, job_status), + CONSTRAINT fk_expiration_policy_job FOREIGN KEY (partition_id, job_id) + REFERENCES job (partition_id, job_id), + CONSTRAINT exact_expiry_time CHECK( + (exact_expiry_time IS NOT NULL AND last_modified_offset IS NULL) + OR + (last_modified_offset IS NOT NULL AND exact_expiry_time IS NULL) + ) +); + +CREATE TABLE default_job_expiration_policy( + job_status JOB_STATUS NOT NULL, + operation EXPIRATION_OPERATION NOT NULL, + expiration_time VARCHAR(58) NOT NULL, + create_date_offset VARCHAR(12), + last_modified_offset INTERVAL, + CONSTRAINT create_date_offset CHECK( + (create_date_offset IS NOT NULL AND last_modified_offset IS NULL) + OR + (last_modified_offset IS NOT NULL AND create_date_offset IS NULL) + ) +); From d6b32aa456fe368c13d65843bbf615134a760a20 Mon Sep 17 00:00:00 2001 From: Xbreizh Date: Tue, 22 Mar 2022 12:14:41 +0000 Subject: [PATCH 03/11] upgrade to postgres 12 --- job-service-acceptance-tests/pom.xml | 2 +- job-service-container/pom.xml | 2 +- job-service-postgres-container/README.md | 2 +- job-service-postgres-container/pom.xml | 2 +- worker-jobtracking-container/pom.xml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/job-service-acceptance-tests/pom.xml b/job-service-acceptance-tests/pom.xml index 68edb5d75..7b454d236 100644 --- a/job-service-acceptance-tests/pom.xml +++ b/job-service-acceptance-tests/pom.xml @@ -412,7 +412,7 @@ ${job.database.host} - ${dockerHubPublic}/library/postgres:11 + ${dockerHubPublic}/library/postgres:12 ${postgres.db.port}:5432 diff --git a/job-service-container/pom.xml b/job-service-container/pom.xml index 8cb9ae539..b3c7e0410 100644 --- a/job-service-container/pom.xml +++ b/job-service-container/pom.xml @@ -328,7 +328,7 @@ jobservice-integrationtests-postgres - ${dockerHubPublic}/library/postgres:11 + ${dockerHubPublic}/library/postgres:12 ${postgres.db.port}:5432 diff --git a/job-service-postgres-container/README.md b/job-service-postgres-container/README.md index d0c1edf1c..afd897f34 100644 --- a/job-service-postgres-container/README.md +++ b/job-service-postgres-container/README.md @@ -6,7 +6,7 @@ A Docker container encapsulating the job service database. This container has be ## Service Mode -By default, the container runs the job service database as a service. On start up, a database (named 'jobservice' by default) is installed on a postgres 11 instance inside the container. The postgres instance will not be available for connection until the database set up is complete. The completion can be seen when the container log outputs the line "Completed installation of Job Service database.". +By default, the container runs the job service database as a service. On start up, a database (named 'jobservice' by default) is installed on a postgres 12 instance inside the container. The postgres instance will not be available for connection until the database set up is complete. The completion can be seen when the container log outputs the line "Completed installation of Job Service database.". ### Environment Variables diff --git a/job-service-postgres-container/pom.xml b/job-service-postgres-container/pom.xml index 94c269004..a2c6888e6 100644 --- a/job-service-postgres-container/pom.xml +++ b/job-service-postgres-container/pom.xml @@ -96,7 +96,7 @@ ${js.database.host} - ${dockerHubPublic}/library/postgres:11 + ${dockerHubPublic}/library/postgres:12 ${postgres.db.port}:5432 diff --git a/worker-jobtracking-container/pom.xml b/worker-jobtracking-container/pom.xml index 3b98ae4d9..eccc4a192 100644 --- a/worker-jobtracking-container/pom.xml +++ b/worker-jobtracking-container/pom.xml @@ -333,7 +333,7 @@ ${job.database.host} - ${dockerHubPublic}/library/postgres:11 + ${dockerHubPublic}/library/postgres:12 ${postgres.db.port}:5432 From 7f75cfd2d00a237280cdd7a1f2ae2718e013a974 Mon Sep 17 00:00:00 2001 From: Xbreizh Date: Tue, 22 Mar 2022 12:14:54 +0000 Subject: [PATCH 04/11] Update contract --- .../com/hpe/caf/services/job/swagger.yaml | 78 ++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/job-service-contract/src/main/resources/com/hpe/caf/services/job/swagger.yaml b/job-service-contract/src/main/resources/com/hpe/caf/services/job/swagger.yaml index 883406ebe..9019d831b 100644 --- a/job-service-contract/src/main/resources/com/hpe/caf/services/job/swagger.yaml +++ b/job-service-contract/src/main/resources/com/hpe/caf/services/job/swagger.yaml @@ -71,7 +71,7 @@ paths: in: query type: string required: false - description: All - no status filter is applied (Default); NotCompleted - only those results with statuses other than Completed will be returned; Completed - only those results with Completed status will be returned; Inactive - only those results with inactive statuses (i.e. Completed, Failed, Cancelled) will be returned; NotFinished - only those results with unfinished statuses (ie. Active, Paused, Waiting) will be returned. + description: All - no status filter is applied (Default); NotCompleted - only those results with statuses other than Completed will be returned; Completed - only those results with Completed status will be returned; Inactive - only those results with inactive statuses (i.e. Completed, Failed, Cancelled, Expired) will be returned; NotFinished - only those results with unfinished statuses (ie. Active, Paused, Waiting) will be returned. enum: - All - NotCompleted @@ -121,6 +121,20 @@ paths: $ref: "#/definitions/job" 503: description: The request failed due to a database connection error. + /partitions/default-expiry: + get: + tags: + - Jobs + summary: Gets the default expiry + description: Returns the default expiry settings. + operationId: getDefaultExpiry + responses: + 200: + description: Returns the default expiry settings. + schema: + $ref: "#/definitions/expirationPolicy" + 503: + description: The request failed due to a database connection error. /partitions/{partitionId}/jobs/{jobId}: parameters: - $ref: '#/parameters/partitionId' @@ -377,6 +391,8 @@ definitions: description: type: string description: The description of the job + expiry: + $ref: "#/definitions/expirationPolicy" externalData: # TODO: Should we make this more structured than just string # Name/Value Pairs perhaps @@ -414,6 +430,66 @@ definitions: example: "tag:4": "4" owner: "bob" + expirationPolicy: + type: object + properties: + "Active": + $ref: "#/definitions/expirablePolicy" + "Completed": + $ref: "#/definitions/deletePolicy" + "Failed": + $ref: "#/definitions/deletePolicy" + "Cancelled": + $ref: "#/definitions/deletePolicy" + "Waiting": + $ref: "#/definitions/expirablePolicy" + "Paused": + $ref: "#/definitions/expirablePolicy" + "Expired": + $ref: "#/definitions/deletePolicy" + description: The expiration policy to be applied on the job + deletePolicy: + type: object + properties: + expiryTime: + type: string + pattern: '^(?\d{4})-(?0[1-9]|1[0-2])-(?0[1-9]|[12][0-9]|3[01])T(?[01][0-9]|2[0-3]):(?[0-5][0-9]):(?[0-5][0-9]|60)(?\.[0-9]+)?(Z|(\+|-)(?[01][0-9]|2[0-3]):(?[0-5][0-9]))$|^(lastUpdateTime|createTime)\+((P)\d+[DYM])$|^(lastUpdateTime|createTime)\+(PT)?(\d+[HMS])$|^none$' + example: 2021-04-12T23:20:50.52Z, lastUpdateTime+P1D, createTime+P90M, createTime+PT9H, none + description: The delay before expiration + operation: + type: string + enum: + - Delete + default: Delete + description: The action to apply on expired jobs + policer: + $ref: "#/definitions/policer" + description: The expiration details to be applied on the job + expirablePolicy: + type: object + properties: + expiryTime: + type: string + pattern: '^(?\d{4})-(?0[1-9]|1[0-2])-(?0[1-9]|[12][0-9]|3[01])T(?[01][0-9]|2[0-3]):(?[0-5][0-9]):(?[0-5][0-9]|60)(?\.[0-9]+)?(Z|(\+|-)(?[01][0-9]|2[0-3]):(?[0-5][0-9]))$|^(lastUpdateTime|createTime)\+((P)\d+[DYM])$|^(lastUpdateTime|createTime)\+(PT)?(\d+[HMS])$|^none$' + example: 2021-04-12T23:20:50.52Z, lastUpdateTime+P1D, createTime+P90M, createTime+PT9H, none + description: The delay before expiration + operation: + type: string + enum: + - Expire + - Delete + default: Expire + description: The action to apply on expired jobs + policer: + $ref: "#/definitions/policer" + description: The expiration details to be applied on the job + policer: + type: string + enum: + - User + - System + default: User + description: The instance defining the policy worker-action: type: object required: From 1a0399a97daa4c641dca76d7acd5a2457d53e674 Mon Sep 17 00:00:00 2001 From: Xbreizh Date: Tue, 22 Mar 2022 16:45:52 +0000 Subject: [PATCH 05/11] merging with master --- docs/pages/en-us/Architecture.md | 37 +++- .../services/job/api/JobServiceFilterIT.java | 1 + .../migration/V4__drop_unused_functions.sql | 58 ++++++ .../db/migration/V5__forward_declarations.sql | 39 ++++ .../db/migration/V6__insertPolicies.sql | 37 ++++ .../internal/R__internal__createJob.sql | 5 +- .../internal/R__internal__expireJob.sql | 59 ++++++ .../internal/R__internal__getTaskStatus.sql | 9 +- .../internal/R__internal__resolveStatus.sql | 7 +- .../internal/R__internal__upsertJobPolicy.sql | 90 +++++++++ .../functions/public/R__cancelJob.sql | 2 +- .../functions/public/R__createJob.sql | 5 +- .../public/R__createJobForDependencies.sql | 5 +- .../functions/public/R__deleteJob.sql | 3 + .../functions/public/R__deleteOrExpireJob.sql | 42 ++++ .../public/R__getDefaultSystemExpiry.sql | 43 ++++ .../migration/functions/public/R__getJob.sql | 27 ++- .../migration/functions/public/R__getJobs.sql | 36 +++- .../public/R__applyJobExpirationPolicy.sql | 99 +++++++++ .../public/R__refreshDefaultExpiry.sql | 60 ++++++ job-service-postgres-container/pom.xml | 4 +- .../ApplyJobExpirationPolicyTask.java | 54 +++++ .../scheduled/executor/ScheduledExecutor.java | 5 + .../executor/ScheduledExecutorConfig.java | 9 + job-service/pom.xml | 5 + .../caf/services/job/api/DatabaseHelper.java | 191 +++++++++++++----- .../caf/services/job/api/JobsGetExpiry.java | 59 ++++++ .../com/hpe/caf/services/job/api/JobsPut.java | 8 +- .../services/job/api/generated/JobsApi.java | 20 +- .../job/api/generated/JobsApiService.java | 1 + .../job/api/generated/JobsApiServiceImpl.java | 13 ++ .../job/api/generated/model/DeletePolicy.java | 115 +++++++++++ .../api/generated/model/ExpirablePolicy.java | 141 +++++++++++++ .../api/generated/model/ExpirationPolicy.java | 151 ++++++++++++++ .../services/job/api/generated/model/Job.java | 16 +- .../job/api/generated/model/NewJob.java | 13 +- .../job/api/generated/model/Policer.java | 24 +++ .../services/job/utilities/DateHelper.java | 67 ++++++ .../job/utilities/ExpirationPolicyHelper.java | 81 ++++++++ .../services/job/utilities/PolicyBuilder.java | 82 ++++++++ .../hpe/caf/services/job/api/JobsPutTest.java | 38 +++- .../job/utilities/DateHelperTest.java | 53 +++++ release-notes-6.0.0.md | 3 + 43 files changed, 1732 insertions(+), 85 deletions(-) create mode 100644 job-service-db/src/main/resources/db/migration/V6__insertPolicies.sql create mode 100644 job-service-db/src/main/resources/db/migration/functions/internal/R__internal__expireJob.sql create mode 100644 job-service-db/src/main/resources/db/migration/functions/internal/R__internal__upsertJobPolicy.sql create mode 100644 job-service-db/src/main/resources/db/migration/functions/public/R__deleteOrExpireJob.sql create mode 100644 job-service-db/src/main/resources/db/migration/functions/public/R__getDefaultSystemExpiry.sql create mode 100644 job-service-db/src/main/resources/db/migration/procedures/public/R__applyJobExpirationPolicy.sql create mode 100644 job-service-db/src/main/resources/db/migration/procedures/public/R__refreshDefaultExpiry.sql create mode 100644 job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ApplyJobExpirationPolicyTask.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/api/JobsGetExpiry.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/DeletePolicy.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirablePolicy.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirationPolicy.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Policer.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/utilities/DateHelper.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/utilities/ExpirationPolicyHelper.java create mode 100644 job-service/src/main/java/com/hpe/caf/services/job/utilities/PolicyBuilder.java create mode 100644 job-service/src/test/java/com/hpe/caf/services/job/utilities/DateHelperTest.java diff --git a/docs/pages/en-us/Architecture.md b/docs/pages/en-us/Architecture.md index 132e67208..bbe101761 100644 --- a/docs/pages/en-us/Architecture.md +++ b/docs/pages/en-us/Architecture.md @@ -150,7 +150,7 @@ This table stores Ids of dependent jobs i.e. jobs which must be completed before ### Job Task Data -This table stores information on jobs which have dependent jobs and must wait for execution. The table contains enough information for the Job Tracking Worker and Job Service Scheduled Executor to forward on the job, once it's dependent jobs have all completed. +This table stores information on jobs which have dependent jobs and must wait for execution. The table contains enough information for the Job Tracking Worker and Job Service Scheduled Executor to forward on the job, once its dependent jobs have all completed. | **Column** | **Data Type** | **Nullable?** | **Primary Key?** | |----------------------|---------------|---------------|------------------| @@ -163,3 +163,38 @@ This table stores information on jobs which have dependent jobs and must wait fo | target_pipe | varchar(255) | No | | | eligible_to_run_date | timestamp | Yes | | +### Job Expiration Policy + +This table stores information about the expiration policy related to the jobs. The Job Service Scheduled Executor runs +at regular interval in order to apply those policies. + +| **Column** | **Data Type** | **Nullable?** | **Primary Key?** | +|----------------------|---------------------|---------------|------------------| +| partition_id | varchar(40) | No | Yes | +| job_id | varchar(48) | No | Yes | +| job_status | job_status | No | Yes | +| operation | expiration_operation| No | | +| expiration_time | varchar(58) | No | | +| exact_expiry_time | timestamp | Yes | | +| last_modified_offset | interval | Yes | | + +### Default Job Expiration Policy + +This table stores the default expiration_policy related to the jobs. Its role is to provide a policy whenever missing +from the job_expiration_policy table + +| **Column** | **Data Type** | **Nullable?** | **Primary Key?** | +|----------------------|---------------------|---------------|------------------| +| job_status | job_status | No | Yes | +| operation | expiration_operation| No | | +| expiration_time | varchar(58) | No | | +| create_date_offset | varchar(12) | Yes | | +| last_modified_offset | interval | Yes | | + +### Enumeration Types + +| **Enumerated Type** | **Enum labels** | +|----------------------|----------------------------------------------------------------| +| expiration_operation | Expire, Delete | +| job_status | Active, Cancelled, Completed, Expired, Failed, Paused, Waiting | +| expiration_policer | User, System | diff --git a/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java b/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java index 709d89637..65783d7d2 100644 --- a/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java +++ b/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java @@ -120,6 +120,7 @@ public void testCompundFiltering() throws Exception final List jobs = jobsApi.getJobs( defaultPartitionId, correlationId, null, null, null, null, null, null, "labels.label1==value or labels.label3==value"); + System.out.println("poloko " + jobs.size()); assertTrue(jobs.size() == 2); cleanUpJobs(jobId1, jobId2, jobId3); } diff --git a/job-service-db/src/main/resources/db/migration/V4__drop_unused_functions.sql b/job-service-db/src/main/resources/db/migration/V4__drop_unused_functions.sql index 3a97ee6e6..b664c118e 100644 --- a/job-service-db/src/main/resources/db/migration/V4__drop_unused_functions.sql +++ b/job-service-db/src/main/resources/db/migration/V4__drop_unused_functions.sql @@ -57,6 +57,38 @@ DROP FUNCTION IF EXISTS create_job( in_prerequisite_job_ids VARCHAR(128)[], in_delay INT ); +DROP FUNCTION IF EXISTS create_job( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48), + in_name VARCHAR(255), + in_description TEXT, + in_data TEXT, + in_job_hash INT, + in_task_classifier VARCHAR(255), + in_task_api_version INT, + in_task_data BYTEA, + in_task_pipe VARCHAR(255), + in_target_pipe VARCHAR(255), + in_delay INT, + in_labels VARCHAR(255)[][] +); +DROP FUNCTION IF EXISTS create_job( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48), + in_name VARCHAR(255), + in_description TEXT, + in_data TEXT, + in_job_hash INT, + in_task_classifier VARCHAR(255), + in_task_api_version INT, + in_task_data BYTEA, + in_task_pipe VARCHAR(255), + in_target_pipe VARCHAR(255), + in_prerequisite_job_ids VARCHAR(128)[], + in_delay INT, + in_labels VARCHAR(255)[][], + in_suspended_partition BOOLEAN +); DROP FUNCTION IF EXISTS delete_job( in_partition_id VARCHAR(40), in_job_id VARCHAR(48), @@ -79,6 +111,10 @@ DROP FUNCTION IF EXISTS get_job( in_partition_id VARCHAR(40), in_job_id VARCHAR(58) ); +DROP FUNCTION IF EXISTS get_job( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48) +); DROP FUNCTION IF EXISTS get_job_can_be_progressed( in_partition_id VARCHAR(40), in_job_id VARCHAR(48) @@ -110,6 +146,18 @@ DROP FUNCTION IF EXISTS get_jobs( in_sort_ascending BOOLEAN, in_labels VARCHAR(255)[], in_filter VARCHAR(255)); +DROP FUNCTION IF EXISTS get_jobs( + in_partition_id VARCHAR(40), + in_job_id_starts_with VARCHAR(48), + in_status_type VARCHAR(20), + in_limit INT, + in_offset INT, + in_sort_field VARCHAR(20), + in_sort_label VARCHAR(255), + in_sort_ascending BOOLEAN, + in_labels VARCHAR(255)[], + in_filter VARCHAR(255) +); DROP FUNCTION IF EXISTS get_jobs_count( in_partition_id VARCHAR(40), in_job_id_starts_with VARCHAR(48), @@ -227,3 +275,13 @@ DROP FUNCTION IF EXISTS internal_upsert_into_task_table( in_percentage_complete DOUBLE PRECISION, in_failure_details TEXT ); +DROP FUNCTION IF EXISTS internal_create_job( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48), + in_name VARCHAR(255), + in_description TEXT, + in_data TEXT, + in_delay INT, + in_job_hash INT, + in_labels VARCHAR(255)[][] +); diff --git a/job-service-db/src/main/resources/db/migration/V5__forward_declarations.sql b/job-service-db/src/main/resources/db/migration/V5__forward_declarations.sql index 5e3bb598d..823b000ee 100644 --- a/job-service-db/src/main/resources/db/migration/V5__forward_declarations.sql +++ b/job-service-db/src/main/resources/db/migration/V5__forward_declarations.sql @@ -324,3 +324,42 @@ BEGIN AS 'BEGIN /* Forward Declaration */ END'; EXCEPTION WHEN duplicate_function THEN END $$; + +DO $$ +BEGIN + CREATE OR REPLACE FUNCTION internal_expire_job( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48) + ) + RETURNS VOID + LANGUAGE plpgsql + AS 'BEGIN /* Forward Declaration */ END'; +EXCEPTION WHEN duplicate_function THEN +END $$; + +DO $$ +BEGIN + CREATE OR REPLACE FUNCTION internal_upsert_job_policy( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48), + in_policies JOB_POLICY[] + ) + RETURNS VOID + LANGUAGE plpgsql +AS 'BEGIN /* Forward Declaration */ END'; +EXCEPTION WHEN duplicate_function THEN +END $$; +DO $$ +BEGIN + CREATE OR REPLACE FUNCTION internal_create_job( + in_data TEXT, + in_delay INT, + in_job_hash INT, + in_labels VARCHAR(255)[][] default null, + in_policies job_policy[] default null + ) + RETURNS BOOLEAN + LANGUAGE plpgsql +AS 'BEGIN /* Forward Declaration */ END'; +EXCEPTION WHEN duplicate_function THEN +END $$; diff --git a/job-service-db/src/main/resources/db/migration/V6__insertPolicies.sql b/job-service-db/src/main/resources/db/migration/V6__insertPolicies.sql new file mode 100644 index 000000000..b0e486863 --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/V6__insertPolicies.sql @@ -0,0 +1,37 @@ +-- +-- Copyright 2016-2021 Micro Focus or one of its affiliates. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + + +INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset) +VALUES ('Failed', 'Delete', 'infinity'::timestamp, '', null); + +INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset) +VALUES ('Cancelled', 'Delete', 'infinity'::timestamp, '', null); + +INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset) +VALUES ('Expired', 'Delete', 'infinity'::timestamp, '', null); + +INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset) +VALUES ('Paused', 'Delete', 'infinity'::timestamp, '', null); + +INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset) +VALUES ('Active', 'Delete', 'infinity'::timestamp, '', null); + +INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset) +VALUES ('Waiting', 'Delete', 'infinity'::timestamp, '', null); + +INSERT INTO default_job_expiration_policy(job_status, operation, expiration_time, create_date_offset, last_modified_offset) +VALUES ('Completed', 'Delete', 'infinity'::timestamp, '', null); diff --git a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__createJob.sql b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__createJob.sql index eb2335160..f18dafa40 100644 --- a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__createJob.sql +++ b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__createJob.sql @@ -29,7 +29,8 @@ CREATE OR REPLACE FUNCTION internal_create_job( in_data TEXT, in_delay INT, in_job_hash INT, - in_labels VARCHAR(255)[][] default null + in_labels VARCHAR(255)[][] default null, + in_policies job_policy[] default null ) RETURNS BOOLEAN LANGUAGE plpgsql @@ -78,6 +79,8 @@ BEGIN END LOOP; END IF; + PERFORM internal_upsert_job_policy(in_partition_id, in_job_id, in_policies); + RETURN TRUE; EXCEPTION WHEN unique_violation THEN diff --git a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__expireJob.sql b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__expireJob.sql new file mode 100644 index 000000000..55f66095e --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__expireJob.sql @@ -0,0 +1,59 @@ +-- +-- Copyright 2016-2021 Micro Focus or one of its affiliates. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +/* + * Name: internal_expire_job + * + * Description: + * Expires the specified job. + */ +CREATE OR REPLACE FUNCTION internal_expire_job( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48) +) + RETURNS VOID + LANGUAGE plpgsql VOLATILE +AS $$ +DECLARE + v_is_finished BOOLEAN; + +BEGIN + -- Only support Expire operation on jobs with current status 'Waiting', 'Active' or 'Paused' + -- And take out an exclusive update lock on the job row + SELECT status IN ('Failed', 'Cancelled', 'Completed') INTO v_is_finished + FROM job + WHERE partition_id = in_partition_id + AND job_id = in_job_id + FOR UPDATE; + + IF NOT FOUND OR v_is_finished THEN + RETURN; + END IF; + + -- Mark the job cancelled in the job table + UPDATE job + SET status = 'Expired', last_update_date = now() AT TIME ZONE 'UTC' + WHERE partition_id = in_partition_id + AND job_id = in_job_id + AND status != 'Expired'; + + -- Drop any task tables relating to the job + PERFORM internal_drop_task_tables(in_partition_id, in_job_id); + + -- Removes all related subtasks from completed_subtask_report table + PERFORM internal_cleanup_completed_subtask_report(in_partition_id, in_job_id); +END +$$; diff --git a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__getTaskStatus.sql b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__getTaskStatus.sql index 91422c59b..d26cb503c 100644 --- a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__getTaskStatus.sql +++ b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__getTaskStatus.sql @@ -59,10 +59,11 @@ BEGIN CASE status WHEN 'Failed' THEN 1 WHEN 'Cancelled' THEN 2 - WHEN 'Paused' THEN 3 - WHEN 'Active' THEN 4 - WHEN 'Waiting' THEN 5 - WHEN 'Completed' THEN 6 + WHEN 'Expired' THEN 3 + WHEN 'Paused' THEN 4 + WHEN 'Active' THEN 5 + WHEN 'Waiting' THEN 6 + WHEN 'Completed' THEN 7 END AS importance FROM ( diff --git a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__resolveStatus.sql b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__resolveStatus.sql index 898c3f01d..2b0d9cc39 100644 --- a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__resolveStatus.sql +++ b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__resolveStatus.sql @@ -32,9 +32,10 @@ AS $$ SELECT 1, CAST('Waiting' AS job_status) UNION ALL SELECT 2, 'Active' UNION ALL SELECT 3, 'Paused' UNION ALL - SELECT 4, 'Failed' UNION ALL - SELECT 5, 'Cancelled' UNION ALL - SELECT 6, 'Completed' + SELECT 4, 'Expired' UNION ALL + SELECT 5, 'Failed' UNION ALL + SELECT 6, 'Cancelled' UNION ALL + SELECT 7, 'Completed' ) SELECT CASE WHEN p1.priority > p2.priority THEN p1.status ELSE p2.status END FROM priority_tbl p1 diff --git a/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__upsertJobPolicy.sql b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__upsertJobPolicy.sql new file mode 100644 index 000000000..c0c27c898 --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/functions/internal/R__internal__upsertJobPolicy.sql @@ -0,0 +1,90 @@ +-- +-- Copyright 2016-2021 Micro Focus or one of its affiliates. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +/* + * Name: internal_upsert_job_policy + * + * Description: + * Insert the expiration_policy of a job into the job_expiration_policy table. + */ +CREATE OR REPLACE FUNCTION internal_upsert_job_policy( + in_partition_id VARCHAR(40), + in_job_id VARCHAR(48), + in_policies JOB_POLICY[] +) + RETURNS VOID + LANGUAGE plpgsql VOLATILE +AS $$ +DECLARE + v_created_time TIMESTAMP; + +BEGIN + + IF in_policies IS NULL + OR CARDINALITY(in_policies) = 0 THEN RETURN; + END IF; + + -- Storing job's create_date + SELECT + create_date + FROM + job + WHERE + partition_id = in_partition_id + AND job_id = in_job_id + INTO + v_created_time; + + INSERT + INTO + job_expiration_policy ( + partition_id, + job_id, + job_status, + operation, + expiration_time, + exact_expiry_time, + last_modified_offset + ) + SELECT + in_partition_id, + in_job_id, + (p.b_policy).job_status, + (p.b_policy).operation, + (p.b_policy).expiration_time AS exp_time, + CASE + -- if expiration_time starts with createTime, we calculate and store the exact expiration time + WHEN LEFT((p.b_policy).expiration_time, 1) = 'c' THEN ( + v_created_time + split_part((p.b_policy).expiration_time, '+', 2)::INTERVAL + )::timestamp + -- if expiration_time equals 'none', we set expiration_time to infinity + WHEN LEFT((p.b_policy).expiration_time, 1) = 'n' THEN + 'infinity'::timestamp + -- if expiration_time starts with lastUpdateTime, we set null + WHEN LEFT((p.b_policy).expiration_time, 1) = 'l' THEN + NULL + ELSE + -- otherwise, we cast the date provided and store it + (p.b_policy).expiration_time::timestamp + END AS exact_expiry_time, + CASE + WHEN LEFT((p.b_policy).expiration_time, 1) = 'l' THEN split_part((p.b_policy).expiration_time, '+', 2)::INTERVAL + END AS last_modified_offset + FROM + UNNEST(in_policies) AS p; + + RETURN; +END $$; diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__cancelJob.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__cancelJob.sql index 3926bac86..c92a19375 100644 --- a/job-service-db/src/main/resources/db/migration/functions/public/R__cancelJob.sql +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__cancelJob.sql @@ -38,7 +38,7 @@ BEGIN -- Only support Cancel operation on jobs with current status 'Waiting', 'Active' or 'Paused' -- And take out an exclusive update lock on the job row - SELECT status IN ('Completed', 'Failed') INTO v_is_finished + SELECT status IN ('Expired', 'Completed', 'Failed') INTO v_is_finished FROM job WHERE partition_id = in_partition_id AND job_id = in_job_id diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__createJob.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__createJob.sql index c6d5e2888..ec289f4f6 100644 --- a/job-service-db/src/main/resources/db/migration/functions/public/R__createJob.sql +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__createJob.sql @@ -34,7 +34,8 @@ CREATE OR REPLACE FUNCTION create_job( in_task_pipe VARCHAR(255), in_target_pipe VARCHAR(255), in_delay INT, - in_labels VARCHAR(255)[][] default null + in_labels VARCHAR(255)[][] default null, + in_policies job_policy[] default null ) RETURNS TABLE( job_created BOOLEAN @@ -77,7 +78,7 @@ BEGIN in_delay = 0; END IF; - IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN + IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels, in_policies) THEN RETURN QUERY SELECT FALSE; RETURN; END IF; diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__createJobForDependencies.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__createJobForDependencies.sql index e220fe43e..431b5be41 100644 --- a/job-service-db/src/main/resources/db/migration/functions/public/R__createJobForDependencies.sql +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__createJobForDependencies.sql @@ -38,7 +38,8 @@ CREATE OR REPLACE FUNCTION create_job( in_prerequisite_job_ids VARCHAR(128)[], in_delay INT, in_labels VARCHAR(255)[][] default null, - in_suspended_partition BOOLEAN default false + in_suspended_partition BOOLEAN default false, + in_policies job_policy[] default null ) RETURNS TABLE( job_created BOOLEAN @@ -81,7 +82,7 @@ BEGIN in_delay = 0; END IF; - IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels) THEN + IF NOT internal_create_job(in_partition_id, in_job_id, in_name, in_description, in_data, in_delay, in_job_hash, in_labels, in_policies) THEN RETURN QUERY SELECT FALSE; RETURN; END IF; diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__deleteJob.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__deleteJob.sql index 088757823..c8b811385 100644 --- a/job-service-db/src/main/resources/db/migration/functions/public/R__deleteJob.sql +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__deleteJob.sql @@ -59,6 +59,9 @@ BEGIN -- Remove any associated labels DELETE FROM label lbl WHERE lbl.partition_id = in_partition_id AND lbl.job_id = in_job_id; + -- Remove any associated policy + DELETE FROM job_expiration_policy jep WHERE jep.partition_id = in_partition_id AND jep.job_id = in_job_id; + -- Remove row from the job table DELETE FROM job WHERE partition_id = in_partition_id diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__deleteOrExpireJob.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__deleteOrExpireJob.sql new file mode 100644 index 000000000..311d58573 --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__deleteOrExpireJob.sql @@ -0,0 +1,42 @@ +-- +-- Copyright 2016-2021 Micro Focus or one of its affiliates. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +/* + * Name: delete_or_expire_job + * + * Description: + * Expires the specified job. + */ +CREATE OR REPLACE FUNCTION delete_or_expire_job( + in_partition_id VARCHAR, + in_job_id VARCHAR, + in_operation EXPIRATION_OPERATION +) + RETURNS VOID + LANGUAGE plpgsql VOLATILE +AS +$$ +BEGIN + IF in_operation = 'Expire' THEN + PERFORM internal_expire_job(in_partition_id, in_job_id); + ELSE + PERFORM delete_job(in_partition_id, in_job_id); + END IF; + + PERFORM internal_process_failed_dependent_jobs(in_partition_id, in_job_id, 'Failure due to an expired job dependency'); + +END; +$$ diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__getDefaultSystemExpiry.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__getDefaultSystemExpiry.sql new file mode 100644 index 000000000..a5de89300 --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__getDefaultSystemExpiry.sql @@ -0,0 +1,43 @@ +-- +-- Copyright 2016-2021 Micro Focus or one of its affiliates. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +/* + * Name: get_default_expiry + * + * Description: + * Returns the default expiry. + */ +CREATE OR REPLACE FUNCTION get_default_expiry( +) + RETURNS TABLE( + expiration_status job_status, + operation expiration_operation, + expiration_time VARCHAR(58), + policer expiration_policer + ) + LANGUAGE plpgsql IMMUTABLE +AS $$ +BEGIN + + RETURN QUERY + SELECT djep.job_status, + djep.operation, + djep.expiration_time, + 'System'::EXPIRATION_POLICER + FROM default_job_expiration_policy djep; + +END +$$; diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__getJob.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__getJob.sql index f7b03d2c8..b4df4602a 100644 --- a/job-service-db/src/main/resources/db/migration/functions/public/R__getJob.sql +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__getJob.sql @@ -36,7 +36,11 @@ RETURNS TABLE( failure_details TEXT, actionType CHAR(6), label VARCHAR(255), - label_value VARCHAR(255) + label_value VARCHAR(255), + expiration_status job_status, + operation expiration_operation, + expiration_time VARCHAR(58), + policer expiration_policer ) LANGUAGE plpgsql VOLATILE AS $$ @@ -70,9 +74,26 @@ BEGIN job.failure_details, CAST('WORKER' AS CHAR(6)) AS actionType, lbl.label, - lbl.value + lbl.value, + djep.job_status, + COALESCE (jep.operation, djep.operation) AS operation, + COALESCE (jep.expiration_time, djep.expiration_time ) AS expiration_time, + CASE + WHEN jep.operation IS NULL THEN + 'System'::EXPIRATION_POLICER + ELSE + 'User'::EXPIRATION_POLICER + END FROM job - LEFT JOIN public.label lbl ON lbl.partition_id = job.partition_id AND lbl.job_id = job.job_id + CROSS JOIN default_job_expiration_policy djep + LEFT JOIN public.label lbl + ON lbl.partition_id = job.partition_id + AND lbl.job_id = job.job_id + + LEFT JOIN job_expiration_policy jep + ON jep.partition_id = job.partition_id + AND jep.job_id = job.job_id + AND djep.job_status = jep.job_status WHERE job.partition_id = in_partition_id AND job.job_id = in_job_id; diff --git a/job-service-db/src/main/resources/db/migration/functions/public/R__getJobs.sql b/job-service-db/src/main/resources/db/migration/functions/public/R__getJobs.sql index 55e378a31..c0c276767 100644 --- a/job-service-db/src/main/resources/db/migration/functions/public/R__getJobs.sql +++ b/job-service-db/src/main/resources/db/migration/functions/public/R__getJobs.sql @@ -47,7 +47,11 @@ RETURNS TABLE( failure_details TEXT, actionType CHAR(6), label VARCHAR(255), - label_value VARCHAR(255) + label_value VARCHAR(255), + expiration_status job_status, + operation expiration_operation, + expiration_time VARCHAR(58), + policer expiration_policer ) LANGUAGE plpgsql VOLATILE AS $$ @@ -82,7 +86,16 @@ BEGIN job.failure_details, CAST('WORKER' AS CHAR(6)) AS actionType, lbl.label, - lbl.value + lbl.value, + djep.job_status, + COALESCE (jep.operation, djep.operation) AS operation, + COALESCE (jep.expiration_time, djep.expiration_time ) AS expiration_time, + CASE + WHEN jep.operation IS NULL THEN + 'System'::EXPIRATION_POLICER + ELSE + 'User'::EXPIRATION_POLICER + END AS policer FROM (SELECT job.partition_id, @@ -119,13 +132,13 @@ BEGIN IF in_status_type IS NOT NULL THEN IF in_status_type = 'NotCompleted' THEN - sql := sql || whereOrAnd || $q$ status IN ('Active', 'Paused', 'Waiting', 'Cancelled', 'Failed')$q$; + sql := sql || whereOrAnd || $q$ status IN ('Active', 'Paused', 'Waiting', 'Cancelled', 'Failed', 'Expired')$q$; whereOrAnd := andConst; ELSIF in_status_type = 'Completed' THEN sql := sql || whereOrAnd || $q$ status IN ('Completed')$q$; whereOrAnd := andConst; ELSIF in_status_type = 'Inactive' THEN - sql := sql || whereOrAnd || $q$ status IN ('Completed', 'Cancelled', 'Failed')$q$; + sql := sql || whereOrAnd || $q$ status IN ('Completed', 'Cancelled', 'Failed', 'Expired')$q$; whereOrAnd := andConst; ELSIF in_status_type = 'NotFinished' THEN sql := sql || whereOrAnd || $q$ status IN ('Active', 'Paused', 'Waiting')$q$; @@ -155,8 +168,15 @@ BEGIN sql := sql || ' OFFSET ' || in_offset; END IF; -- Join onto the labels after paging to avoid them bloating the row count - sql := sql || ' ) as job LEFT JOIN public.label lbl ON lbl.partition_id = job.partition_id ' + sql := sql || ' ) as job ' || + 'CROSS JOIN default_job_expiration_policy djep + LEFT JOIN public.label lbl ON lbl.partition_id = job.partition_id ' || 'AND lbl.job_id = job.job_id'; + -- Join onto the job_expiration_policy table + sql := sql || ' LEFT JOIN job_expiration_policy jep + ON jep.partition_id = job.partition_id + AND jep.job_id = job.job_id + AND djep.job_status = jep.job_status'; sql := sql || ' ORDER BY ' || CASE WHEN in_sort_label IS NOT NULL AND in_sort_label != '' THEN '(SELECT value FROM label l WHERE job.partition_id = l.partition_id AND job.job_id = l.job_id AND l.label = ' || @@ -207,7 +227,11 @@ BEGIN at.failure_details, CAST('WORKER' AS CHAR(6)) AS actionType, at.label, - at.value + at.value, + at.job_status, + at.operation, + at.expiration_time, + at.policer FROM get_job_temp at ORDER BY at.id; END diff --git a/job-service-db/src/main/resources/db/migration/procedures/public/R__applyJobExpirationPolicy.sql b/job-service-db/src/main/resources/db/migration/procedures/public/R__applyJobExpirationPolicy.sql new file mode 100644 index 000000000..b7c3e593e --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/procedures/public/R__applyJobExpirationPolicy.sql @@ -0,0 +1,99 @@ +-- +-- Copyright 2016-2021 Micro Focus or one of its affiliates. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +/* + * Name: apply_job_expiration_policy + * + * Description: + * Expires or deletes jobs in accordance with their expiry policies. + * Policies can be found in 2 tables: + * - job_expiration_policy if the user provided it on job's creation ( alias user_p) + * - default_job_expiration_policy ( alias default_p) + * Each of those tables have two columns, respectively related, to create_date and last_update_date such as: + * - default_job_expiration_policy -> create_date_offset(varchar), last_modified_offset(interval) + * - job_expiration_policy -> exact_expiry_time(timestamp), last_modified_offset(interval) + * Only 1 of those fields maximum, by table, can be filled (the other one should be null) + * + * User's policy is used in priority if provided, default is applied otherwise + * + */ +CREATE OR REPLACE PROCEDURE apply_job_expiration_policy( +) + LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM NULL + FROM ( + SELECT partition_id, + job_id, + COALESCE(user_p.operation, default_p.operation) AS operation + FROM job j + CROSS JOIN LATERAL ( + -- Getting the latest status + SELECT status AS current_status + FROM + get_job( + partition_id, + job_id + ) + LIMIT 1 + ) latest_status + CROSS JOIN default_job_expiration_policy default_p -- default expiration policy table + LEFT JOIN job_expiration_policy user_p -- user defined expiration policy table + USING ( + partition_id, + job_id, + job_status + ) + WHERE job_status = status + -- Order: + -- user_p.exact_expiry_time (1) + -- user_p.last_modified_offset (2) + -- default_p.last_modified_offset (3) + -- default_p.create_date_offset (4) + AND COALESCE( + COALESCE( + user_p.exact_expiry_time, ---------------------------------------(1) + last_update_date + COALESCE( + user_p.last_modified_offset, ----------------------------(2) + CASE + WHEN + default_p.last_modified_offset IS NOT NULL + THEN + default_p.last_modified_offset::INTERVAL --------(3) + END + ) + ) + , + CASE --------------------------------------------------------------------(4) + WHEN default_p.create_date_offset = 'infinity' + THEN 'infinity'::TIMESTAMP + WHEN default_p.create_date_offset IS NULL + THEN NULL + ELSE + (create_date + default_p.create_date_offset::INTERVAL)::TIMESTAMP + END + ) <= now() -- compare to now to get only the expired jobs + ) expired_jobs + CROSS JOIN LATERAL + delete_or_expire_job( + partition_id, + job_id, + operation + ); +END; +$$; diff --git a/job-service-db/src/main/resources/db/migration/procedures/public/R__refreshDefaultExpiry.sql b/job-service-db/src/main/resources/db/migration/procedures/public/R__refreshDefaultExpiry.sql new file mode 100644 index 000000000..6fba1fc36 --- /dev/null +++ b/job-service-db/src/main/resources/db/migration/procedures/public/R__refreshDefaultExpiry.sql @@ -0,0 +1,60 @@ +-- +-- Copyright 2016-2021 Micro Focus or one of its affiliates. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +/* + * Name: refresh_default_expiry + * + * Description: + * Refreshes the default expiration policy + * This function is to be updated manually + * There must be a value in create_date_offset "or" last_modified_offset + * based on 'expiration_time' value + * If any value in last_modified_offset, then create_date_offset must be null + * If any value in create_date_offset, then last_modified_offset must be null + */ +CREATE OR REPLACE PROCEDURE refresh_default_expiry( +) + LANGUAGE plpgsql AS +$$ +BEGIN + -- Removes all existing policy from the table + DELETE + FROM default_job_expiration_policy + WHERE job_status IS NOT NULL; + + -- Insert the default expiration policy + INSERT INTO default_job_expiration_policy ( + job_status, + operation, + expiration_time, + -- create_date_offset can contain 'infinity or an interval' + -- this is based upon the value from expiration_time. + -- if expiration_time is related to created_time, then the offset is to be inserted. + -- if expiration_time equals 'none', then 'infinity' is to be inserted + create_date_offset, + -- last_modified_offset can contain a value only if create_date_offset is null + last_modified_offset + ) + VALUES ('Active', 'Expire', 'none', 'infinity', NULL), + ('Cancelled', 'Delete', 'none', 'infinity', NULL), + ('Completed', 'Delete', 'none', 'infinity', NULL), + ('Failed', 'Delete', 'none', 'infinity', NULL), + ('Paused', 'Expire', 'none', 'infinity', NULL), + ('Waiting', 'Expire', 'none', 'infinity', NULL), + ('Expired', 'Delete', 'none', 'infinity', NULL); +END; + +$$; diff --git a/job-service-postgres-container/pom.xml b/job-service-postgres-container/pom.xml index a2c6888e6..3a7d41df5 100644 --- a/job-service-postgres-container/pom.xml +++ b/job-service-postgres-container/pom.xml @@ -118,7 +118,7 @@ - + diff --git a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ApplyJobExpirationPolicyTask.java b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ApplyJobExpirationPolicyTask.java new file mode 100644 index 000000000..4d938cd83 --- /dev/null +++ b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ApplyJobExpirationPolicyTask.java @@ -0,0 +1,54 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.scheduled.executor; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.time.Duration; +import java.time.Instant; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ApplyJobExpirationPolicyTask implements Runnable{ + + private static final Logger LOG = LoggerFactory.getLogger(ApplyJobExpirationPolicyTask.class); + + @Override + public void run() + { + try(final Connection connection = DBConnection.get(); + final CallableStatement stmt = connection.prepareCall("CALL apply_job_expiration_policy()")) + { + if(LOG.isDebugEnabled()) + { + LOG.debug("Calling apply_job_expiration_policy() database function ..."); + final Instant start = Instant.now(); + stmt.execute(); + final Instant end = Instant.now(); + LOG.debug("Total time taken to apply job expiration policy in ms. {}", Duration.between(start, end).toMillis()); + } + else + { + stmt.execute(); + } + } + catch(final Exception e) + { + LOG.error("Caught exception while applying job policies.", e); + } + } +} diff --git a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java index 39d2ed0b3..c470cff07 100644 --- a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java +++ b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java @@ -44,6 +44,11 @@ public ScheduledExecutor() { scheduler.scheduleWithFixedDelay(task, 20, ScheduledExecutorConfig.getScheduledExecutorPeriod(), TimeUnit.SECONDS); + LOG.info("Starting task for applying the job expiration policy ..."); + // Execute the dropTablesTask periodically. + scheduler.scheduleWithFixedDelay(new ApplyJobExpirationPolicyTask(), 20, ScheduledExecutorConfig.getApplyExpirationPolicySchedulerPeriod(), + TimeUnit.SECONDS); + LOG.info("Starting task for dropping soft deleted tables ..."); // Execute the dropTablesTask periodically. scheduler.scheduleWithFixedDelay(new DropTablesTask(), 20, ScheduledExecutorConfig.getDropTablesSchedulerPeriod(), diff --git a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java index 4af038f4e..b63b299d9 100644 --- a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java +++ b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java @@ -23,6 +23,15 @@ */ public class ScheduledExecutorConfig { + public static int getApplyExpirationPolicySchedulerPeriod() { + // Default to 60 seconds if CAF_APPLY_EXPIRATION_POLICY_SCHEDULER_PERIOD not specified. + final String period = getPropertyOrEnvVar("CAF_APPLY_EXPIRATION_POLICY_SCHEDULER_PERIOD"); + if (null == period || period.isEmpty()) { + return 60; + } + return Integer.parseInt(period); + } + public static String getDatabaseHost(){ return getPropertyOrEnvVar("JOB_SERVICE_DATABASE_HOST"); } diff --git a/job-service/pom.xml b/job-service/pom.xml index f76755372..7abc10874 100644 --- a/job-service/pom.xml +++ b/job-service/pom.xml @@ -252,6 +252,11 @@ jersey-client test + + org.junit.jupiter + junit-jupiter-api + test + org.mockito mockito-core diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java b/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java index b71be2a00..3085bfac2 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java @@ -16,15 +16,21 @@ package com.hpe.caf.services.job.api; import com.hpe.caf.services.db.client.DatabaseConnectionProvider; +import com.hpe.caf.services.job.api.generated.model.DeletePolicy; +import com.hpe.caf.services.job.api.generated.model.ExpirablePolicy; +import com.hpe.caf.services.job.api.generated.model.ExpirationPolicy; import com.hpe.caf.services.job.api.generated.model.Failure; import com.hpe.caf.services.job.api.generated.model.Job; import com.hpe.caf.services.configuration.AppConfig; +import com.hpe.caf.services.job.api.generated.model.Policer; import com.hpe.caf.services.job.api.generated.model.SortDirection; import com.hpe.caf.services.job.api.generated.model.SortField; import com.hpe.caf.services.job.exceptions.BadRequestException; import com.hpe.caf.services.job.exceptions.ForbiddenException; import com.hpe.caf.services.job.exceptions.NotFoundException; import com.hpe.caf.services.job.exceptions.ServiceUnavailableException; +import com.hpe.caf.services.job.utilities.ExpirationPolicyHelper; + import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +59,7 @@ public final class DatabaseHelper private static final String POSTGRES_NO_DATA_ERROR_CODE = "02000"; private static final String POSTGRES_NO_DATA_FOUND_ERROR_CODE = "P0002"; private static final String POSTGRES_UNIQUE_VIOLATION_ERROR_CODE = "23505"; + private static final String JOB_POLICY_TYPE_NAME = "job_policy"; private static AppConfig appConfig; @@ -106,31 +113,21 @@ public Job[] getJobs(final String partitionId, String jobIdStartsWith, String st } stmt.setArray(9, array); stmt.setString(10, filter); + String jobId = ""; // Execute a query to return a list of all job definitions in the system. LOG.debug("Calling get_jobs() database function..."); + ExpirationPolicy expirationPolicy = null; try (final ResultSet rs = stmt.executeQuery()) { while (rs.next()) { final Job job = new Job(); job.setId(rs.getString("job_id")); - job.setName(rs.getString("name")); - job.setDescription(rs.getString("description")); - job.setExternalData(rs.getString("data")); - job.setCreateTime(getDate(rs.getString("create_date"))); - job.setLastUpdateTime(getDate(rs.getString("last_update_date"))); - job.setStatus(Job.StatusEnum.valueOf(rs.getString("status").toUpperCase(Locale.ENGLISH))); - job.setPercentageComplete(rs.getFloat("percentage_complete")); - - // Parse JSON failure sub-strings. - final String failureDetails = rs.getString("failure_details"); - if (ApiServiceUtil.isNotNullOrEmpty(failureDetails)) { - job.setFailures(getFailuresAsList(failureDetails)); - } - - final String label = rs.getString("label"); - if (ApiServiceUtil.isNotNullOrEmpty(label)) { - job.getLabels().put(label, rs.getString("label_value")); + if (!job.getId().equals(jobId)) { + expirationPolicy = new ExpirationPolicy(); + jobId = job.getId(); } + retrieveJob(job, rs, expirationPolicy); + LOG.debug("Job {}", job); //We joined onto the labels table and there may be multiple rows for the same job, so merge their labels jobs.merge(job.getId(), job, (orig, insert) -> { orig.getLabels().putAll(insert.getLabels()); @@ -200,31 +197,19 @@ public Job getJob(final String partitionId, String jobId) throws Exception { CallableStatement stmt = conn.prepareCall("{call get_job(?,?)}") ) { stmt.setString(1, partitionId); - stmt.setString(2,jobId); + stmt.setString(2, jobId); // Execute a query to return a list of all job definitions in the system. LOG.debug("Calling get_job() database function..."); - try (final ResultSet rs = stmt.executeQuery()) { + try ( + final ResultSet rs = stmt.executeQuery(); + ) { job = new Job(); + final ExpirationPolicy expirationPolicy = new ExpirationPolicy(); + job.setExpiry(expirationPolicy); while (rs.next()) { job.setId(rs.getString("job_id")); - job.setName(rs.getString("name")); - job.setDescription(rs.getString("description")); - job.setExternalData(rs.getString("data")); - job.setCreateTime(getDate(rs.getString("create_date"))); - job.setLastUpdateTime(getDate(rs.getString("last_update_date"))); - job.setStatus(Job.StatusEnum.valueOf(rs.getString("status").toUpperCase(Locale.ENGLISH))); - job.setPercentageComplete(rs.getFloat("percentage_complete")); - - // Parse JSON failure sub-strings. - final String failureDetails = rs.getString("failure_details"); - if (ApiServiceUtil.isNotNullOrEmpty(failureDetails)) { - job.setFailures(getFailuresAsList(failureDetails)); - } - final String label = rs.getString("label"); - if (ApiServiceUtil.isNotNullOrEmpty(label)) { - job.getLabels().put(label, rs.getString("label_value")); - } + retrieveJob(job, rs, expirationPolicy); } } } catch (final SQLException se) { @@ -271,10 +256,10 @@ private boolean callCreateJobFunction(final CallableStatement statement) throws public boolean createJob(final String partitionId, final String jobId, final String name, final String description, final String data, final int jobHash, final String taskClassifier, final int taskApiVersion, final byte[] taskData, final String taskPipe, - final String targetPipe, final int delay, final Map labels) throws Exception { + final String targetPipe, final int delay, final Map labels, final ExpirationPolicy expirationPolicy) throws Exception { try ( final Connection conn = DatabaseConnectionProvider.getConnection(appConfig); - final CallableStatement stmt = conn.prepareCall("{call create_job(?,?,?,?,?,?,?,?,?,?,?,?,?)}") + final CallableStatement stmt = conn.prepareCall("{call create_job(?,?,?,?,?,?,?,?,?,?,?,?,?,?)}") ) { final List labelArray = buildLabelSqlArray(labels); @@ -290,24 +275,40 @@ public boolean createJob(final String partitionId, final String jobId, final Str stmt.setString(10,taskPipe); stmt.setString(11,targetPipe); stmt.setInt(12,delay); + LOG.info("passing job: {}\n ExpiryPolicy {}", jobId, expirationPolicy); - Array array; + Array arrayL; if (!labelArray.isEmpty()) { - array = conn.createArrayOf("VARCHAR", labelArray.toArray()); + arrayL = conn.createArrayOf("VARCHAR", labelArray.toArray()); } else { - array = conn.createArrayOf("VARCHAR", new String[0]); + arrayL = conn.createArrayOf("VARCHAR", new String[0]); } - stmt.setArray(13, array); + stmt.setArray(13, arrayL); + final Array arrayP = setExpirationPolicy(expirationPolicy, conn, stmt, 14); try { return callCreateJobFunction(stmt); } finally { - array.free(); + arrayL.free(); + arrayP.free(); } } catch (final SQLException se) { throw mapSqlConnectionException(se); } } + private Array setExpirationPolicy(final ExpirationPolicy expirationPolicy, final Connection conn, final CallableStatement stmt, final int parameterIndex) throws SQLException { + final Array arrayP; + if (expirationPolicy != null) { + final List expirationPolicyList = ExpirationPolicyHelper.toPgCompositeList(expirationPolicy); + arrayP = conn.createArrayOf(JOB_POLICY_TYPE_NAME, expirationPolicyList.toArray(new String[0])); + LOG.debug("expirationPolicyDB: {}", expirationPolicyList); + } else { + arrayP = conn.createArrayOf(JOB_POLICY_TYPE_NAME, new ExpirablePolicy[0]); + } + stmt.setArray(parameterIndex, arrayP); + return arrayP; + } + /** * Creates the specified job. * @return Whether the job was created @@ -317,10 +318,10 @@ public boolean createJobWithDependencies(final String partitionId, final String final int taskApiVersion, final byte[] taskData, final String taskPipe, final String targetPipe, final List prerequisiteJobIds, final int delay, final Map labels, - final boolean partitionSuspended) throws Exception { + final boolean partitionSuspended, ExpirationPolicy expirationPolicy) throws Exception { try ( final Connection conn = DatabaseConnectionProvider.getConnection(appConfig); - final CallableStatement stmt = conn.prepareCall("{call create_job(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)}") + final CallableStatement stmt = conn.prepareCall("{call create_job(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)}") ) { final String[] prerequisiteJobIdStringArray = getPrerequisiteJobIds(prerequisiteJobIds); Array prerequisiteJobIdSQLArray = conn.createArrayOf("varchar", prerequisiteJobIdStringArray); @@ -350,10 +351,13 @@ public boolean createJobWithDependencies(final String partitionId, final String stmt.setArray(14, array); stmt.setBoolean(15, partitionSuspended); + final Array arrayP = setExpirationPolicy(expirationPolicy, conn, stmt, 16); + try { return callCreateJobFunction(stmt); } finally { array.free(); + arrayP.free(); prerequisiteJobIdSQLArray.free(); } } catch (final SQLException se) { @@ -361,6 +365,80 @@ public boolean createJobWithDependencies(final String partitionId, final String } } + private void retrieveJob(final Job job, final ResultSet rs, final ExpirationPolicy expirationPolicy) throws Exception { + job.setExpiry(retrieveExpirationPolicy(expirationPolicy, rs)); + job.setName(rs.getString("name")); + job.setDescription(rs.getString("description")); + job.setExternalData(rs.getString("data")); + job.setCreateTime(getDate(rs.getString("create_date"))); + job.setLastUpdateTime(getDate(rs.getString("last_update_date"))); + job.setStatus(Job.StatusEnum.valueOf(rs.getString("status").toUpperCase(Locale.ENGLISH))); + job.setPercentageComplete(rs.getFloat("percentage_complete")); + + // Parse JSON failure sub-strings. + final String failureDetails = rs.getString("failure_details"); + if (ApiServiceUtil.isNotNullOrEmpty(failureDetails)) { + job.setFailures(getFailuresAsList(failureDetails)); + } + final String label = rs.getString("label"); + if (ApiServiceUtil.isNotNullOrEmpty(label)) { + job.getLabels().put(label, rs.getString("label_value")); + } + } + + private ExpirationPolicy retrieveExpirationPolicy(final ExpirationPolicy expirationPolicy, final ResultSet rs) + throws SQLException { + final String status = rs.getString("expiration_status"); + if (ApiServiceUtil.isNotNullOrEmpty(status)) { + final ExpirablePolicy expirablePolicy = new ExpirablePolicy(); + final DeletePolicy deletePolicy = new DeletePolicy(); + switch (status) { + case "Active": + transferExpirablePolicy(rs, expirablePolicy); + expirationPolicy.setActive(expirablePolicy); + break; + case "Cancelled": + transferDeletePolicy(rs, deletePolicy); + expirationPolicy.setCancelled(deletePolicy); + break; + case "Completed": + transferDeletePolicy(rs, deletePolicy); + expirationPolicy.setCompleted(deletePolicy); + break; + case "Failed": + transferDeletePolicy(rs, deletePolicy); + expirationPolicy.setFailed(deletePolicy); + break; + case "Paused": + transferExpirablePolicy(rs, expirablePolicy); + expirationPolicy.setPaused(expirablePolicy); + break; + case "Waiting": + transferExpirablePolicy(rs, expirablePolicy); + expirationPolicy.setWaiting(expirablePolicy); + break; + default: + transferDeletePolicy(rs, deletePolicy); + expirationPolicy.setExpired(deletePolicy); + break; + } + + } + return expirationPolicy; + } + + private void transferExpirablePolicy(final ResultSet resultSet, final ExpirablePolicy expirablePolicy) throws SQLException { + expirablePolicy.setExpiryTime(resultSet.getString("expiration_time")); + expirablePolicy.setOperation(ExpirablePolicy.OperationEnum.valueOf(resultSet.getString("operation").toUpperCase(Locale.ROOT))); + expirablePolicy.setPolicer(Policer.valueOf(resultSet.getString("policer"))); + } + + private void transferDeletePolicy(final ResultSet resultSet, final DeletePolicy deletePolicy) throws SQLException { + deletePolicy.setExpiryTime(resultSet.getString("expiration_time")); + deletePolicy.setOperation(DeletePolicy.OperationEnum.valueOf(resultSet.getString("operation").toUpperCase(Locale.ROOT))); + deletePolicy.setPolicer(Policer.valueOf(resultSet.getString("policer"))); + } + private List buildLabelSqlArray(final Map labels) { return labels.entrySet().stream().map(entry -> new String[]{entry.getKey(), entry.getValue()}) .collect(Collectors.toList()); @@ -591,4 +669,27 @@ private static void throwIfUnexpectedException(final SQLException se) throws Exc throw se; } } + + public ExpirationPolicy getDefaultExpirationPolicy() throws Exception { + + ExpirationPolicy expirationPolicy = new ExpirationPolicy(); + try ( + final Connection conn = DatabaseConnectionProvider.getConnection(appConfig); + CallableStatement stmt = conn.prepareCall("{call get_default_expiry()}") + ) { + // Execute a query to return a list of all job definitions in the system. + LOG.debug("Calling get_job() database function..."); + try ( + final ResultSet rs = stmt.executeQuery(); + ) { + while (rs.next()) { + expirationPolicy =retrieveExpirationPolicy(expirationPolicy, rs); + } + } + } catch (final SQLException se) { + throw mapSqlNoDataException(se); + } + + return expirationPolicy; + } } diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/JobsGetExpiry.java b/job-service/src/main/java/com/hpe/caf/services/job/api/JobsGetExpiry.java new file mode 100644 index 000000000..0d6000eab --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/JobsGetExpiry.java @@ -0,0 +1,59 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.api; + +import com.hpe.caf.services.configuration.AppConfig; +import com.hpe.caf.services.configuration.AppConfigProvider; +import com.hpe.caf.services.job.api.generated.model.ExpirationPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class JobsGetExpiry { + + private static final Logger LOG = LoggerFactory.getLogger(JobsGetExpiry.class); + + /** + * Gets the default job expiry setting. + * + * @return expiration policy + * @throws Exception bad request or database exceptions + */ + public static ExpirationPolicy getDefault() throws Exception { + + final ExpirationPolicy expirablePolicy; + + try { + LOG.debug("getDefaultExpirationPolicy: Starting..."); + + // Get app config settings. + LOG.debug("getDefaultExpirationPolicy: Reading database connection properties..."); + final AppConfig config = AppConfigProvider.getAppConfigProperties(); + + // Get database helper instance. + final DatabaseHelper databaseHelper = new DatabaseHelper(config); + + + expirablePolicy = databaseHelper.getDefaultExpirationPolicy(); + } catch (final Exception e) { + LOG.error("Error - ", e); + throw e; + } + + + LOG.debug("getDefaultExpirationPolicy: Done."); + return expirablePolicy; + } +} diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/JobsPut.java b/job-service/src/main/java/com/hpe/caf/services/job/api/JobsPut.java index 509886a5c..83a24d225 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/JobsPut.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/JobsPut.java @@ -28,6 +28,7 @@ import com.hpe.caf.services.job.jobtype.JobTypes; import com.hpe.caf.services.job.queue.QueueServices; import com.hpe.caf.services.job.queue.QueueServicesFactory; +import com.hpe.caf.services.job.utilities.PolicyBuilder; import com.hpe.caf.util.ModuleLoader; import com.hpe.caf.worker.document.DocumentWorkerConstants; import com.hpe.caf.worker.document.DocumentWorkerDocumentTask; @@ -86,6 +87,9 @@ public static String createOrUpdateJob(final String partitionId, String jobId, N throw new BadRequestException(ApiServiceUtil.ERR_MSG_JOB_ID_CONTAINS_INVALID_CHARS); } + // Validates the job expiry policies and populate the job with the complete list of them + PolicyBuilder.buildPolicyMap(job); + // if `job` is provided, construct `task` from it final WorkerAction jobTask; if (job.getType() == null) { @@ -187,13 +191,13 @@ public static String createOrUpdateJob(final String partitionId, String jobId, N jobCreated = databaseHelper.createJobWithDependencies(partitionId, jobId, job.getName(), job.getDescription(), job.getExternalData(), jobHash, jobTask.getTaskClassifier(), jobTask.getTaskApiVersion(), getTaskDataBytes(jobTask, codec), jobTask.getTaskPipe(), jobTask.getTargetPipe(), - job.getPrerequisiteJobIds(), job.getDelay(), job.getLabels(), partitionSuspended); + job.getPrerequisiteJobIds(), job.getDelay(), job.getLabels(), partitionSuspended, job.getExpiry()); } else { jobCreated = databaseHelper.createJob(partitionId, jobId, job.getName(), job.getDescription(), job.getExternalData(), jobHash, jobTask.getTaskClassifier(), jobTask.getTaskApiVersion(), getTaskDataBytes(jobTask, codec), jobTask.getTaskPipe(), jobTask.getTargetPipe(), - job.getDelay(), job.getLabels()); + job.getDelay(), job.getLabels(), job.getExpiry()); } if (!jobCreated) { diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApi.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApi.java index 573d7265e..8432eac4b 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApi.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApi.java @@ -17,6 +17,7 @@ import io.swagger.annotations.ApiParam; +import com.hpe.caf.services.job.api.generated.model.ExpirationPolicy; import com.hpe.caf.services.job.api.generated.model.Job; import com.hpe.caf.services.job.api.generated.model.NewJob; @@ -48,7 +49,9 @@ public class JobsApi { public Response getJobs( @ApiParam(value = "Only allow access to jobs in the container with this name",required=true) @PathParam("partitionId") String partitionId, @ApiParam(value = "Only those results whose job id starts with this value will be returned") @QueryParam("jobIdStartsWith") String jobIdStartsWith, - @ApiParam(value = "All - no status filter is applied (Default); NotCompleted - only those results with statuses other than Completed will be returned; Completed - only those results with Completed status will be returned; Inactive - only those results with inactive statuses (i.e. Completed, Failed, Cancelled) will be returned; NotFinished - only those results with unfinished statuses (ie. Active, Paused, Waiting) will be returned.") @QueryParam("statusType") String statusType, + @ApiParam(value = "All - no status filter is applied (Default); NotCompleted - only those results with statuses other than " + + "Completed will be returned; Completed - only those results with Completed status will be returned; Inactive - only " + + "those results with inactive statuses (i.e. Completed, Expìred, Failed, Cancelled) will be returned; NotFinished - only those results with unfinished statuses (ie. Active, Paused, Waiting) will be returned.") @QueryParam("statusType") String statusType, @ApiParam(value = "The maximum results to return (i.e. page size)") @QueryParam("limit") Integer limit, @ApiParam(value = "The starting position from which to return results (useful for paging)") @QueryParam("offset") Integer offset, @ApiParam(value = "An identifier that can be used to correlate events that occurred\nacross different CAF services" )@HeaderParam("CAF-Correlation-Id") String cAFCorrelationId, @@ -80,6 +83,21 @@ public Response getJob( return delegate.getJob(partitionId, jobId,cAFCorrelationId,securityContext); } + @GET + @Path("/partitions/default-expiry") + @Consumes({ "application/json" }) + @Produces({ "application/json" }) + @io.swagger.annotations.ApiOperation(value = "Gets the default expiry settings", notes = "Retrieves the default expiry settings", + response = ExpirationPolicy.class, tags={"Jobs", }) + @io.swagger.annotations.ApiResponses(value = { + @io.swagger.annotations.ApiResponse(code = 200, message = "Returns the default expiry settings", response = ExpirationPolicy.class), + + @io.swagger.annotations.ApiResponse(code = 503, message = "The request failed due to a database connection error.") + }) + public Response getDefaultExpiry(@Context SecurityContext securityContext) throws Exception { + return delegate.getDefaultExpiry(securityContext); + } + @PUT @Path("/{partitionId}/jobs/{jobId}") @Consumes({ "application/json" }) diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiService.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiService.java index 3d2b386f9..1dd96e62c 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiService.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiService.java @@ -55,4 +55,5 @@ public abstract Response getJobActive(String partitionId,String jobId,String cAF public abstract Response getJobStatus(String partitionId, String jobId, String cAFCorrelationId, SecurityContext securityContext) throws Exception; + public abstract Response getDefaultExpiry(SecurityContext securityContext) throws Exception; } diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiServiceImpl.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiServiceImpl.java index 84c198d94..ee1d77a2c 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiServiceImpl.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/JobsApiServiceImpl.java @@ -16,6 +16,7 @@ package com.hpe.caf.services.job.api.generated; import com.hpe.caf.services.job.api.*; +import com.hpe.caf.services.job.api.generated.model.ExpirationPolicy; import com.hpe.caf.services.job.api.generated.model.Job; import com.hpe.caf.services.job.api.generated.model.NewJob; import com.hpe.caf.services.job.exceptions.BadRequestException; @@ -27,9 +28,14 @@ import javax.ws.rs.core.UriInfo; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaJerseyServerCodegen", date = "2016-02-29T10:25:31.219Z") public class JobsApiServiceImpl extends JobsApiService { + private static final Logger LOG = LoggerFactory.getLogger(JobsApiServiceImpl.class); + @Override public Response getJobs(final String partitionId, final String jobIdStartsWith, final String statusType, final Integer limit, final Integer offset, String cAFCorrelationId, String sort, @@ -111,4 +117,11 @@ public Response getJobStatus(final String partitionId, final String jobId, final return Response.ok().header("CacheableJobStatus", true).entity(jobStatusResult.jobStatus).cacheControl(cacheControl).build(); } + @Override + public Response getDefaultExpiry(final SecurityContext securityContext) throws Exception { + LOG.debug("getDefaultExpiry impl"); + final ExpirationPolicy defaultExpiry = JobsGetExpiry.getDefault(); + return Response.ok().entity(defaultExpiry).build(); + } + } diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/DeletePolicy.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/DeletePolicy.java new file mode 100644 index 000000000..fc7ae2e8c --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/DeletePolicy.java @@ -0,0 +1,115 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.api.generated.model; + +import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + + + +/** + * The expiration details to be applied on the job + **/ + +@ApiModel(description = "The expiration details to be applied on the job") +@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2021-03-12T12:28:51.784Z") +public class DeletePolicy { + private String expiryTime = null; + private DeletePolicy.OperationEnum operation; + private Policer policer; + + public DeletePolicy() { + this.operation = DeletePolicy.OperationEnum.DELETE; + this.policer = null; + } + + @ApiModelProperty("The delay before expiration") + @JsonProperty("expiryTime") + public String getExpiryTime() { + return this.expiryTime; + } + + public void setExpiryTime(String expiryTime) { + this.expiryTime = expiryTime; + } + + @ApiModelProperty("The action to apply on expired jobs") + @JsonProperty("operation") + public DeletePolicy.OperationEnum getOperation() { + return this.operation; + } + + public void setOperation(DeletePolicy.OperationEnum operation) { + this.operation = operation; + } + + @ApiModelProperty("The instance defining the policy") + @JsonProperty("policer") + public Policer getPolicer() { + return this.policer; + } + + public void setPolicer(Policer policer) { + this.policer = policer; + } + + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o != null && this.getClass() == o.getClass()) { + DeletePolicy deletePolicy = (DeletePolicy)o; + return Objects.equals(this.expiryTime, deletePolicy.expiryTime) && Objects.equals(this.operation, deletePolicy.operation) && Objects.equals(this.policer, deletePolicy.policer); + } else { + return false; + } + } + + public int hashCode() { + return Objects.hash(new Object[]{this.expiryTime, this.operation, this.policer}); + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class DeletePolicy {\n"); + sb.append(" expiryTime: ").append(this.toIndentedString(this.expiryTime)).append("\n"); + sb.append(" operation: ").append(this.toIndentedString(this.operation)).append("\n"); + sb.append(" policer: ").append(this.toIndentedString(this.policer)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + private String toIndentedString(Object o) { + return o == null ? "null" : o.toString().replace("\n", "\n "); + } + + public static enum OperationEnum { + DELETE("Delete"); + + private String value; + + private OperationEnum(String value) { + this.value = value; + } + + @JsonValue + public String toString() { + return this.value; + } + } +} diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirablePolicy.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirablePolicy.java new file mode 100644 index 000000000..2a39a1dbd --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirablePolicy.java @@ -0,0 +1,141 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.api.generated.model; + +import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + + + +/** + * The expiration details to be applied on the job + **/ + +@ApiModel(description = "The expiration details to be applied on the job") +@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2021-03-16T14:04:09.471Z") +public class ExpirablePolicy { + + private String expiryTime = null; + private OperationEnum operation = OperationEnum.EXPIRE; + private Policer policer = null; + + + + /** + * The delay before expiration + **/ + + @ApiModelProperty(value = "") + @JsonProperty("expiryTime") + public String getExpiryTime() { + return expiryTime; + } + + public void setExpiryTime(String expiryTime) { + this.expiryTime = expiryTime; + } + + /** + * The action to apply on expired jobs + **/ + + @ApiModelProperty(value = "The action to apply on expired jobs") + @JsonProperty("operation") + public OperationEnum getOperation() { + return operation; + } + + public void setOperation(OperationEnum operation) { + this.operation = operation; + } + /** + * The instance defining the policy + **/ + + @ApiModelProperty(value = "The instance defining the policy") + @JsonProperty("policer") + public Policer getPolicer() { + return policer; + } + + public void setPolicer(Policer policer) { + this.policer = policer; + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ExpirablePolicy expirablePolicy = (ExpirablePolicy) o; + return Objects.equals(expiryTime, expirablePolicy.expiryTime) && + Objects.equals(operation, expirablePolicy.operation) && + Objects.equals(policer, expirablePolicy.policer); + } + + @Override + public int hashCode() { + return Objects.hash(expiryTime, operation, policer); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class ExpirablePolicy {\n"); + + sb.append(" expiryTime: ").append(toIndentedString(expiryTime)).append("\n"); + sb.append(" operation: ").append(toIndentedString(operation)).append("\n"); + sb.append(" policer: ").append(toIndentedString(policer)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + /** + * Convert the given object to string with each line indented by 4 spaces + * (except the first line). + */ + private String toIndentedString(Object o) { + if (o == null) { + return "null"; + } + return o.toString().replace("\n", "\n "); + } + + public enum OperationEnum { + EXPIRE("Expire"), + DELETE("Delete"); + + private String value; + + + OperationEnum(String value) { + this.value = value; + } + @Override + @JsonValue + public String toString() { + return value; + } + + } +} diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirationPolicy.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirationPolicy.java new file mode 100644 index 000000000..b3272e8ae --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/ExpirationPolicy.java @@ -0,0 +1,151 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.api.generated.model; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +import java.util.Objects; + +@ApiModel( + description = "The expiration policy to be applied on the job" +) +public class ExpirationPolicy { + private ExpirablePolicy active = null; + private DeletePolicy completed = null; + private DeletePolicy failed = null; + private DeletePolicy cancelled = null; + private ExpirablePolicy waiting = null; + private ExpirablePolicy paused = null; + private DeletePolicy expired = null; + + public ExpirationPolicy() { + } + + @ApiModelProperty("") + @JsonProperty("Active") + public ExpirablePolicy getActive() { + return this.active; + } + + public void setActive(ExpirablePolicy active) { + this.active = active; + } + + @ApiModelProperty("") + @JsonProperty("Completed") + public DeletePolicy getCompleted() { + return this.completed; + } + + public void setCompleted(DeletePolicy completed) { + this.completed = completed; + } + + @ApiModelProperty("") + @JsonProperty("Failed") + public DeletePolicy getFailed() { + return this.failed; + } + + public void setFailed(DeletePolicy failed) { + this.failed = failed; + } + + @ApiModelProperty("") + @JsonProperty("Cancelled") + public DeletePolicy getCancelled() { + return this.cancelled; + } + + public void setCancelled(DeletePolicy cancelled) { + this.cancelled = cancelled; + } + + @ApiModelProperty("") + @JsonProperty("Waiting") + public ExpirablePolicy getWaiting() { + return this.waiting; + } + + public void setWaiting(ExpirablePolicy waiting) { + this.waiting = waiting; + } + + @ApiModelProperty("") + @JsonProperty("Paused") + public ExpirablePolicy getPaused() { + return this.paused; + } + + public void setPaused(ExpirablePolicy paused) { + this.paused = paused; + } + + @ApiModelProperty("") + @JsonProperty("Expired") + public DeletePolicy getExpired() { + return this.expired; + } + + public void setExpired(DeletePolicy expired) { + this.expired = expired; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o != null && this.getClass() == o.getClass()) { + ExpirationPolicy expirationPolicy = (ExpirationPolicy)o; + return Objects.equals(this.active, expirationPolicy.active) && + Objects.equals(this.completed, expirationPolicy.completed) && + Objects.equals(this.failed, expirationPolicy.failed) && + Objects.equals(this.cancelled, expirationPolicy.cancelled) && + Objects.equals(this.waiting, expirationPolicy.waiting) && + Objects.equals(this.paused, expirationPolicy.paused) && + Objects.equals(this.expired, expirationPolicy.expired); + } else { + return false; + } + } + + @Override + public int hashCode() { + return Objects.hash(new Object[]{this.active, this.completed, this.failed, + this.cancelled, this.waiting, this.paused, this.expired}); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("class ExpirationPolicy {\n"); + sb.append(" active: ").append(this.toIndentedString(this.active)).append("\n"); + sb.append(" completed: ").append(this.toIndentedString(this.completed)).append("\n"); + sb.append(" failed: ").append(this.toIndentedString(this.failed)).append("\n"); + sb.append(" cancelled: ").append(this.toIndentedString(this.cancelled)).append("\n"); + sb.append(" waiting: ").append(this.toIndentedString(this.waiting)).append("\n"); + sb.append(" paused: ").append(this.toIndentedString(this.paused)).append("\n"); + sb.append(" expired: ").append(this.toIndentedString(this.expired)).append("\n"); + sb.append("}"); + return sb.toString(); + } + + private String toIndentedString(Object o) { + return o == null ? "null" : o.toString().replace("\n", "\n "); + } +} diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Job.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Job.java index 8f662251d..db75df294 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Job.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Job.java @@ -35,6 +35,7 @@ public class Job { private String id = null; private String name = null; private String description = null; + private ExpirationPolicy expiry = null; /** * @deprecated 21/01/2020 - Replaced by labels functionality. */ @@ -48,6 +49,7 @@ public enum StatusEnum { ACTIVE("Active"), CANCELLED("Cancelled"), COMPLETED("Completed"), + EXPIRED("Expired"), FAILED("Failed"), PAUSED("Paused"), WAITING("Waiting"); @@ -126,6 +128,15 @@ public void setDescription(String description) { this.description = description; } + @ApiModelProperty("") + @JsonProperty("expiry") + public ExpirationPolicy getExpiry() { + return this.expiry; + } + + public void setExpiry(ExpirationPolicy expiry) { + this.expiry = expiry; + } /** * External data can be associated with the job for use by other components @@ -260,6 +271,7 @@ public boolean equals(Object o) { return Objects.equals(id, job.id) && Objects.equals(name, job.name) && Objects.equals(description, job.description) && + Objects.equals(this.expiry, job.expiry) && Objects.equals(externalData, job.externalData) && Objects.equals(createTime, job.createTime) && Objects.equals(this.lastUpdateTime, job.lastUpdateTime) && @@ -270,7 +282,8 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(id, name, description, externalData, createTime, lastUpdateTime, status, percentageComplete, failures); + return Objects.hash(id, name, description, expiry, externalData, createTime, lastUpdateTime, status, percentageComplete, + failures); } @Override @@ -281,6 +294,7 @@ public String toString() { sb.append(" id: ").append(toIndentedString(id)).append("\n"); sb.append(" name: ").append(toIndentedString(name)).append("\n"); sb.append(" description: ").append(toIndentedString(description)).append("\n"); + sb.append(" expiry: ").append(this.toIndentedString(this.expiry)).append("\n"); sb.append(" externalData: ").append(toIndentedString(externalData)).append("\n"); sb.append(" createTime: ").append(toIndentedString(createTime)).append("\n"); sb.append(" lastUpdateTime: ").append(toIndentedString(lastUpdateTime)).append("\n"); diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/NewJob.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/NewJob.java index 2d9ff8c36..89b5fa3da 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/NewJob.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/NewJob.java @@ -35,6 +35,7 @@ public class NewJob { private String name = null; private String description = null; + private ExpirationPolicy expiry = null; /** * @deprecated 21/01/2020 - Replaced by labels functionality. */ @@ -93,6 +94,14 @@ public NewJob externalData(String externalData) { return this; } + @ApiModelProperty(value = "") + @JsonProperty("expiry") + public ExpirationPolicy getExpiry() { + return expiry; + } + public void setExpiry(ExpirationPolicy expiry) { + this.expiry = expiry; + } @ApiModelProperty(value = "External data can be associated with the job for use by other components") @JsonProperty("externalData") @@ -221,6 +230,7 @@ public boolean equals(Object o) { NewJob newJob = (NewJob) o; return Objects.equals(name, newJob.name) && Objects.equals(description, newJob.description) && + Objects.equals(expiry, newJob.expiry) && Objects.equals(externalData, newJob.externalData) && Objects.equals(task, newJob.task) && Objects.equals(prerequisiteJobIds, newJob.prerequisiteJobIds) && @@ -229,7 +239,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(name, description, externalData, task, prerequisiteJobIds, delay); + return Objects.hash(name, description, expiry, externalData, task, prerequisiteJobIds, delay); } @Override @@ -239,6 +249,7 @@ public String toString() { sb.append(" name: ").append(toIndentedString(name)).append("\n"); sb.append(" description: ").append(toIndentedString(description)).append("\n"); + sb.append(" expiry: ").append(toIndentedString(expiry)).append("\n"); sb.append(" externalData: ").append(toIndentedString(externalData)).append("\n"); sb.append(" task: ").append(toIndentedString(task)).append("\n"); sb.append(" prerequisiteJobIds: ").append(toIndentedString(prerequisiteJobIds)).append("\n"); diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Policer.java b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Policer.java new file mode 100644 index 000000000..1881ec11b --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/generated/model/Policer.java @@ -0,0 +1,24 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.api.generated.model; + +public enum Policer { + User, + System; + + private Policer() { + } +} diff --git a/job-service/src/main/java/com/hpe/caf/services/job/utilities/DateHelper.java b/job-service/src/main/java/com/hpe/caf/services/job/utilities/DateHelper.java new file mode 100644 index 000000000..a2d4862b4 --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/utilities/DateHelper.java @@ -0,0 +1,67 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.utilities; + +import com.hpe.caf.services.job.exceptions.BadRequestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.format.DateTimeParseException; +import java.util.regex.Pattern; + +public final class DateHelper { + private static final Logger LOG = LoggerFactory.getLogger(DateHelper.class); + + /** + * ISO 8601 / RFC 3339 https://en.wikipedia.org/wiki/ISO_8601#Durations + */ + private static final Pattern DATE_REGEX = Pattern.compile( + "^(lastUpdateTime|createTime)\\+((P)\\d+[DYM])$|" + + "^(lastUpdateTime|createTime)\\+(PT)?(\\d+[HMS])$|" + + "^none$"); + + private DateHelper() { + } + + /** + * P1Y 1 year
+ * P1M 1 month
+ * P1D 1 day
+ * 1H 1 hour
+ * 1M 1 minute
+ * 1S 1 second
+ * https://www.postgresql.org/docs/9.3/datatype-datetime.html + * @param dateToCheck the date to validate + * @throws BadRequestException if any invalid parameter + */ + public static void validate(final String dateToCheck) throws BadRequestException { + if (!DATE_REGEX.matcher(dateToCheck).matches()) { + try{ + // validate date format + final Instant instantPassed = Instant.parse(dateToCheck); + // verify that date is in the future + if (instantPassed.isBefore(Instant.now())) { + LOG.warn("Date is in the past: {}", dateToCheck); + } + }catch (final DateTimeParseException e){ + final String errorMessage = "Invalid date "+dateToCheck; + LOG.error(errorMessage); + throw new BadRequestException(errorMessage); + } + } + } +} diff --git a/job-service/src/main/java/com/hpe/caf/services/job/utilities/ExpirationPolicyHelper.java b/job-service/src/main/java/com/hpe/caf/services/job/utilities/ExpirationPolicyHelper.java new file mode 100644 index 000000000..f29be86eb --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/utilities/ExpirationPolicyHelper.java @@ -0,0 +1,81 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.utilities; + +import com.hpe.caf.services.job.api.generated.model.DeletePolicy; +import com.hpe.caf.services.job.api.generated.model.DeletePolicy.OperationEnum; +import com.hpe.caf.services.job.api.generated.model.ExpirablePolicy; +import com.hpe.caf.services.job.api.generated.model.ExpirationPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public final class ExpirationPolicyHelper +{ + private static final Logger LOG = LoggerFactory.getLogger(ExpirationPolicyHelper.class); + private ExpirationPolicyHelper() + { + } + + public static List toPgCompositeList(final ExpirationPolicy expirationPolicy) + { + LOG.debug("converting into composite"); + final List policyList = new ArrayList<>(); + // build policy when status could be set to expire + if(null!= expirationPolicy.getActive())buildExpirablePolicy(policyList, "Active", expirationPolicy.getActive()); + if(null!= expirationPolicy.getWaiting())buildExpirablePolicy(policyList, "Waiting", expirationPolicy.getWaiting()); + if(null!= expirationPolicy.getPaused())buildExpirablePolicy(policyList, "Paused", expirationPolicy.getPaused()); + + // build policy when delete is the only option + if(null!= expirationPolicy.getCancelled())buildDeletePolicy(policyList, "Cancelled", expirationPolicy.getCancelled()); + if(null!= expirationPolicy.getCompleted())buildDeletePolicy(policyList, "Completed", expirationPolicy.getCompleted()); + if(null!= expirationPolicy.getFailed())buildDeletePolicy(policyList, "Failed", expirationPolicy.getFailed()); + if(null!= expirationPolicy.getExpired())buildDeletePolicy(policyList, "Expired", expirationPolicy.getExpired()); + LOG.debug("policyList {}", policyList); + return policyList; + } + + private static void buildDeletePolicy(final List policyList, final String status, final DeletePolicy deletePolicy) { + policyList.add(toJobPolicyDbTypeString( + status, + OperationEnum.DELETE, + deletePolicy.getExpiryTime())); + } + + private static void buildExpirablePolicy(final List policyList, final String status, final ExpirablePolicy expirablePolicy) + { + policyList.add(toJobPolicyDbTypeString(status, expirablePolicy)); + } + + private static String toJobPolicyDbTypeString(final String status, final ExpirablePolicy expirablePolicy) + { + return toJobPolicyDbTypeString(status, expirablePolicy.getOperation(), expirablePolicy.getExpiryTime()); + } + + private static String toJobPolicyDbTypeString(final String status, final Object operation, final String expiryTime) + { + // Builds up the Composite Value for the JOB_POLICY database type + // See https://www.postgresql.org/docs/current/rowtypes.html + + // The expiry time has already been validated. + // The allowed patterns do not contain any commas or parentheses so there is no need for escaping here. + // + // The definition of JOB_POLICY is (partition_id, job_id, job_status, operation, expiration_time) + return "(,,(\"" + status + "," + operation + "," + expiryTime + ")\")"; + } +} diff --git a/job-service/src/main/java/com/hpe/caf/services/job/utilities/PolicyBuilder.java b/job-service/src/main/java/com/hpe/caf/services/job/utilities/PolicyBuilder.java new file mode 100644 index 000000000..3688aa73b --- /dev/null +++ b/job-service/src/main/java/com/hpe/caf/services/job/utilities/PolicyBuilder.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.utilities; + +import com.hpe.caf.services.job.api.generated.model.DeletePolicy; +import com.hpe.caf.services.job.api.generated.model.ExpirablePolicy; +import com.hpe.caf.services.job.api.generated.model.ExpirationPolicy; +import com.hpe.caf.services.job.api.generated.model.NewJob; +import com.hpe.caf.services.job.exceptions.BadRequestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class PolicyBuilder +{ + private static final Logger LOG = LoggerFactory.getLogger(PolicyBuilder.class); + private PolicyBuilder() + { + } + + /** + * Validates the job expiry policies and populates the job with the complete list of them. + * + * @param job the job to be created + * @throws BadRequestException if any invalid parameter + */ + public static void buildPolicyMap(final NewJob job) throws BadRequestException + { + final ExpirationPolicy expirationPolicies = getExpiryPolicyFromJob(job); + job.setExpiry(expirationPolicies); + } + + private static ExpirationPolicy getExpiryPolicyFromJob(final NewJob job) throws BadRequestException { + if (null != job.getExpiry()) { + LOG.debug("expiry is not null"); + checkPolicyDates(job.getExpiry()); + return job.getExpiry(); + } else { + return new ExpirationPolicy(); + } + } + + private static void checkPolicyDates( + final ExpirationPolicy expirationPolicies + ) throws BadRequestException + { + checkDateForExpirablePolicy(expirationPolicies.getActive()); + checkDateForExpirablePolicy(expirationPolicies.getWaiting()); + checkDateForExpirablePolicy(expirationPolicies.getPaused()); + checkDateForDeletePolicy(expirationPolicies.getCompleted()); + checkDateForDeletePolicy(expirationPolicies.getCancelled()); + checkDateForDeletePolicy(expirationPolicies.getFailed()); + checkDateForDeletePolicy(expirationPolicies.getExpired()); + } + + private static void checkDateForDeletePolicy(final DeletePolicy deletePolicy) throws BadRequestException { + if (null != deletePolicy) { + LOG.debug("validating {}", deletePolicy); + DateHelper.validate(deletePolicy.getExpiryTime()); + } + } + + private static void checkDateForExpirablePolicy(final ExpirablePolicy expirablePolicy) throws BadRequestException { + if (null != expirablePolicy) { + LOG.debug("validating {}", expirablePolicy); + DateHelper.validate(expirablePolicy.getExpiryTime()); + } + } + +} diff --git a/job-service/src/test/java/com/hpe/caf/services/job/api/JobsPutTest.java b/job-service/src/test/java/com/hpe/caf/services/job/api/JobsPutTest.java index 8ec172995..3cf19c4af 100644 --- a/job-service/src/test/java/com/hpe/caf/services/job/api/JobsPutTest.java +++ b/job-service/src/test/java/com/hpe/caf/services/job/api/JobsPutTest.java @@ -30,6 +30,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.TextNode; +import com.hpe.caf.services.job.api.generated.model.ExpirablePolicy; +import com.hpe.caf.services.job.api.generated.model.ExpirationPolicy; import com.hpe.caf.services.job.api.generated.model.NewJob; import com.hpe.caf.services.job.api.generated.model.RestrictedTask; import com.hpe.caf.services.job.api.generated.model.WorkerAction; @@ -79,6 +81,12 @@ private NewJob makeBaseJob() { job.setName("TestName"); job.setDescription("TestDescription"); job.setExternalData("TestExternalData"); + final ExpirationPolicy expirationPolicy = new ExpirationPolicy(); + final ExpirablePolicy expirablePolicy = new ExpirablePolicy(); + expirablePolicy.setOperation(ExpirablePolicy.OperationEnum.EXPIRE); + expirablePolicy.setExpiryTime("createTime+P10D"); + expirationPolicy.setActive(expirablePolicy); + job.setExpiry(expirationPolicy); return job; } @@ -116,10 +124,12 @@ public void setup() throws Exception { // Mock DatabaseHelper calls. when(mockDatabaseHelper.createJob( anyString(), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), anyInt(), anyMap())).thenReturn(true); + anyInt(), any(), anyString(), anyString(), anyInt(), anyMap(), + any(ExpirationPolicy.class))).thenReturn(true); when(mockDatabaseHelper.createJobWithDependencies( anyString(), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false) + anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false), + any(ExpirationPolicy.class) )).thenReturn(true); doNothing().when(mockDatabaseHelper).deleteJob(anyString(), anyString()); PowerMockito.whenNew(DatabaseHelper.class).withArguments(any()).thenReturn(mockDatabaseHelper); @@ -175,7 +185,8 @@ public void testCreateJob_Success_NoMatchingJobRow() throws Exception { verify(mockDatabaseHelper, times(1)) .createJob(eq("partition"), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), anyInt(), anyMap()); + anyInt(), any(), anyString(), anyString(), anyInt(), anyMap(), + any(ExpirationPolicy.class)); assertEquals("create", result); } @@ -183,7 +194,8 @@ public void testCreateJob_Success_NoMatchingJobRow() throws Exception { public void testCreateJob_Success_MatchingJobRow() throws Exception { when(mockDatabaseHelper.createJob( anyString(), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), anyInt(), anyMap())).thenReturn(false); + anyInt(), any(), anyString(), anyString(), anyInt(), anyMap(), + any(ExpirationPolicy.class))).thenReturn(false); // Test successful run of job creation when a matching job row already exists. final String result = JobsPut.createOrUpdateJob( @@ -192,7 +204,8 @@ public void testCreateJob_Success_MatchingJobRow() throws Exception { verify(mockDatabaseHelper, times(1)) .createJob( anyString(), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), anyInt(), anyMap()); + anyInt(), any(), anyString(), anyString(), anyInt(), anyMap(), + any(ExpirationPolicy.class)); assertEquals("update", result); } @@ -251,7 +264,8 @@ public void testCreateRestrictedJob_Success() throws Exception { verify(mockDatabaseHelper, times(1)) .createJob(eq("partition"), eq("id"), anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), anyInt(), anyMap()); + anyInt(), any(), anyString(), anyString(), anyInt(), anyMap(), + any(ExpirationPolicy.class)); final ArgumentCaptor workerActionCaptor = ArgumentCaptor.forClass(WorkerAction.class); @@ -306,7 +320,8 @@ public void testJobCreationWithTaskData_Object() throws Exception verify(mockDatabaseHelper, times(1)).createJob( anyString(), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), anyInt(), anyMap()); + anyInt(), any(), anyString(), anyString(), anyInt(), anyMap(), + any(ExpirationPolicy.class)); } @Test @@ -334,14 +349,16 @@ public void testJobCreationWithPrerequisites() throws Exception verify(mockDatabaseHelper, times(1)).createJobWithDependencies( anyString(), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false)); + anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false), + any(ExpirationPolicy.class)); } @Test public void testJobCreationWithPrerequisites_MatchingJobRow() throws Exception { when(mockDatabaseHelper.createJobWithDependencies( anyString(), anyString(), anyString(), anyString(), anyString(), anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false) + anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false), + any(ExpirationPolicy.class) )).thenReturn(false); final NewJob job = makeJob(); @@ -354,7 +371,8 @@ public void testJobCreationWithPrerequisites_MatchingJobRow() throws Exception { assertEquals("update", result); verify(mockDatabaseHelper, times(1)).createJobWithDependencies( anyString(), anyString(),anyString(),anyString(),anyString(),anyInt(), anyString(), - anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false)); + anyInt(), any(), anyString(), anyString(), any(), anyInt(), anyMap(), Matchers.eq(false), + any(ExpirationPolicy.class)); } diff --git a/job-service/src/test/java/com/hpe/caf/services/job/utilities/DateHelperTest.java b/job-service/src/test/java/com/hpe/caf/services/job/utilities/DateHelperTest.java new file mode 100644 index 000000000..cf9555a45 --- /dev/null +++ b/job-service/src/test/java/com/hpe/caf/services/job/utilities/DateHelperTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2016-2021 Micro Focus or one of its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.hpe.caf.services.job.utilities; + +import org.junit.Test; +import static org.junit.jupiter.api.Assertions.assertAll; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static com.hpe.caf.services.job.utilities.DateHelper.validate; +import static org.testng.Assert.assertThrows; + +import junit.framework.TestCase; + +public class DateHelperTest extends TestCase +{ + + @Test + public void testInvalidDate(){ + assertAll( + ()-> assertThrows(Exception.class, ()-> validate("202dsds20:50.52Z")), + ()-> assertThrows(Exception.class, ()-> validate("lastUpdateDate+P1D")), + ()-> assertThrows(Exception.class, ()-> validate("ploc")), + ()-> assertThrows(Exception.class, ()-> validate("lastUpdateTime+P15W")) + ); + + } + + @Test + public void testValidDate() + { + assertAll( + () -> assertDoesNotThrow(() -> validate("2021-04-12T23:20:50.52Z")), + () -> assertDoesNotThrow(() -> validate("lastUpdateTime+P1D")), + () -> assertDoesNotThrow(() -> validate("lastUpdateTime+PT21H")), + () -> assertDoesNotThrow(() -> validate("createTime+P90M")), + () -> assertDoesNotThrow(() -> validate("createTime+PT90M")), + () -> assertDoesNotThrow(() -> validate("lastUpdateTime+PT120S")), + () -> assertDoesNotThrow(() -> validate("none")) + ); + } +} diff --git a/release-notes-6.0.0.md b/release-notes-6.0.0.md index 897608ec7..d5fa9af1e 100644 --- a/release-notes-6.0.0.md +++ b/release-notes-6.0.0.md @@ -11,6 +11,9 @@ ${version-number} - **US359648 / US397417**: The Job Service can publish `V4` messages. The Job Service can now send `V3` or `V4` format messages ( `V3` is default ). The `JOB_SERVICE_MESSAGE_OUTPUT_FORMAT` environment variable on the `job-service-scheduled-executor` can be used to control it. +- **SCMOD-8730**: Added support for Expiration policy. + We now have an expiration policy for each job, related to its status. + The policy action is applied based on the operation (`Expire`, `Delete`) and the expiration_time provided. The expiration_time can be a specific date, or an offset from `create_date` or `last_update_date`. #### Breaking Changes - **353257**: The following environment variables are no longer supported:`JOB_SERVICE_DATABASE_URL`, `JOB_SERVICE_DATABASE_HOSTNAME`, From 0be0646ca613f78e4793abe117e44ec3d19d4248 Mon Sep 17 00:00:00 2001 From: Xbreizh Date: Wed, 23 Mar 2022 09:52:27 +0000 Subject: [PATCH 06/11] cleanup --- .../services/job/api/JobServiceFilterIT.java | 1 - job-service-postgres-container/pom.xml | 6 +++--- .../scheduled/executor/ScheduledExecutor.java | 8 +++++-- .../executor/ScheduledExecutorConfig.java | 5 +---- .../caf/services/job/api/DatabaseHelper.java | 21 +++++++++++-------- release-notes-6.0.0.md | 2 +- 6 files changed, 23 insertions(+), 20 deletions(-) diff --git a/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java b/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java index 65783d7d2..709d89637 100644 --- a/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java +++ b/job-service-container/src/test/java/com/hpe/caf/services/job/api/JobServiceFilterIT.java @@ -120,7 +120,6 @@ public void testCompundFiltering() throws Exception final List jobs = jobsApi.getJobs( defaultPartitionId, correlationId, null, null, null, null, null, null, "labels.label1==value or labels.label3==value"); - System.out.println("poloko " + jobs.size()); assertTrue(jobs.size() == 2); cleanUpJobs(jobId1, jobId2, jobId3); } diff --git a/job-service-postgres-container/pom.xml b/job-service-postgres-container/pom.xml index 3a7d41df5..e028b5ec2 100644 --- a/job-service-postgres-container/pom.xml +++ b/job-service-postgres-container/pom.xml @@ -118,7 +118,7 @@ - + @@ -198,7 +198,7 @@ ${js.database.name} -db.user ${js.database.username} -db.pass ${js.database.password} - + DB update finished. 500 diff --git a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java index c470cff07..7c9d811eb 100644 --- a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java +++ b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutor.java @@ -46,8 +46,12 @@ public ScheduledExecutor() { LOG.info("Starting task for applying the job expiration policy ..."); // Execute the dropTablesTask periodically. - scheduler.scheduleWithFixedDelay(new ApplyJobExpirationPolicyTask(), 20, ScheduledExecutorConfig.getApplyExpirationPolicySchedulerPeriod(), - TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay( + new ApplyJobExpirationPolicyTask(), + 20, + ScheduledExecutorConfig.getApplyExpirationPolicySchedulerPeriod(), + TimeUnit.SECONDS + ); LOG.info("Starting task for dropping soft deleted tables ..."); // Execute the dropTablesTask periodically. diff --git a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java index b63b299d9..14deb7a6e 100644 --- a/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java +++ b/job-service-scheduled-executor/src/main/java/com/hpe/caf/services/job/scheduled/executor/ScheduledExecutorConfig.java @@ -26,10 +26,7 @@ public class ScheduledExecutorConfig { public static int getApplyExpirationPolicySchedulerPeriod() { // Default to 60 seconds if CAF_APPLY_EXPIRATION_POLICY_SCHEDULER_PERIOD not specified. final String period = getPropertyOrEnvVar("CAF_APPLY_EXPIRATION_POLICY_SCHEDULER_PERIOD"); - if (null == period || period.isEmpty()) { - return 60; - } - return Integer.parseInt(period); + return (null != period && !period.isEmpty()) ? Integer.parseInt(period) : 60; } public static String getDatabaseHost(){ diff --git a/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java b/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java index 3085bfac2..0ff180957 100644 --- a/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java +++ b/job-service/src/main/java/com/hpe/caf/services/job/api/DatabaseHelper.java @@ -30,7 +30,6 @@ import com.hpe.caf.services.job.exceptions.NotFoundException; import com.hpe.caf.services.job.exceptions.ServiceUnavailableException; import com.hpe.caf.services.job.utilities.ExpirationPolicyHelper; - import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,20 +112,18 @@ public Job[] getJobs(final String partitionId, String jobIdStartsWith, String st } stmt.setArray(9, array); stmt.setString(10, filter); - String jobId = ""; // Execute a query to return a list of all job definitions in the system. LOG.debug("Calling get_jobs() database function..."); - ExpirationPolicy expirationPolicy = null; try (final ResultSet rs = stmt.executeQuery()) { while (rs.next()) { final Job job = new Job(); job.setId(rs.getString("job_id")); - if (!job.getId().equals(jobId)) { - expirationPolicy = new ExpirationPolicy(); - jobId = job.getId(); + if (!job.getId().isEmpty()) { + retrieveJob(job, rs, new ExpirationPolicy()); + } else { + retrieveJob(job, rs, null); } - retrieveJob(job, rs, expirationPolicy); LOG.debug("Job {}", job); //We joined onto the labels table and there may be multiple rows for the same job, so merge their labels jobs.merge(job.getId(), job, (orig, insert) -> { @@ -296,9 +293,15 @@ public boolean createJob(final String partitionId, final String jobId, final Str } } - private Array setExpirationPolicy(final ExpirationPolicy expirationPolicy, final Connection conn, final CallableStatement stmt, final int parameterIndex) throws SQLException { + private Array setExpirationPolicy( + final ExpirationPolicy expirationPolicy, + final Connection conn, + final CallableStatement stmt, + final int parameterIndex + ) throws SQLException + { final Array arrayP; - if (expirationPolicy != null) { + if (null != expirationPolicy) { final List expirationPolicyList = ExpirationPolicyHelper.toPgCompositeList(expirationPolicy); arrayP = conn.createArrayOf(JOB_POLICY_TYPE_NAME, expirationPolicyList.toArray(new String[0])); LOG.debug("expirationPolicyDB: {}", expirationPolicyList); diff --git a/release-notes-6.0.0.md b/release-notes-6.0.0.md index d5fa9af1e..7fb65ec4e 100644 --- a/release-notes-6.0.0.md +++ b/release-notes-6.0.0.md @@ -11,7 +11,7 @@ ${version-number} - **US359648 / US397417**: The Job Service can publish `V4` messages. The Job Service can now send `V3` or `V4` format messages ( `V3` is default ). The `JOB_SERVICE_MESSAGE_OUTPUT_FORMAT` environment variable on the `job-service-scheduled-executor` can be used to control it. -- **SCMOD-8730**: Added support for Expiration policy. +- **US369231**: Added support for Expiration policy. We now have an expiration policy for each job, related to its status. The policy action is applied based on the operation (`Expire`, `Delete`) and the expiration_time provided. The expiration_time can be a specific date, or an offset from `create_date` or `last_update_date`. From 3b84ccbeab367ffd62b153269e37b27a7bbb7cfc Mon Sep 17 00:00:00 2001 From: Xbreizh Date: Wed, 23 Mar 2022 10:08:50 +0000 Subject: [PATCH 07/11] Remove patch --- job-service-postgres-container/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job-service-postgres-container/pom.xml b/job-service-postgres-container/pom.xml index e028b5ec2..c479d30e6 100644 --- a/job-service-postgres-container/pom.xml +++ b/job-service-postgres-container/pom.xml @@ -199,7 +199,7 @@ DB update finished. - + 500 From ea78ac4aa01f52ca56e655457f02fa5ab2a125b4 Mon Sep 17 00:00:00 2001 From: Michael Bryson Date: Thu, 10 Nov 2022 11:21:49 +0000 Subject: [PATCH 08/11] Update pom.xml --- job-service-postgres-container/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job-service-postgres-container/pom.xml b/job-service-postgres-container/pom.xml index c479d30e6..5cb1507f2 100644 --- a/job-service-postgres-container/pom.xml +++ b/job-service-postgres-container/pom.xml @@ -147,7 +147,7 @@ job-service-postgres ${dockerJobServiceOrg}job-service-postgres${dockerProjectVersion} - ${dockerHubPublic}/cafapi/java-postgres:2 + ${dockerHubPublic}/cafapi/prereleases:java-postgres-4.0.0-US593021-SNAPSHOT true tar From c4d01325036ebc8c7fc1d262206d83db19d78a9e Mon Sep 17 00:00:00 2001 From: Michael Bryson Date: Thu, 10 Nov 2022 11:22:15 +0000 Subject: [PATCH 09/11] Update pom.xml --- job-service-scheduled-executor-container/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job-service-scheduled-executor-container/pom.xml b/job-service-scheduled-executor-container/pom.xml index 971df7c65..2a6b8918a 100644 --- a/job-service-scheduled-executor-container/pom.xml +++ b/job-service-scheduled-executor-container/pom.xml @@ -135,7 +135,7 @@ job-service-scheduled-executor ${dockerJobServiceOrg}job-service-scheduled-executor${dockerProjectVersion} - ${dockerHubPublic}/cafapi/opensuse-jre11:3 + ${dockerHubPublic}/cafapi/opensuse-jre17:1 /maven/worker.sh From 2d3516060b2e14ea78b6ecc59180d497069bbd5c Mon Sep 17 00:00:00 2001 From: Michael Bryson Date: Thu, 10 Nov 2022 11:25:18 +0000 Subject: [PATCH 10/11] Update pom.xml --- worker-jobtracking-container/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker-jobtracking-container/pom.xml b/worker-jobtracking-container/pom.xml index eccc4a192..5d1da718e 100644 --- a/worker-jobtracking-container/pom.xml +++ b/worker-jobtracking-container/pom.xml @@ -382,7 +382,7 @@ jobtracking-worker ${dockerJobServiceOrg}worker-jobtracking${dockerProjectVersion} - ${dockerHubPublic}/cafapi/opensuse-jre11:3 + ${dockerHubPublic}/cafapi/opensuse-jre17:1 ${caf.worker-queue.impl} ${caf.worker-store.impl} From ac3ab389531cc738cf76f27ab9db0d690dad11e8 Mon Sep 17 00:00:00 2001 From: Michael Bryson Date: Thu, 10 Nov 2022 11:28:34 +0000 Subject: [PATCH 11/11] Revert updates to wrong branch --- job-service-postgres-container/pom.xml | 2 +- job-service-scheduled-executor-container/pom.xml | 2 +- worker-jobtracking-container/pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/job-service-postgres-container/pom.xml b/job-service-postgres-container/pom.xml index 5cb1507f2..c479d30e6 100644 --- a/job-service-postgres-container/pom.xml +++ b/job-service-postgres-container/pom.xml @@ -147,7 +147,7 @@ job-service-postgres ${dockerJobServiceOrg}job-service-postgres${dockerProjectVersion} - ${dockerHubPublic}/cafapi/prereleases:java-postgres-4.0.0-US593021-SNAPSHOT + ${dockerHubPublic}/cafapi/java-postgres:2 true tar diff --git a/job-service-scheduled-executor-container/pom.xml b/job-service-scheduled-executor-container/pom.xml index 2a6b8918a..971df7c65 100644 --- a/job-service-scheduled-executor-container/pom.xml +++ b/job-service-scheduled-executor-container/pom.xml @@ -135,7 +135,7 @@ job-service-scheduled-executor ${dockerJobServiceOrg}job-service-scheduled-executor${dockerProjectVersion} - ${dockerHubPublic}/cafapi/opensuse-jre17:1 + ${dockerHubPublic}/cafapi/opensuse-jre11:3 /maven/worker.sh diff --git a/worker-jobtracking-container/pom.xml b/worker-jobtracking-container/pom.xml index 5d1da718e..eccc4a192 100644 --- a/worker-jobtracking-container/pom.xml +++ b/worker-jobtracking-container/pom.xml @@ -382,7 +382,7 @@ jobtracking-worker ${dockerJobServiceOrg}worker-jobtracking${dockerProjectVersion} - ${dockerHubPublic}/cafapi/opensuse-jre17:1 + ${dockerHubPublic}/cafapi/opensuse-jre11:3 ${caf.worker-queue.impl} ${caf.worker-store.impl}