From c596c39ce24dc3b35e960bd2d629e7c400bd990f Mon Sep 17 00:00:00 2001 From: aprocysanae Date: Mon, 11 Nov 2019 15:16:45 +0800 Subject: [PATCH 1/5] update trxs to mysql service --- app/plugins/trxs_to_mysql_service.go | 133 +++++++++++++++++---------- 1 file changed, 85 insertions(+), 48 deletions(-) diff --git a/app/plugins/trxs_to_mysql_service.go b/app/plugins/trxs_to_mysql_service.go index 80b013e1..5a4f7ae9 100644 --- a/app/plugins/trxs_to_mysql_service.go +++ b/app/plugins/trxs_to_mysql_service.go @@ -1,7 +1,6 @@ package plugins import ( - "database/sql" "encoding/hex" "encoding/json" "fmt" @@ -11,6 +10,7 @@ import ( "github.com/coschain/contentos-go/node" "github.com/coschain/contentos-go/prototype" _ "github.com/go-sql-driver/mysql" + "github.com/jinzhu/gorm" "github.com/sirupsen/logrus" "time" ) @@ -38,36 +38,45 @@ func FindCreator(operation *prototype.Operation) (name string) { return } -func IsCreateAccountOp(operation *prototype.Operation) bool { - switch operation.Op.(type) { - case *prototype.Operation_Op1: - return true - default: - return false - } +var TrxMysqlServiceName = "trxsqlservice" + +type TrxInfo struct { + ID uint64 `gorm:"primary_key;auto_increment"` + TrxId string `gorm:"unique_index;not null"` + BlockHeight uint64 `gorm:"index;not null"` + BlockTime uint64 `gorm:"index;not null"` + Invoice string `gorm:"type:longtext"` + Operations string `gorm:"type:longtext"` + BlockId string `gorm:"not null"` + Creator string `gorm:"index;not null"` } -func IsTransferOp(operation *prototype.Operation) bool { - switch operation.Op.(type) { - case *prototype.Operation_Op2: - return true - default: - return false - } +func (TrxInfo) TableName() string { + return "trxinfo" } -var TrxMysqlServiceName = "trxsqlservice" +type LibInfo struct { + ID uint64 `gorm:"primary_key;auto_increment"` + BlockHeight uint64 + //LastCheckTime time.Time + LastCheckTime int64 +} + +func (LibInfo) TableName() string { + return "libinfo" +} type TrxMysqlService struct { node.Service config *service_configs.DatabaseConfig consensus iservices.IConsensus - outDb *sql.DB + outDb *gorm.DB log *logrus.Logger ctx *node.ServiceContext quit chan bool } + func NewTrxMysqlSerVice(ctx *node.ServiceContext, config *service_configs.DatabaseConfig, log *logrus.Logger) (*TrxMysqlService, error) { return &TrxMysqlService{ctx: ctx, log: log, config: config}, nil } @@ -81,12 +90,30 @@ func (t *TrxMysqlService) Start(node *node.Node) error { t.consensus = consensus.(iservices.IConsensus) // dns: data source name dsn := fmt.Sprintf("%s:%s@/%s", t.config.User, t.config.Password, t.config.Db) - outDb, err := sql.Open(t.config.Driver, dsn) - - if err != nil { + //outDb, err := sql.Open(t.config.Driver, dsn) + if db, err := gorm.Open(t.config.Driver, dsn); err != nil { return err + } else { + t.outDb = db + } + + if !t.outDb.HasTable(&TrxInfo{}) { + if err := t.outDb.CreateTable(&TrxInfo{}).Error; err != nil { + _ = t.outDb.Close() + return err + } + } + progress := &LibInfo{ + BlockHeight: 0, + LastCheckTime: 0, + } + if !t.outDb.HasTable(progress) { + if err := t.outDb.CreateTable(progress).Error; err != nil { + _ = t.outDb.Close() + return err + } + t.outDb.Create(progress) } - t.outDb = outDb ticker := time.NewTicker(time.Second) go func() { @@ -110,15 +137,13 @@ func (t *TrxMysqlService) pollLIB() error { start := common.EasyTimer() lib := t.consensus.GetLIB().BlockNum() t.log.Debugf("[trx db] sync lib: %d \n", lib) - stmt, _ := t.outDb.Prepare("SELECT lib from libinfo limit 1") - defer stmt.Close() - var lastLib uint64 = 0 - _ = stmt.QueryRow().Scan(&lastLib) + process := &LibInfo{} + t.outDb.First(process) + // begin from lastLib + 1, thus each time update libinfo should be atomic + lastLib := process.BlockHeight + 1 // be carefully, no where condition there !! // the reason is only one row in the table // if introduce the mechanism that record checkpoint, the where closure should be added - updateStmt, _ := t.outDb.Prepare("UPDATE libinfo SET lib=?, last_check_time=?") - defer updateStmt.Close() var waitingSyncLib []uint64 var count = 0 for lastLib < lib { @@ -131,26 +156,43 @@ func (t *TrxMysqlService) pollLIB() error { } for _, block := range waitingSyncLib { + tx := t.outDb.Begin() blockStart := common.EasyTimer() - t.handleLibNotification(block) - utcTimestamp := time.Now().UTC().Unix() - _, _ = updateStmt.Exec(block, utcTimestamp) - t.log.Debugf("[trx db] insert block %d, spent: %v", block, blockStart) + trxInfoList, _ := t.handleLibNotification(block) + if trxInfoList != nil { + for _, trxInfo := range trxInfoList { + if err := tx.Create(trxInfo).Error; err != nil { + t.log.Errorf("[trx db] when inserted block %d, error occurred: %v", block , err) + tx.Rollback() + return err + } + } + } + process.BlockHeight = block + process.LastCheckTime = time.Now().UTC().Unix() + if err := tx.Save(process).Error; err != nil { + tx.Rollback() + t.log.Errorf("[trx db] when committed block %d, error occurred: %v", block , err) + } else { + tx.Commit() + } + t.log.Debugf("[trx db] insert block %d, spent: %s", block, blockStart) } t.log.Debugf("[trx db] PollLib spent: %v", start) return nil } -func (t *TrxMysqlService) handleLibNotification(lib uint64) { +func (t *TrxMysqlService) handleLibNotification(lib uint64) ([]*TrxInfo, error) { blks , err := t.consensus.FetchBlocks(lib, lib) if err != nil { t.log.Error(err) - return + return nil, err } if len(blks) == 0 { - return + return nil, nil } blk := blks[0].(*prototype.SignedBlock) + var trxInfoList []*TrxInfo for _, trx := range blk.Transactions { trxHash, _ := trx.SigTrx.Id() trxId := hex.EncodeToString(trxHash.Hash) @@ -161,22 +203,17 @@ func (t *TrxMysqlService) handleLibNotification(lib uint64) { invoice, _ := json.Marshal(trx.Receipt) operations := PurgeOperation(trx.SigTrx.GetTrx().GetOperations()) operationsJson, _ := json.Marshal(operations) - //operation := trx.SigTrx.GetTrx().GetOperations()[0] creator := FindCreator(trx.SigTrx.GetTrx().GetOperations()[0]) - _, _ = t.outDb.Exec("INSERT IGNORE INTO trxinfo (trx_id, block_height, block_id, block_time, invoice, operations, creator) value (?, ?, ?, ?, ?, ?, ?)", trxId, blockHeight, blockId, blockTime, invoice, operationsJson, creator) - //for _, operation := range trx.SigTrx.GetTrx().GetOperations() { - // if IsCreateAccountOp(operation) { - // _, _ = t.outDb.Exec("INSERT IGNORE INTO createaccountinfo (trx_id, create_time, creator, pubkey, account) values (?, ?, ?, ?, ?)", trxId, blockTime, creator, operation.GetOp1().PubKey.ToWIF(), operation.GetOp1().NewAccountName.Value) - // break - // } - //} - //for _, operation := range trx.SigTrx.GetTrx().GetOperations() { - // if IsTransferOp(operation) { - // _, _ = t.outDb.Exec("INSERT IGNORE INTO transferinfo (trx_id, create_time, sender, receiver, amount, memo) values (?, ?, ?, ?, ?, ?)", trxId, blockTime, creator, operation.GetOp2().To.Value, operation.GetOp2().Amount.Value, operation.GetOp2().Memo) - // break - // } - //} + trxInfoList = append(trxInfoList, + &TrxInfo{TrxId:trxId, + BlockHeight:blockHeight, + BlockId: blockId, + BlockTime:blockTime, + Invoice:string(invoice), + Operations:string(operationsJson), + Creator:creator}) } + return trxInfoList, nil } func (t *TrxMysqlService) stop() { From 377e44100a6c97741fdc1297ba214b3d17db07c9 Mon Sep 17 00:00:00 2001 From: aprocysanae Date: Mon, 11 Nov 2019 15:47:39 +0800 Subject: [PATCH 2/5] rm db command --- cmd/cosd/commands/db.go | 103 ---------------------------------------- cmd/cosd/main.go | 1 - 2 files changed, 104 deletions(-) delete mode 100644 cmd/cosd/commands/db.go diff --git a/cmd/cosd/commands/db.go b/cmd/cosd/commands/db.go deleted file mode 100644 index b13ae22e..00000000 --- a/cmd/cosd/commands/db.go +++ /dev/null @@ -1,103 +0,0 @@ -package commands - -import ( - "database/sql" - "fmt" - "github.com/coschain/cobra" - "github.com/coschain/contentos-go/config" - "github.com/coschain/contentos-go/node" - "github.com/spf13/viper" - "os" - "path/filepath" - "time" -) - -var DbCmd = func() *cobra.Command { - cmd := &cobra.Command{ - Use: "db", - } - - initCmd := &cobra.Command{ - Use: "init", - Short: "initialize all db", - Run: initAllDb, - } - - cmd.AddCommand(initCmd) - return cmd -} - -func readConfig() *node.Config { - var cfg node.Config - if cfgName == "" { - cfg.Name = ClientIdentifier - } else { - cfg.Name = cfgName - } - viper.SetConfigName("config") - viper.SetConfigType("toml") - confdir := filepath.Join(config.DefaultDataDir(), cfg.Name) - viper.AddConfigPath(confdir) - err := viper.ReadInConfig() - if err == nil { - _ = viper.Unmarshal(&cfg) - } else { - fmt.Printf("fatal: not be initialized (do `init` first)\n") - os.Exit(1) - } - return &cfg -} - -func initTrxDb(cmd *cobra.Command, args []string) { - cfg := readConfig() - dbConfig := cfg.Database - dsn := fmt.Sprintf("%s:%s@/%s", dbConfig.User, dbConfig.Password, dbConfig.Db) - db, err := sql.Open(dbConfig.Driver, dsn) - defer db.Close() - if err != nil { - fmt.Printf("fatal: init database failed, dsn:%s\n", dsn) - os.Exit(1) - } - createTrxInfo := `create table trxinfo - ( - id bigint AUTO_INCREMENT PRIMARY KEY, - trx_id varchar(64) not null, - block_height int unsigned not null, - block_time int unsigned not null, - invoice json null, - operations json null, - block_id varchar(64) not null, - creator varchar(64) not null, - INDEX trxinfo_block_height_index (block_height), - INDEX trxinfo_block_time_index (block_time), - INDEX trxinfo_block_id (block_id), - INDEX trxinfo_block_creator (creator), - constraint trxinfo_trx_id_uindex - unique (trx_id) - );` - - createLibInfo := `create table libinfo - ( - lib int unsigned not null, - last_check_time int unsigned not null - );` - - dropTables := []string{"trxinfo", "libinfo"} - for _, table := range dropTables { - dropSql := fmt.Sprintf("DROP TABLE IF EXISTS `%s`", table) - if _, err = db.Exec(dropSql); err != nil { - fmt.Println(err) - } - } - createTables := []string{createTrxInfo, createLibInfo } - for _, table := range createTables { - if _, err = db.Exec(table); err != nil { - fmt.Println(err) - } - } - _, _ = db.Exec("INSERT INTO `libinfo` (lib, last_check_time) VALUES (?, ?)", 0, time.Now().UTC().Unix()) -} - -func initAllDb(cmd *cobra.Command, args []string) { - initTrxDb(cmd, args) -} diff --git a/cmd/cosd/main.go b/cmd/cosd/main.go index 7e653b72..fb1ef518 100644 --- a/cmd/cosd/main.go +++ b/cmd/cosd/main.go @@ -17,7 +17,6 @@ var rootCmd = &cobra.Command{ func addCommands() { rootCmd.AddCommand(commands.InitCmd()) rootCmd.AddCommand(commands.StartCmd()) - rootCmd.AddCommand(commands.DbCmd()) } func main() { From 37136feb9163b8ca5f6bce89ef38785d44153d3a Mon Sep 17 00:00:00 2001 From: aprocysanae Date: Mon, 11 Nov 2019 16:45:02 +0800 Subject: [PATCH 3/5] compatible with old format --- app/plugins/trxs_to_mysql_service.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/app/plugins/trxs_to_mysql_service.go b/app/plugins/trxs_to_mysql_service.go index 5a4f7ae9..c899c5a6 100644 --- a/app/plugins/trxs_to_mysql_service.go +++ b/app/plugins/trxs_to_mysql_service.go @@ -45,8 +45,8 @@ type TrxInfo struct { TrxId string `gorm:"unique_index;not null"` BlockHeight uint64 `gorm:"index;not null"` BlockTime uint64 `gorm:"index;not null"` - Invoice string `gorm:"type:longtext"` - Operations string `gorm:"type:longtext"` + Invoice json.RawMessage `sql:"type:json"` + Operations json.RawMessage `sql:"type:json"` BlockId string `gorm:"not null"` Creator string `gorm:"index;not null"` } @@ -57,8 +57,7 @@ func (TrxInfo) TableName() string { type LibInfo struct { ID uint64 `gorm:"primary_key;auto_increment"` - BlockHeight uint64 - //LastCheckTime time.Time + Lib uint64 LastCheckTime int64 } @@ -104,7 +103,7 @@ func (t *TrxMysqlService) Start(node *node.Node) error { } } progress := &LibInfo{ - BlockHeight: 0, + Lib: 0, LastCheckTime: 0, } if !t.outDb.HasTable(progress) { @@ -140,7 +139,7 @@ func (t *TrxMysqlService) pollLIB() error { process := &LibInfo{} t.outDb.First(process) // begin from lastLib + 1, thus each time update libinfo should be atomic - lastLib := process.BlockHeight + 1 + lastLib := process.Lib + 1 // be carefully, no where condition there !! // the reason is only one row in the table // if introduce the mechanism that record checkpoint, the where closure should be added @@ -168,7 +167,7 @@ func (t *TrxMysqlService) pollLIB() error { } } } - process.BlockHeight = block + process.Lib = block process.LastCheckTime = time.Now().UTC().Unix() if err := tx.Save(process).Error; err != nil { tx.Rollback() @@ -209,8 +208,8 @@ func (t *TrxMysqlService) handleLibNotification(lib uint64) ([]*TrxInfo, error) BlockHeight:blockHeight, BlockId: blockId, BlockTime:blockTime, - Invoice:string(invoice), - Operations:string(operationsJson), + Invoice:invoice, + Operations:operationsJson, Creator:creator}) } return trxInfoList, nil From b6cfe7681fce20a65e0711511a7f0120b4c1ff50 Mon Sep 17 00:00:00 2001 From: aprocysanae Date: Mon, 11 Nov 2019 16:58:34 +0800 Subject: [PATCH 4/5] libinfo doesn't have id field --- app/plugins/trxs_to_mysql_service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/app/plugins/trxs_to_mysql_service.go b/app/plugins/trxs_to_mysql_service.go index c899c5a6..1a1980f0 100644 --- a/app/plugins/trxs_to_mysql_service.go +++ b/app/plugins/trxs_to_mysql_service.go @@ -56,7 +56,6 @@ func (TrxInfo) TableName() string { } type LibInfo struct { - ID uint64 `gorm:"primary_key;auto_increment"` Lib uint64 LastCheckTime int64 } From 30488ced24a5406c27ea81dce2130d1f8cb5c12f Mon Sep 17 00:00:00 2001 From: aprocysanae Date: Mon, 11 Nov 2019 17:19:15 +0800 Subject: [PATCH 5/5] gorm default saving way is create_or_update --- app/plugins/trxs_to_mysql_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/plugins/trxs_to_mysql_service.go b/app/plugins/trxs_to_mysql_service.go index 1a1980f0..13927a76 100644 --- a/app/plugins/trxs_to_mysql_service.go +++ b/app/plugins/trxs_to_mysql_service.go @@ -168,7 +168,7 @@ func (t *TrxMysqlService) pollLIB() error { } process.Lib = block process.LastCheckTime = time.Now().UTC().Unix() - if err := tx.Save(process).Error; err != nil { + if err := tx.Model(&LibInfo{}).Update(process).Error; err != nil { tx.Rollback() t.log.Errorf("[trx db] when committed block %d, error occurred: %v", block , err) } else {