Skip to content

Commit

Permalink
fix: rewrite of events system and associated tests
Browse files Browse the repository at this point in the history
  • Loading branch information
apowers313 committed Mar 24, 2021
1 parent f7594d9 commit 6075d97
Show file tree
Hide file tree
Showing 24 changed files with 843 additions and 385 deletions.
11 changes: 11 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
module.exports = {
root: true,
parser: "@typescript-eslint/parser",
parserOptions: {
"project": "./tsconfig.json"
},
plugins: [
"@typescript-eslint",
],
extends: [
"plugin:old-c-programmer/node",
],
rules: {
"@typescript-eslint/no-floating-promises": "error"
}
};
7 changes: 4 additions & 3 deletions lib/Breakpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ const {Config} = require("./Config");
const {Trace} = require("./Trace");
const {EventFilter} = require("./EventFilter");
const {checkType} = require("./Utility");
const {Synchronize} = require("./Synchronize");

let doBreak;
let runFn = undefined;
Expand Down Expand Up @@ -205,7 +204,7 @@ class Breakpoint extends EventFilter {
// if we hit a breakpoint
if (doBreak) {
if (bp) {
console.info("Stopping execution for breakpoint.", bp.toString());
console.info("Stopping execution for breakpoint:", bp.toString());
} else {
console.info("Breakpoint encountered.");
}
Expand All @@ -215,6 +214,7 @@ class Breakpoint extends EventFilter {
checkType("checkBreak", "runFn", runFn, "undefined");
}

const {Synchronize} = require("./Synchronize");
Synchronize.pauseWatchdog();

return new Promise((resolve) => {
Expand All @@ -241,6 +241,7 @@ class Breakpoint extends EventFilter {
checkType("Breakpoint.run", "runFn", runFn, "function");
setImmediate(runFn);

const {Synchronize} = require("./Synchronize");
Synchronize.startWatchdog();

runFn = undefined;
Expand Down Expand Up @@ -306,7 +307,7 @@ class Breakpoint extends EventFilter {

/** true if a break has been triggered */
static get inBreak() {
return !!runFn || doBreak;
return !!runFn || !!doBreak;
}

/** Array of breakpoint strings, as returned by `toString` */
Expand Down
9 changes: 7 additions & 2 deletions lib/EventBase.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// TODO: use NodeEventTarget instead of EventEmitter; still experimental in nodejs v14.5
const EventEmitter = require("events");
const EventEmitter = require("promise-events");
const {Breakpoint} = require("./Breakpoint");
const {EventFilter} = require("./EventFilter");
const {checkType, checkInstance} = require("./Utility");
Expand Down Expand Up @@ -128,6 +128,9 @@ class EventBusBase extends EventEmitter {
*/
checkEvent(event) {
checkInstance("EventBusBase.checkEvent", "event", event, this._baseEvent);
if (!event.type) {
console.warn("Emitting event without a type:", event);
}
}

// eslint-disable-next-line jsdoc/require-jsdoc
Expand All @@ -147,9 +150,11 @@ class EventBusBase extends EventEmitter {
async emit(eventName, event, ... args) {
this.checkEvent(event);

return Breakpoint.checkBreak(event, () => {
let ret = await Breakpoint.checkBreak(event, async() => {
return super.emit(eventName, event, ... args);
});

return !!ret;
}

/**
Expand Down
6 changes: 3 additions & 3 deletions lib/FeatureExtractor.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class FeatureExtractor extends Component {
// TODO: performance profiling of event handling can go here
let res = this.cb.call(this, inputEvent.data, inputEvent);
if (res !== undefined && res !== null) {
setImmediate(() => {
this.sendEvent("data", res);
});
// setImmediate(() => {
this.sendEvent("data", res);
// });
}
});
}
Expand Down
13 changes: 10 additions & 3 deletions lib/Intrinsic.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ class Intrinsic extends Component {
}

/**
* the value of the intrinsic
* Set the value of the intrinsic and emit a `change` event if the value changed.
*
* @param val
* @param {*} val The value to set for the intrinsic, which is specific to the implementation of the intrinsic.
* @returns {Promise} Returns a promise that resolves to the value of `sendEvent` if an event was emitted, or null if no change occurred.
*/
async setValue(val) {
val = this._converter(val);
Expand Down Expand Up @@ -75,15 +76,21 @@ class Intrinsic extends Component {
return null;
}

// eslint-disable-next-line jsdoc/require-jsdoc
/**
* Returns the value of the intrinsic
*
* @returns {*} The value of the intrinsic
*/
getValue() {
return this._value;
}

// eslint-disable-next-line jsdoc/require-jsdoc
set value(v) {
throw new Error("The value setter has been deprecated");
}

// eslint-disable-next-line jsdoc/require-jsdoc
get value() {
throw new Error("The value getter has been deprecated");
}
Expand Down
31 changes: 21 additions & 10 deletions lib/Significance.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class SignificanceEvent extends EventBase {
constructor(sourceName, sourceType) {
super();

checkType("PerceptionEvent.constructor", "sourceName", sourceName, "string");
checkType("PerceptionEvent.constructor", "sourceType", sourceType, "string");
checkType("SignificanceEvent.constructor", "sourceName", sourceName, "string");
checkType("SignificanceEvent.constructor", "sourceType", sourceType, "string");
createHiddenProp(this, "_sourceName", sourceName, true);
createHiddenProp(this, "_sourceType", sourceType, true);
}
Expand Down Expand Up @@ -63,10 +63,7 @@ class Significance extends Component {
createHiddenProp(this, "_changeList", new Set());

let filter = new EventFilter("allow", {eventType: "change", all: true});
// let filter = new EventFilter("allow", {sourceType: "intrinsic", eventType: "change", all: true});
new EventListener(Significance.eventBus, filter, this.getChange.bind(this));

Synchronize.register(this.onTick.bind(this));
}

/**
Expand All @@ -81,7 +78,7 @@ class Significance extends Component {
/**
* Every tick, collect Intrinsic changes, roll them up, and emit them as a significance event
*/
onTick() {
async onTick() {
// collect changes
let changes = [... this._changeList.values()].map((v) => {
return {type: v.intrinsic.name, val: v.newNormVal};
Expand All @@ -93,12 +90,14 @@ class Significance extends Component {
// calculate significance
let significance = changes.reduce((sig, c) => sig + c.weightedVal, 0);

// remove old changes
this._changeList.clear();

// emit event
this.sendEvent("significance", {
return this.sendEvent("significance", {
significance,
changes,
});
this._changeList.clear();
}

/** the perception event bus, for communicating between perception {@link Component|Components} */
Expand Down Expand Up @@ -139,9 +138,21 @@ class Significance extends Component {
weightingMap.clear();
}

static init() {
/**
* Initializes the Significance singleton
*/
static async init() {
// TODO: need to clean this up during shutdown
return new Significance();
let s = new Significance();
await Synchronize.register(s.onTick.bind(s));
return s;
}

/**
* Shuts down the Significance singleton
*/
static async shutdown() {

}
}

Expand Down
87 changes: 74 additions & 13 deletions lib/Synchronize.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,52 @@
const {Config} = require("./Config");
const Log = require("./Log");
const EventEmitter = require("events");
const {checkType, createHiddenProp} = require("./Utility");
const {EventBase, EventBusBase} = require("./EventBase");

let tickCount;
let isSync;
let initialized = false;
let hadTick;
let timerHandle;
let emitter;

class SynchronizeEvent extends EventBase {
/**
* Creates a new event to be sent over the significance bus
*
* @param {string} sourceName - The name of the source of the event.
* @param {string} sourceType - The type of the source.
*/
constructor(sourceName, sourceType) {
super();

checkType("PerceptionEvent.constructor", "sourceName", sourceName, "string");
checkType("PerceptionEvent.constructor", "sourceType", sourceType, "string");
createHiddenProp(this, "_sourceName", sourceName, true);
createHiddenProp(this, "_sourceType", sourceType, true);
}

// eslint-disable-next-line jsdoc/require-jsdoc
get sourceName() {
return this._sourceName || "initializing";
}

// eslint-disable-next-line jsdoc/require-jsdoc
get sourceType() {
return this._sourceType || "initializing";
}

// eslint-disable-next-line jsdoc/require-jsdoc
get allowedEventTypes() {
return new Set(["tick"]);
}

// eslint-disable-next-line jsdoc/require-jsdoc
get eventBus() {
return synchronizeEventBus;
}
}

const synchronizeEventBus = new EventBusBase(SynchronizeEvent);

/**
* A singleton class used to synchronize intrinsics, significance, and perceptions.
Expand All @@ -28,18 +67,16 @@ class Synchronize {
Synchronize.startWatchdog();
} else {
Log.warn("Synchronize detected asynchronous environment: this mode is untested");
timerHandle = setInterval(_nextTick, Config.get("environment-async-time"));
timerHandle = setInterval(_asyncTick, Config.get("environment-async-time"));
}

emitter = new EventEmitter();
}

/**
* Used by synchronus environments to indicate that a synchronous step has been taken.
*
* @throws Error if used with an asynchronous environment (Config "environment-synchronous" === `false`)
*/
static nextTick() {
static async nextTick() {
if (!initialized) {
throw new Error("Please call Synchronize.init() before Synchronize.nextTick()");
}
Expand All @@ -57,14 +94,24 @@ class Synchronize {
* Register a callback that will be triggered by `nextTick`.
* Internally this calls `addListener` on an `EventEmitter`.
*
* @param {Function} cb The callback to be called by `nextTick`
* @param {Promise<Function>} cb The callback to be called by `nextTick`
*/
static register(cb) {
static async register(cb) {
if (!initialized) {
throw new Error("Please call Synchronize.init() before Synchronize.register()");
}

emitter.on("tick", cb);
return synchronizeEventBus.addListener("tick", cb);
}

/**
* Removes a Synchronize listener
*
* @param {Function} cb The callback function that was passed to `register`
* @returns {Promise} A Promise that resolves when the listener has been removed.
*/
static async unregister(cb) {
return synchronizeEventBus.removeListener("tick", cb);
}

/**
Expand All @@ -81,11 +128,11 @@ class Synchronize {
/**
* Terminate the Synchronization sub-system. Mostly used for testing.
*/
static shutdown() {
static async shutdown() {
initialized = false;
tickCount = undefined;
emitter = undefined;
clearInterval(timerHandle);
return synchronizeEventBus.removeAllListeners();
}

/**
Expand All @@ -99,6 +146,9 @@ class Synchronize {
hadTick = false;
}

/**
* Pauses the watchdog. Primarily used in Breakpoint.
*/
static pauseWatchdog() {
if (!initialized) {
return;
Expand All @@ -107,6 +157,9 @@ class Synchronize {
clearInterval(timerHandle);
}

/**
* Restarts the watchdog. Primarily used in Breakpoint.
*/
static startWatchdog() {
if (!initialized) {
return;
Expand All @@ -116,9 +169,17 @@ class Synchronize {
}
}

function _nextTick() {
async function _nextTick() {
tickCount++;
emitter.emit("tick", tickCount);
let e = new SynchronizeEvent("synchronize", "synchronize");
return e.emit("tick", tickCount);
}

async function _asyncTick() {
return _nextTick()
.catch((err) => {
throw err;
});
}

module.exports = {
Expand Down
Loading

0 comments on commit 6075d97

Please sign in to comment.