Skip to content

Commit

Permalink
Merge branch 'main' into nhaimerl-add-telemetry-for-missing-payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
NikolasHaimerl authored Feb 6, 2025
2 parents a53e420 + 85b7cb3 commit 56e7786
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 20 deletions.
15 changes: 3 additions & 12 deletions backend/bin/deal-observer-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import timers from 'node:timers/promises'
import slug from 'slug'
import '../lib/instrument.js'
import { createInflux } from '../lib/telemetry.js'
import { getChainHead, rpcRequest } from '../lib/rpc-service/service.js'
import { rpcRequest } from '../lib/rpc-service/service.js'
import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { countStoredActiveDealsWithUnresolvedPayloadCid, lookUpPayloadCids } from '../lib/look-up-payload-cids.js'
import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js'
import { getDealPayloadCid } from '../lib/piece-indexer-service.js'
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */

const {
INFLUXDB_TOKEN,
Expand Down Expand Up @@ -42,17 +43,7 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
while (true) {
const start = Date.now()
try {
const currentChainHead = await getChainHead(makeRpcRequest)
const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
const startEpoch = Math.max(
currentChainHead.Height - maxPastEpochs,
(lastInsertedDeal?.activated_at_epoch + 1) || 0
)
const endEpoch = currentChainHead.Height - finalityEpochs

for (let epoch = startEpoch; epoch <= endEpoch; epoch++) {
await observeBuiltinActorEvents(epoch, pgPool, makeRpcRequest)
}
await observeBuiltinActorEvents(pgPool, makeRpcRequest, maxPastEpochs, finalityEpochs)
const newLastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
const numberOfStoredDeals = await countStoredActiveDeals(pgPool)
if (INFLUXDB_TOKEN) {
Expand Down
29 changes: 24 additions & 5 deletions backend/lib/deal-observer.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,40 @@
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
/** @import { Static } from '@sinclair/typebox' */

import { getActorEvents, getActorEventsFilter } from './rpc-service/service.js'
import { getActorEvents, getActorEventsFilter, getChainHead } from './rpc-service/service.js'
import { ActiveDealDbEntry } from '@filecoin-station/deal-observer-db/lib/types.js'
import { Value } from '@sinclair/typebox/value'
import { convertBlockEventToActiveDealDbEntry } from './utils.js'

/**
* @param {number} blockHeight
* @param {Queryable} pgPool
* @param {(method:string,params:object) => object} makeRpcRequest
* @param {(method:string,params:any[]) => Promise<any>} makeRpcRequest
* @param {number} maxPastEpochs
* @param {number} finalityEpochs
* @returns {Promise<void>}
*/
export async function observeBuiltinActorEvents (blockHeight, pgPool, makeRpcRequest) {
export const observeBuiltinActorEvents = async (pgPool, makeRpcRequest, maxPastEpochs, finalityEpochs) => {
const currentChainHead = await getChainHead(makeRpcRequest)
const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
const startEpoch = Math.max(
currentChainHead.Height - maxPastEpochs,
(lastInsertedDeal?.activated_at_epoch + 1) || 0
)
const endEpoch = currentChainHead.Height - finalityEpochs
for (let epoch = startEpoch; epoch <= endEpoch; epoch++) {
await fetchAndStoreActiveDeals(epoch, pgPool, makeRpcRequest)
}
}

/**
* @param {number} blockHeight
* @param {Queryable} pgPool
* @param {(method:string,params:any[]) => Promise<any>} makeRpcRequest
*/
export const fetchAndStoreActiveDeals = async (blockHeight, pgPool, makeRpcRequest) => {
const eventType = 'claim'
const blockEvents = await getActorEvents(getActorEventsFilter(blockHeight, eventType), makeRpcRequest)
console.log(`Observed ${blockEvents.length} ${eventType} events in block ${blockHeight}`)
console.log(`Fetched ${blockEvents.length} ${eventType} events from block ${blockHeight}`)
await storeActiveDeals(blockEvents.map((event) => convertBlockEventToActiveDealDbEntry(event)), pgPool)
}

Expand Down
61 changes: 60 additions & 1 deletion backend/test/deal-observer.test.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import assert from 'node:assert'
import { after, before, beforeEach, describe, it } from 'node:test'
import { createPgPool, migrateWithPgClient } from '@filecoin-station/deal-observer-db'
import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, storeActiveDeals } from '../lib/deal-observer.js'
import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, loadDeals, storeActiveDeals, observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { Value } from '@sinclair/typebox/value'
import { BlockEvent } from '../lib/rpc-service/data-types.js'
import { convertBlockEventToActiveDealDbEntry } from '../lib/utils.js'
import { chainHeadTestData } from './test_data/chainHead.js'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'
import { parse } from '@ipld/dag-json'

describe('deal-observer-backend', () => {
let pgPool
Expand Down Expand Up @@ -100,3 +103,59 @@ describe('deal-observer-backend', () => {
assert.strictEqual(await countStoredActiveDeals(pgPool), 2n)
})
})

describe('deal-observer-backend built in actor event observer', () => {
let pgPool
const makeRpcRequest = async (method, params) => {
switch (method) {
case 'Filecoin.ChainHead':
return parse(JSON.stringify(chainHeadTestData))
case 'Filecoin.GetActorEventsRaw':
return parse(JSON.stringify(rawActorEventTestData)).filter(e => e.height >= params[0].fromHeight && e.height <= params[0].toHeight)
default:
console.error('Unknown method')
}
}
before(async () => {
pgPool = await createPgPool()
await migrateWithPgClient(pgPool)
})

after(async () => {
await pgPool.end()
})

beforeEach(async () => {
await pgPool.query('DELETE FROM active_deals')
})
it('stores all retrievable active deals if database is empty', async () => {
await observeBuiltinActorEvents(pgPool, makeRpcRequest, 10, 0)
const deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')
assert.strictEqual(deals.length, 360)
})

it('correctly picks up from where the current storage is at', async () => {
await observeBuiltinActorEvents(pgPool, makeRpcRequest, 11, 10)
let deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')
assert.strictEqual(deals.length, 25)
const lastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
assert.strictEqual(lastInsertedDeal.activated_at_epoch, 4622129)

// The deal observer function should pick up from the current storage
await observeBuiltinActorEvents(pgPool, makeRpcRequest, 100, 0)
deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')
assert.strictEqual(deals.length, 360)
})

it('correctly applies the max past epoch and finality epoch parameters', async () => {
await observeBuiltinActorEvents(pgPool, makeRpcRequest, 11, 12)
let deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')
// No deals should be stored because the finality epoch is larger than the maximum past epoch parameter
assert.strictEqual(deals.length, 0)

await observeBuiltinActorEvents(pgPool, makeRpcRequest, 11, 10)
deals = await loadDeals(pgPool, 'SELECT * FROM active_deals')
// There should be only one distinct block height in the database
assert.strictEqual((new Set(deals.map(deal => deal.activated_at_epoch))).size, 1)
})
})
4 changes: 2 additions & 2 deletions backend/test/look-up-payload-cids.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { before, beforeEach, it, describe, after } from 'node:test'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'
import { chainHeadTestData } from './test_data/chainHead.js'
import { parse } from '@ipld/dag-json'
import { observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { fetchAndStoreActiveDeals } from '../lib/deal-observer.js'
import assert from 'assert'
import { minerPeerIds } from './test_data/minerInfo.js'
import { payloadCIDs } from './test_data/payloadCIDs.js'
Expand Down Expand Up @@ -36,7 +36,7 @@ describe('deal-observer-backend look up payload CIDs', () => {
await pgPool.query('DELETE FROM active_deals')
const startEpoch = 4622129
for (let blockHeight = startEpoch; blockHeight < startEpoch + 10; blockHeight++) {
await observeBuiltinActorEvents(blockHeight, pgPool, makeRpcRequest)
await fetchAndStoreActiveDeals(blockHeight, pgPool, makeRpcRequest)
}
assert.strictEqual(
(await pgPool.query('SELECT * FROM active_deals')).rows.length,
Expand Down

0 comments on commit 56e7786

Please sign in to comment.