Merge pull request #216 from HSF/flin
Remove python 2 leftovers
mightqxc authored Mar 11, 2024
2 parents 5161477 + 00db30c commit 2de7736
Showing 118 changed files with 484 additions and 1,370 deletions.
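The files shown below repeat the same two substitutions: the Python 2 compatibility helper future.utils.iteritems is replaced by the native dict.items() view, and direct datetime.datetime.utcnow() calls are routed through core_utils.naive_utcnow(). A minimal sketch of the dictionary-iteration half of the change (the dictionary and variable names are illustrative, not taken from the diff):

# Python 2/3 compatible form being removed
# from future.utils import iteritems
# for queue_name, queue_config in iteritems(active_queues):
#     ...

# Python 3 only form used throughout this commit
active_queues = {"CERN_queue": {"runMode": "slave"}, "BNL_queue": {"runMode": "master"}}
for queue_name, queue_config in active_queues.items():
    print(queue_name, queue_config["runMode"])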
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
- timestamp = "04-03-2024 13:34:47 on flin (by mightqxc)"
+ timestamp = "07-03-2024 11:35:36 on flin (by mightqxc)"
3 changes: 2 additions & 1 deletion pandaharvester/harvesterbody/cacher.py
@@ -7,6 +7,7 @@

import requests
import requests.exceptions

from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
@@ -40,7 +41,7 @@ def execute(self, force_update=False, skip_lock=False, n_thread=0):
locked = self.dbProxy.get_process_lock("cacher", self.get_pid(), harvester_config.cacher.sleepTime)
if locked or skip_lock:
mainLog.debug("getting information")
- timeLimit = datetime.datetime.utcnow() - datetime.timedelta(minutes=harvester_config.cacher.refreshInterval)
+ timeLimit = core_utils.naive_utcnow() - datetime.timedelta(minutes=harvester_config.cacher.refreshInterval)
itemsList = []
nItems = 4
for tmpStr in harvester_config.cacher.data:
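The core_utils.naive_utcnow() helper that replaces datetime.datetime.utcnow() in this and the following files is not part of this diff; a plausible implementation, assuming it simply returns the current UTC time as a naive datetime (utcnow() is deprecated since Python 3.12, while Harvester keeps comparing naive UTC timestamps), would be:

import datetime

def naive_utcnow() -> datetime.datetime:
    # Current UTC time with tzinfo stripped, i.e. the same value
    # datetime.datetime.utcnow() used to return, so comparisons against
    # existing naive timestamps in the database keep working.
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)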
11 changes: 5 additions & 6 deletions pandaharvester/harvesterbody/command_manager.py
@@ -1,7 +1,6 @@
import datetime
import socket

- from future.utils import iteritems
from pandaharvester import commit_timestamp, panda_pkg_info
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterconfig import harvester_config
@@ -36,7 +35,7 @@ def convert_to_command_specs(self, commands):
for command in commands:
command_spec = CommandSpec()
command_spec.convert_command_json(command)
- for comStr, receiver in iteritems(CommandSpec.receiver_map):
+ for comStr, receiver in CommandSpec.receiver_map.items():
if command_spec.command.startswith(comStr):
command_spec.receiver = receiver
break
@@ -55,7 +54,7 @@ def run(self):
# send command list to be received
siteNames = set()
commandList = []
- for queueName, queueConfig in iteritems(self.queueConfigMapper.get_active_queues()):
+ for queueName, queueConfig in self.queueConfigMapper.get_active_queues().items():
if queueConfig is None or queueConfig.runMode != "slave":
continue
# one command for all queues in one site
@@ -70,7 +69,7 @@
# one command for each queue
commandItem = {"command": CommandSpec.COM_setNWorkers, "computingSite": queueConfig.siteName, "resourceType": queueConfig.resourceType}
commandList.append(commandItem)
- data = {"startTime": datetime.datetime.utcnow(), "sw_version": panda_pkg_info.release_version, "commit_stamp": commit_timestamp.timestamp}
+ data = {"startTime": core_utils.naive_utcnow(), "sw_version": panda_pkg_info.release_version, "commit_stamp": commit_timestamp.timestamp}
if len(commandList) > 0:
main_log.debug("sending command list to receive")
data["commands"] = commandList
@@ -84,8 +83,8 @@ def run(self):
main_log.debug("polling commands loop")

# send heartbeat
- if self.lastHeartbeat is None or self.lastHeartbeat < datetime.datetime.utcnow() - datetime.timedelta(minutes=10):
- self.lastHeartbeat = datetime.datetime.utcnow()
+ if self.lastHeartbeat is None or self.lastHeartbeat < core_utils.naive_utcnow() - datetime.timedelta(minutes=10):
+ self.lastHeartbeat = core_utils.naive_utcnow()
self.communicator.is_alive({})

continuous_loop = True # as long as there are commands, retrieve them
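The heartbeat block above shows the throttling idiom used by several agents: remember the last naive-UTC timestamp and only act again once a fixed interval has elapsed. A standalone sketch of the same pattern (the class and function names are illustrative):

import datetime

HEARTBEAT_INTERVAL = datetime.timedelta(minutes=10)

class HeartbeatSender:
    def __init__(self, send_func):
        self.last_heartbeat = None
        self.send_func = send_func

    def maybe_send(self):
        # naive UTC "now", mirroring what core_utils.naive_utcnow() is assumed to return
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        if self.last_heartbeat is None or self.last_heartbeat < now - HEARTBEAT_INTERVAL:
            self.last_heartbeat = now
            self.send_func()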
5 changes: 2 additions & 3 deletions pandaharvester/harvesterbody/event_feeder.py
@@ -1,4 +1,3 @@
- from future.utils import iteritems
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
@@ -31,7 +30,7 @@ def run(self):
)
mainLog.debug(f"got {len(workSpecsPerQueue)} queues")
# loop over all workers
- for queueName, workSpecList in iteritems(workSpecsPerQueue):
+ for queueName, workSpecList in workSpecsPerQueue.items():
tmpQueLog = self.make_logger(_logger, f"queue={queueName}", method_name="run")
# check queue
if not self.queueConfigMapper.has_queue(queueName):
@@ -71,7 +70,7 @@ def run(self):
tmpLog.error("failed to feed events")
continue
# dump
- for pandaID, eventList in iteritems(events):
+ for pandaID, eventList in events.items():
try:
nRanges = workSpec.eventsRequestParams[pandaID]["nRanges"]
except Exception:
7 changes: 3 additions & 4 deletions pandaharvester/harvesterbody/job_fetcher.py
@@ -2,7 +2,6 @@
import random
import socket

- from future.utils import iteritems
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
@@ -40,7 +39,7 @@ def run(self):
pandaQueueDict = PandaQueuesDict()

# loop over all queues
- for queueName, nJobs in iteritems(nJobsPerQueue):
+ for queueName, nJobs in nJobsPerQueue.items():
# check queue
if not self.queueConfigMapper.has_queue(queueName):
continue
@@ -78,7 +77,7 @@ def run(self):
fileStatMap = dict()
sw_startconvert = core_utils.get_stopwatch()
for job in jobs:
- timeNow = datetime.datetime.utcnow()
+ timeNow = core_utils.naive_utcnow()
jobSpec = JobSpec()
jobSpec.convert_job_json(job)
jobSpec.computingSite = queueName
@@ -94,7 +93,7 @@
if extractorCore is not None:
fileGroupDictList.append(extractorCore.get_aux_inputs(jobSpec))
for fileGroupDict in fileGroupDictList:
- for tmpLFN, fileAttrs in iteritems(fileGroupDict):
+ for tmpLFN, fileAttrs in fileGroupDict.items():
# make file spec
fileSpec = FileSpec()
fileSpec.PandaID = jobSpec.PandaID
6 changes: 3 additions & 3 deletions pandaharvester/harvesterbody/master.py
@@ -11,18 +11,18 @@
import time

import daemon.pidfile
- from future.utils import iteritems

try:
import pprofile
except Exception:
pass

+ from pandalogger import logger_config

from pandaharvester import commit_timestamp, panda_pkg_info
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestermisc.apfmon import Apfmon
- from pandalogger import logger_config

# logger
_logger = core_utils.setup_logger("master")
@@ -299,7 +299,7 @@ def main(daemon_mode=True):
_logger.addHandler(stdoutHandler)
# collect streams not to be closed by daemon
files_preserve = []
- for loggerName, loggerObj in iteritems(logging.Logger.manager.loggerDict):
+ for loggerName, loggerObj in logging.Logger.manager.loggerDict.items():
if loggerName.startswith("panda"):
for handler in loggerObj.handlers:
if hasattr(handler, "stream"):
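Collecting the open logger streams into files_preserve keeps python-daemon from closing the PanDA log files when master.py daemonizes; the list is passed to daemon.DaemonContext(files_preserve=...). A self-contained sketch of that loop (the isinstance guard against logging.PlaceHolder entries is an addition in this sketch, not part of the diff):

import logging

def collect_files_preserve(prefix="panda"):
    """Return open log streams that daemon.DaemonContext must not close."""
    preserved = []
    for logger_name, logger_obj in logging.Logger.manager.loggerDict.items():
        if not logger_name.startswith(prefix):
            continue
        # loggerDict can also hold logging.PlaceHolder entries without handlers
        if not isinstance(logger_obj, logging.Logger):
            continue
        for handler in logger_obj.handlers:
            if hasattr(handler, "stream"):
                preserved.append(handler.stream)
    return preserved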
23 changes: 11 additions & 12 deletions pandaharvester/harvesterbody/monitor.py
@@ -4,7 +4,6 @@
import random
import time

- from future.utils import iteritems
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
@@ -98,8 +97,8 @@ def run(self):
)
mainLog.debug(f"got {len(workSpecsPerQueue)} queues")
# loop over all workers
- for queueName, configIdWorkSpecs in iteritems(workSpecsPerQueue):
- for configID, workSpecsList in iteritems(configIdWorkSpecs):
+ for queueName, configIdWorkSpecs in workSpecsPerQueue.items():
+ for configID, workSpecsList in configIdWorkSpecs.items():
try:
retVal = self.monitor_agent_core(lockedBy, queueName, workSpecsList, config_id=configID, check_source="DB")
except Exception as e:
@@ -162,7 +161,7 @@ def run(self):
# digest events of worker update
to_run_fifo_check = False
retMap = self.monitor_event_digester(locked_by=lockedBy, max_events=eventBasedCheckMaxEvents)
- for qc_key, retVal in iteritems(retMap):
+ for qc_key, retVal in retMap.items():
workSpecsToEnqueue, workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = retVal
# only enqueue postprocessing workers to FIFO
obj_to_enqueue_to_head_dict[qc_key][0].extend(workSpecsToEnqueueToHead)
@@ -280,7 +279,7 @@ def run(self):
n_chunk_put = 0
mainLog.debug("putting worker chunks to FIFO")
for _dct in (obj_to_enqueue_dict, remaining_obj_to_enqueue_dict):
- for (queueName, configID), obj_to_enqueue in iteritems(_dct):
+ for (queueName, configID), obj_to_enqueue in _dct.items():
try:
workSpecsToEnqueue, timeNow_timestamp, fifoCheckInterval = obj_to_enqueue
if workSpecsToEnqueue:
@@ -292,7 +291,7 @@
mainLog.error(f"failed to put object from FIFO: {errStr}")
mainLog.debug("putting worker chunks to FIFO head")
for _dct in (obj_to_enqueue_to_head_dict, remaining_obj_to_enqueue_to_head_dict):
- for (queueName, configID), obj_to_enqueue_to_head in iteritems(_dct):
+ for (queueName, configID), obj_to_enqueue_to_head in _dct.items():
try:
workSpecsToEnqueueToHead, timeNow_timestamp, fifoCheckInterval = obj_to_enqueue_to_head
if workSpecsToEnqueueToHead:
@@ -401,7 +400,7 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False
workSpec.set_work_attributes(workAttributes)
workSpec.set_dialog_message(diagMessage)
if isChecked:
- workSpec.checkTime = datetime.datetime.utcnow()
+ workSpec.checkTime = core_utils.naive_utcnow()
isCheckedList.append(isChecked)
if monStatus == WorkSpec.ST_failed:
if not workSpec.has_pilot_error() and workSpec.errorCode is None:
@@ -494,7 +493,7 @@ def monitor_agent_core(self, lockedBy, queueName, workSpecsList, from_fifo=False
and workSpec.mapType != WorkSpec.MT_MultiWorkers
and workSpec.workAttributes is not None
):
- timeNow = datetime.datetime.utcnow()
+ timeNow = core_utils.naive_utcnow()
timeNow_timestamp = time.time()
# get lastCheckAt
_bool, lastCheckAt = workSpec.get_work_params("lastCheckAt")
@@ -645,7 +644,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
else:
tmp_log.debug("Nothing to be checked with plugin")
tmpOut = []
- timeNow = datetime.datetime.utcnow()
+ timeNow = core_utils.naive_utcnow()
for workSpec, (newStatus, diagMessage) in itertools.chain(zip(workersToCheck, tmpOut), thingsToPostProcess):
workerID = workSpec.workerID
tmp_log.debug(f"Going to check workerID={workerID}")
@@ -737,7 +736,7 @@ def check_workers(self, mon_core, messenger, all_workers, queue_config, tmp_log,
# retry
ppTimeOut = getattr(harvester_config.monitor, "postProcessTimeout", 0)
if ppTimeOut > 0:
- timeLimit = datetime.datetime.utcnow() - datetime.timedelta(minutes=ppTimeOut)
+ timeLimit = core_utils.naive_utcnow() - datetime.timedelta(minutes=ppTimeOut)
if workSpec.endTime is None or workSpec.endTime > timeLimit:
isOK = False
# set end time just in case for timeout
@@ -794,8 +793,8 @@ def monitor_event_digester(self, locked_by, max_events):
if len(workerID_list) > 0:
updated_workers_dict = self.dbProxy.get_workers_from_ids(workerID_list)
tmpLog.debug("got workspecs for worker events")
- for queueName, _val in iteritems(updated_workers_dict):
- for configID, workSpecsList in iteritems(_val):
+ for queueName, _val in updated_workers_dict.items():
+ for configID, workSpecsList in _val.items():
qc_key = (queueName, configID)
tmpLog.debug("checking workers of queueName={0} configID={1}".format(*qc_key))
try:
4 changes: 2 additions & 2 deletions pandaharvester/harvesterbody/preparator.py
@@ -129,7 +129,7 @@ def run(self):
jobSpec.subStatus = "failed_to_prepare"
jobSpec.lockedBy = None
jobSpec.preparatorTime = None
- jobSpec.stateChangeTime = datetime.datetime.utcnow()
+ jobSpec.stateChangeTime = core_utils.naive_utcnow()
errStr = f"stage-in failed with {tmpStr}"
jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
jobSpec.trigger_propagation()
@@ -287,7 +287,7 @@ def run(self):
jobSpec.subStatus = "failed_to_prepare"
jobSpec.lockedBy = None
jobSpec.preparatorTime = None
- jobSpec.stateChangeTime = datetime.datetime.utcnow()
+ jobSpec.stateChangeTime = core_utils.naive_utcnow()
errStr = f"stage-in failed with {tmpStr}"
jobSpec.set_pilot_error(PilotErrors.STAGEINFAILED, errStr)
jobSpec.trigger_propagation()
10 changes: 5 additions & 5 deletions pandaharvester/harvesterbody/propagator.py
@@ -86,7 +86,7 @@ def run(self):
# unset to disable further updating
tmpJobSpec.propagatorTime = None
tmpJobSpec.subStatus = "done"
- tmpJobSpec.modificationTime = datetime.datetime.utcnow()
+ tmpJobSpec.modificationTime = core_utils.naive_utcnow()
elif tmpJobSpec.is_final_status() and not tmpJobSpec.all_events_done():
# trigger next propagation to update remaining events
tmpJobSpec.trigger_propagation()
@@ -108,7 +108,7 @@
tmpJobSpec.status = "cancelled"
tmpJobSpec.subStatus = "killed"
tmpJobSpec.set_pilot_error(PilotErrors.PANDAKILL, PilotErrors.pilot_error_msg[PilotErrors.PANDAKILL])
- tmpJobSpec.stateChangeTime = datetime.datetime.utcnow()
+ tmpJobSpec.stateChangeTime = core_utils.naive_utcnow()
tmpJobSpec.trigger_propagation()
self.dbProxy.update_job(tmpJobSpec, {"propagatorLock": self.get_pid()}, update_out_file=True)
else:
@@ -181,18 +181,18 @@ def run(self):
else:
mainLog.error(f"failed to update worker stats (bulk) for {site_name} err={tmp_str}")

- if not self._last_metrics_update or datetime.datetime.utcnow() - self._last_metrics_update > datetime.timedelta(seconds=METRICS_PERIOD):
+ if not self._last_metrics_update or core_utils.naive_utcnow() - self._last_metrics_update > datetime.timedelta(seconds=METRICS_PERIOD):
# get latest metrics from DB
service_metrics_list = self.dbProxy.get_service_metrics(self._last_metrics_update)
if not service_metrics_list:
if self._last_metrics_update:
mainLog.error("failed to get service metrics")
- self._last_metrics_update = datetime.datetime.utcnow()
+ self._last_metrics_update = core_utils.naive_utcnow()
else:
tmp_ret, tmp_str = self.communicator.update_service_metrics(service_metrics_list)
if tmp_ret:
mainLog.debug("update of service metrics OK")
- self._last_metrics_update = datetime.datetime.utcnow()
+ self._last_metrics_update = core_utils.naive_utcnow()
else:
mainLog.error(f"failed to update service metrics err={tmp_str}")

6 changes: 1 addition & 5 deletions pandaharvester/harvesterbody/service_monitor.py
@@ -1,14 +1,10 @@
import multiprocessing
import re
+ import subprocess
import traceback

import psutil

- try:
-     import subprocess32 as subprocess
- except Exception:
-     import subprocess

from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterbody.cred_manager import CredManager
from pandaharvester.harvesterconfig import harvester_config
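subprocess32 was a Python 2 backport of the Python 3 subprocess module (providing, among other things, timeout support), so on Python 3 the conditional import collapses to a plain standard-library import. For example, the timeout-capable call that once required the backport now works directly:

import subprocess

# Stdlib subprocess on Python 3 already supports timeouts,
# which is what the subprocess32 backport provided on Python 2.
result = subprocess.run(["echo", "hello"], capture_output=True, text=True, timeout=5)
print(result.stdout.strip())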
9 changes: 4 additions & 5 deletions pandaharvester/harvesterbody/submitter.py
@@ -3,7 +3,6 @@
import socket
import time

- from future.utils import iteritems
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterbody.worker_adjuster import WorkerAdjuster
from pandaharvester.harvesterbody.worker_maker import WorkerMaker
@@ -62,9 +61,9 @@ def run(self):
main_log.debug(f"got {len(command_specs)} {com_str} commands")
for command_spec in command_specs:
new_limits = self.dbProxy.set_queue_limit(site_name, command_spec.params)
- for tmp_job_type, tmp_jt_vals in iteritems(new_limits):
+ for tmp_job_type, tmp_jt_vals in new_limits.items():
res_map.setdefault(tmp_job_type, {})
- for tmp_resource_type, tmp_new_val in iteritems(tmp_jt_vals):
+ for tmp_resource_type, tmp_new_val in tmp_jt_vals.items():
# if available, overwrite new worker value with the command from panda server
if tmp_resource_type in res_map[tmp_job_type]:
tmp_queue_name = res_map[tmp_job_type][tmp_resource_type]
@@ -210,7 +209,7 @@ def run(self):
tmp_log.debug(f"successfully made {len(okChunks)} workers")
else:
tmp_log.debug(f"made {len(okChunks)} workers, while {len(ngChunks)} workers failed")
- timeNow = datetime.datetime.utcnow()
+ timeNow = core_utils.naive_utcnow()
timeNow_timestamp = time.time()
pandaIDs = set()
# NG (=not good)
@@ -412,7 +411,7 @@ def run(self):
if submitted and hasattr(harvester_config.submitter, "minSubmissionInterval"):
interval = harvester_config.submitter.minSubmissionInterval
if interval > 0:
- newTime = datetime.datetime.utcnow() + datetime.timedelta(seconds=interval)
+ newTime = core_utils.naive_utcnow() + datetime.timedelta(seconds=interval)
self.dbProxy.update_panda_queue_attribute("submitTime", newTime, site_name=site_name)

# time the cycle
(The remaining changed files in this commit are not loaded in this view.)
