Skip to content
This repository has been archived by the owner on Aug 3, 2022. It is now read-only.

WIP: Implement "alter table add/drop column" for apid generic replication. #53

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apigee_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
maxIdleConnsPerHost = 10
)

var knownTables = make(map[string]bool)
// knownTables maps each known table name to the set of column names seen for
// that table; consumed by changesHaveNewTables to detect schema drift.
var knownTables = make(map[string]map[string]bool)

/*
* Start from existing snapshot if possible
Expand Down
17 changes: 9 additions & 8 deletions apigee_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var _ = Describe("Sync", func() {
apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
if s, ok := event.(*common.Snapshot); ok {

Expect(16).To(Equal(len(knownTables)))
Expect(12).To(Equal(len(knownTables)))
Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())

lastSnapshot = s
Expand Down Expand Up @@ -235,38 +235,39 @@ var _ = Describe("Sync", func() {

It("should correctly identify non-proper subsets with respect to maps", func() {

testMap := map[string]map[string]bool{"a": make(map[string]bool), "b": make(map[string]bool)}
//test b proper subset of a
Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
Expect(changesHaveNewTables(testMap,
[]common.Change{common.Change{Table: "b"}},
)).To(BeFalse())

//test a == b
Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
Expect(changesHaveNewTables(testMap,
[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
)).To(BeFalse())

//test b superset of a
Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
Expect(changesHaveNewTables(testMap,
[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
)).To(BeTrue())

//test b not subset of a
Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
Expect(changesHaveNewTables(testMap,
[]common.Change{common.Change{Table: "c"}},
)).To(BeTrue())

//test a empty
Expect(changesHaveNewTables(map[string]bool{},
Expect(changesHaveNewTables(map[string]map[string]bool{},
[]common.Change{common.Change{Table: "a"}},
)).To(BeTrue())

//test b empty
Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
Expect(changesHaveNewTables(testMap,
[]common.Change{},
)).To(BeFalse())

//test b nil
Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse())
Expect(changesHaveNewTables(testMap, nil)).To(BeFalse())

//test a nil
Expect(changesHaveNewTables(nil,
Expand Down
2 changes: 1 addition & 1 deletion change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("Change Agent", func() {
BeforeEach(func() {
event := createTestDb("./sql/init_mock_db.sql", "test_change")
processSnapshot(&event)
knownTables = extractTablesFromDB(getDB())
knownTables = extractTableColsFromDB(getDB())
})

var initializeContext = func() {
Expand Down
23 changes: 22 additions & 1 deletion changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,28 @@ func (c *pollChangeManager) handleChangeServerError(err error) {
/*
* Determine if any tables in changes are not present in known tables
*/
func changesHaveNewTables(a map[string]bool, changes []common.Change) bool {
/*
 * changesHaveNewTables reports whether any change in changes references a
 * table that is absent from the known-tables map a (keyed by normalized
 * table name). A nil map is treated as unknown state and forces a resync.
 */
func changesHaveNewTables(a map[string]map[string]bool, changes []common.Change) bool {

	// Distinguish a nil map (likely a caller bug) from a legitimately empty one.
	if a == nil {
		log.Warn("Nil map passed to function changesHaveNewTables, may be bug")
		return true
	}

	for i := range changes {
		tableName := changes[i].Table
		if _, known := a[normalizeTableName(tableName)]; known {
			continue
		}
		log.Infof("Unable to find %s table in current known tables", tableName)
		return true
	}

	return false
}

/*
* Determine if any columns added/dropped in any table
*/
func changesHavecolumnsChanged(a map[string]bool, changes []common.Change) bool {

//nil maps should not be passed in. Making the distinction between nil map and empty map
if a == nil {
Expand Down
79 changes: 34 additions & 45 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *simpleSnapShotManager) downloadDataSnapshot() {
}

func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
knownTables = extractTablesFromSnapshot(snapshot)
knownTables = extractTableColumnsFromSnapshot(snapshot)

_, err := dataService.DBVersion(snapshot.SnapshotInfo)
if err != nil {
Expand All @@ -167,62 +167,51 @@ func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {

}

func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {

tables = make(map[string]bool)

func extractTableColumnsFromSnapshot(snapshot *common.Snapshot) map[string]map[string]bool {
log.Debug("Extracting table names from snapshot")
if snapshot.Tables == nil {
//if this panic ever fires, it's a bug
db, err := dataService.DBVersion(snapshot.SnapshotInfo)
if err != nil {
log.Panicf("Database inaccessible: %v", err)
}
rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
if err != nil {
log.Panicf("Unable to read in known snapshot tables from sqlite file")
}
for rows.Next() {
var tableName string
rows.Scan(&tableName)
if err != nil {
log.Panic("Error scaning tableNames from _transicator_tables")
}
tables[tableName] = true
}

} else {

for _, table := range snapshot.Tables {
tables[table.Name] = true
}
db, err := dataService.DBVersion(snapshot.SnapshotInfo)
if err != nil {
log.Panicf("Database inaccessible: %v", err)
}
return tables

return extractTableColsFromDB(db)
}

func extractTablesFromDB(db apid.DB) (tables map[string]bool) {

tables = make(map[string]bool)
func extractTableColsFromDB(db apid.DB) map[string]map[string]bool {

log.Debug("Extracting table names from existing DB")
columns := make(map[string]map[string]bool)
tables := make([]string, 0)
rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
defer rows.Close()

if err != nil {
log.Panicf("Error reading current set of tables: %v", err)
log.Panicf("Unable to read in known snapshot tables from sqlite file")
}

defer rows.Close()
for rows.Next() {
var table string
if err := rows.Scan(&table); err != nil {
log.Panicf("Error reading current set of tables: %v", err)
var tableName string
rows.Scan(&tableName)
if err != nil {
log.Panic("Error scaning tableNames from _transicator_tables")
}
tables = append(tables, tableName)
}

for _, tableName := range tables {
columns[tableName] = make(map[string]bool)
dummyRows, err := db.Query("SELECT * FROM " + tableName + " LIMIT 0;")
if err != nil {
log.Panicf("Get table info failed: %v", err)
}
defer dummyRows.Close()
cols, err := dummyRows.Columns()
if err != nil {
log.Panicf("Get table columns failed: %v", err)
}
for _, col := range cols {
columns[tableName][col] = true
}
log.Debugf("Table %s found in existing db", table)

tables[table] = true
}
return tables
return columns
}

// Skip Downloading snapshot if there is already a snapshot available from previous run
Expand All @@ -235,7 +224,7 @@ func startOnLocalSnapshot(snapshot string) *common.Snapshot {
log.Panicf("Database inaccessible: %v", err)
}

knownTables = extractTablesFromDB(db)
knownTables = extractTableColsFromDB(db)

// allow plugins (including this one) to start immediately on existing database
// Note: this MUST have no tables as that is used as an indicator
Expand Down
65 changes: 65 additions & 0 deletions snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidApigeeSync

import (
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
"os"
"strings"
)

// Snapshot suite: verifies table/column extraction from a mock snapshot db.
// (Labels renamed from the copy-pasted "Change Agent" of change_test.go so
// ginkgo output distinguishes the two suites.)
var _ = Describe("Snapshot", func() {

	const testDbId = "test_snapshot"

	Context("Snapshot Unit Tests", func() {

		// createTestDb initializes a mock sqlite snapshot from sqlfile and
		// runs it through the snapshot-server file response path.
		var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
			initDb(sqlfile, "./mockdb_snapshot.sqlite3")
			file, err := os.Open("./mockdb_snapshot.sqlite3")
			if err != nil {
				Fail("Failed to open mock db for test")
			}

			s := common.Snapshot{}
			err = processSnapshotServerFileResponse(dbId, file, &s)
			if err != nil {
				Fail("Error processing test snapshots")
			}
			return s
		}

		BeforeEach(func() {
			event := createTestDb("./sql/init_mock_db.sql", testDbId)
			processSnapshot(&event)
			knownTables = extractTableColsFromDB(getDB())
		})

		It("test extract table columns", func() {
			s := &common.Snapshot{
				SnapshotInfo: testDbId,
			}
			columns := extractTableColumnsFromSnapshot(s)
			// The mock db populates _transicator_tables, so extraction must
			// yield at least one table, each with at least one column.
			// (Previously this spec only logged and asserted nothing.)
			if len(columns) == 0 {
				Fail("extractTableColumnsFromSnapshot returned no tables")
			}
			for table, colMap := range columns {
				if len(colMap) == 0 {
					Fail("table " + table + " has no columns")
				}
				cols := []string{}
				for col := range colMap {
					cols = append(cols, col)
				}
				// Informational output belongs at debug, not error, level.
				log.Debug("snapshot TABLE: " + table + " COLUMN: " + strings.Join(cols, "|"))
			}
		})

	})
})
49 changes: 11 additions & 38 deletions sql/init_mock_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ CREATE TABLE _transicator_tables
columnName varchar not null,
typid integer,
primaryKey bool);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','id',1043,1);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','data_scope_id',1043,1);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','name',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','uri',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksumtype',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksum',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created',1114,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created_by',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated',1114,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated_by',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','crc',25,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','id',1043,1);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','data_scope_id',1043,1);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','name',25,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','uri',25,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksumtype',25,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksum',25,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created',1114,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created_by',25,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated',1114,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated_by',25,0);
-- INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','crc',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_deployment','id',1043,1);
INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_id',1043,1);
INSERT INTO "_transicator_tables" VALUES('kms_deployment','apid_cluster_id',1043,1);
Expand Down Expand Up @@ -107,21 +107,6 @@ INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1)
INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','id',2950,1);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','name',1043,0);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','ext_ref_id',1043,1);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','display_name',1043,0);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','description',1043,0);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_at',1114,1);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_by',1043,0);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_at',1114,1);
INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_by',1043,0);
INSERT INTO "_transicator_tables" VALUES('configuration','id',1043,1);
INSERT INTO "_transicator_tables" VALUES('configuration','body',25,0);
INSERT INTO "_transicator_tables" VALUES('configuration','created',1114,0);
INSERT INTO "_transicator_tables" VALUES('configuration','created_by',25,0);
INSERT INTO "_transicator_tables" VALUES('configuration','updated',1114,0);
INSERT INTO "_transicator_tables" VALUES('configuration','updated_by',25,0);
INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
Expand Down Expand Up @@ -156,18 +141,6 @@ INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_by',10
INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_at',1114,0);
INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_by',1043,0);
INSERT INTO "_transicator_tables" VALUES('kms_company_developer','_change_selector',1043,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','id',1043,1);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','deployment_id',1043,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','action',25,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_id',1043,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','apid_cluster_id',1043,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','data_scope_id',1043,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_json',25,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','config_json',25,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created',1114,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created_by',25,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated',1114,0);
INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated_by',25,0);
INSERT INTO "_transicator_tables" VALUES('kms_app','id',2950,1);
INSERT INTO "_transicator_tables" VALUES('kms_app','tenant_id',1043,1);
INSERT INTO "_transicator_tables" VALUES('kms_app','name',1043,1);
Expand Down