Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 83 additions & 48 deletions app/plugins/trxs_to_mysql_service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package plugins

import (
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
Expand All @@ -11,6 +10,7 @@ import (
"github.com/coschain/contentos-go/node"
"github.com/coschain/contentos-go/prototype"
_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
"github.com/sirupsen/logrus"
"time"
)
Expand Down Expand Up @@ -38,36 +38,43 @@ func FindCreator(operation *prototype.Operation) (name string) {
return
}

func IsCreateAccountOp(operation *prototype.Operation) bool {
switch operation.Op.(type) {
case *prototype.Operation_Op1:
return true
default:
return false
}
var TrxMysqlServiceName = "trxsqlservice"

type TrxInfo struct {
ID uint64 `gorm:"primary_key;auto_increment"`
TrxId string `gorm:"unique_index;not null"`
BlockHeight uint64 `gorm:"index;not null"`
BlockTime uint64 `gorm:"index;not null"`
Invoice json.RawMessage `sql:"type:json"`
Operations json.RawMessage `sql:"type:json"`
BlockId string `gorm:"not null"`
Creator string `gorm:"index;not null"`
}

func IsTransferOp(operation *prototype.Operation) bool {
switch operation.Op.(type) {
case *prototype.Operation_Op2:
return true
default:
return false
}
func (TrxInfo) TableName() string {
return "trxinfo"
}

var TrxMysqlServiceName = "trxsqlservice"
type LibInfo struct {
Lib uint64
LastCheckTime int64
}

func (LibInfo) TableName() string {
return "libinfo"
}

type TrxMysqlService struct {
node.Service
config *service_configs.DatabaseConfig
consensus iservices.IConsensus
outDb *sql.DB
outDb *gorm.DB
log *logrus.Logger
ctx *node.ServiceContext
quit chan bool
}


func NewTrxMysqlSerVice(ctx *node.ServiceContext, config *service_configs.DatabaseConfig, log *logrus.Logger) (*TrxMysqlService, error) {
return &TrxMysqlService{ctx: ctx, log: log, config: config}, nil
}
Expand All @@ -81,12 +88,30 @@ func (t *TrxMysqlService) Start(node *node.Node) error {
t.consensus = consensus.(iservices.IConsensus)
// dns: data source name
dsn := fmt.Sprintf("%s:%s@/%s", t.config.User, t.config.Password, t.config.Db)
outDb, err := sql.Open(t.config.Driver, dsn)

if err != nil {
//outDb, err := sql.Open(t.config.Driver, dsn)
if db, err := gorm.Open(t.config.Driver, dsn); err != nil {
return err
} else {
t.outDb = db
}

if !t.outDb.HasTable(&TrxInfo{}) {
if err := t.outDb.CreateTable(&TrxInfo{}).Error; err != nil {
_ = t.outDb.Close()
return err
}
}
progress := &LibInfo{
Lib: 0,
LastCheckTime: 0,
}
if !t.outDb.HasTable(progress) {
if err := t.outDb.CreateTable(progress).Error; err != nil {
_ = t.outDb.Close()
return err
}
t.outDb.Create(progress)
}
t.outDb = outDb

ticker := time.NewTicker(time.Second)
go func() {
Expand All @@ -110,15 +135,13 @@ func (t *TrxMysqlService) pollLIB() error {
start := common.EasyTimer()
lib := t.consensus.GetLIB().BlockNum()
t.log.Debugf("[trx db] sync lib: %d \n", lib)
stmt, _ := t.outDb.Prepare("SELECT lib from libinfo limit 1")
defer stmt.Close()
var lastLib uint64 = 0
_ = stmt.QueryRow().Scan(&lastLib)
process := &LibInfo{}
t.outDb.First(process)
// begin from lastLib + 1, thus each time update libinfo should be atomic
lastLib := process.Lib + 1
// be carefully, no where condition there !!
// the reason is only one row in the table
// if introduce the mechanism that record checkpoint, the where closure should be added
updateStmt, _ := t.outDb.Prepare("UPDATE libinfo SET lib=?, last_check_time=?")
defer updateStmt.Close()
var waitingSyncLib []uint64
var count = 0
for lastLib < lib {
Expand All @@ -131,26 +154,43 @@ func (t *TrxMysqlService) pollLIB() error {
}

for _, block := range waitingSyncLib {
tx := t.outDb.Begin()
blockStart := common.EasyTimer()
t.handleLibNotification(block)
utcTimestamp := time.Now().UTC().Unix()
_, _ = updateStmt.Exec(block, utcTimestamp)
t.log.Debugf("[trx db] insert block %d, spent: %v", block, blockStart)
trxInfoList, _ := t.handleLibNotification(block)
if trxInfoList != nil {
for _, trxInfo := range trxInfoList {
if err := tx.Create(trxInfo).Error; err != nil {
t.log.Errorf("[trx db] when inserted block %d, error occurred: %v", block , err)
tx.Rollback()
return err
}
}
}
process.Lib = block
process.LastCheckTime = time.Now().UTC().Unix()
if err := tx.Model(&LibInfo{}).Update(process).Error; err != nil {
tx.Rollback()
t.log.Errorf("[trx db] when committed block %d, error occurred: %v", block , err)
} else {
tx.Commit()
}
t.log.Debugf("[trx db] insert block %d, spent: %s", block, blockStart)
}
t.log.Debugf("[trx db] PollLib spent: %v", start)
return nil
}

func (t *TrxMysqlService) handleLibNotification(lib uint64) {
func (t *TrxMysqlService) handleLibNotification(lib uint64) ([]*TrxInfo, error) {
blks , err := t.consensus.FetchBlocks(lib, lib)
if err != nil {
t.log.Error(err)
return
return nil, err
}
if len(blks) == 0 {
return
return nil, nil
}
blk := blks[0].(*prototype.SignedBlock)
var trxInfoList []*TrxInfo
for _, trx := range blk.Transactions {
trxHash, _ := trx.SigTrx.Id()
trxId := hex.EncodeToString(trxHash.Hash)
Expand All @@ -161,22 +201,17 @@ func (t *TrxMysqlService) handleLibNotification(lib uint64) {
invoice, _ := json.Marshal(trx.Receipt)
operations := PurgeOperation(trx.SigTrx.GetTrx().GetOperations())
operationsJson, _ := json.Marshal(operations)
//operation := trx.SigTrx.GetTrx().GetOperations()[0]
creator := FindCreator(trx.SigTrx.GetTrx().GetOperations()[0])
_, _ = t.outDb.Exec("INSERT IGNORE INTO trxinfo (trx_id, block_height, block_id, block_time, invoice, operations, creator) value (?, ?, ?, ?, ?, ?, ?)", trxId, blockHeight, blockId, blockTime, invoice, operationsJson, creator)
//for _, operation := range trx.SigTrx.GetTrx().GetOperations() {
// if IsCreateAccountOp(operation) {
// _, _ = t.outDb.Exec("INSERT IGNORE INTO createaccountinfo (trx_id, create_time, creator, pubkey, account) values (?, ?, ?, ?, ?)", trxId, blockTime, creator, operation.GetOp1().PubKey.ToWIF(), operation.GetOp1().NewAccountName.Value)
// break
// }
//}
//for _, operation := range trx.SigTrx.GetTrx().GetOperations() {
// if IsTransferOp(operation) {
// _, _ = t.outDb.Exec("INSERT IGNORE INTO transferinfo (trx_id, create_time, sender, receiver, amount, memo) values (?, ?, ?, ?, ?, ?)", trxId, blockTime, creator, operation.GetOp2().To.Value, operation.GetOp2().Amount.Value, operation.GetOp2().Memo)
// break
// }
//}
trxInfoList = append(trxInfoList,
&TrxInfo{TrxId:trxId,
BlockHeight:blockHeight,
BlockId: blockId,
BlockTime:blockTime,
Invoice:invoice,
Operations:operationsJson,
Creator:creator})
}
return trxInfoList, nil
}

func (t *TrxMysqlService) stop() {
Expand Down
103 changes: 0 additions & 103 deletions cmd/cosd/commands/db.go

This file was deleted.

1 change: 0 additions & 1 deletion cmd/cosd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ var rootCmd = &cobra.Command{
func addCommands() {
rootCmd.AddCommand(commands.InitCmd())
rootCmd.AddCommand(commands.StartCmd())
rootCmd.AddCommand(commands.DbCmd())
}

func main() {
Expand Down