kafka-to-mongo/kafka-mongo-sample.go
func main() { -
//Create MongoDB session session := initialiseMongo() mongoStore.session = session -
receiveFromKafka() -
} -
func receiveFromKafka() { -
fmt.Println("Start receiving from Kafka") c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "group.id": "group-id-1", "auto.offset.reset": "earliest", }) -
if err != nil { panic(err) } -
c.SubscribeTopics([]string{"jobs-topic1"}, nil) -
for { msg, err := c.ReadMessage(-1) -
if err == nil { fmt.Printf("Received from Kafka %s: %sn", msg.TopicPartition, string(msg.Value)) job := string(msg.Value) saveJobToMongo(job) } else { fmt.Printf("Consumer error: %v (%v)n", err, msg) break } } -
c.Close() -
} -
func saveJobToMongo(jobString string) { -
fmt.Println("Save to MongoDB") col := mongoStore.session.DB(database).C(collection) -
//Save data into Job struct var _job Job b := []byte(jobString) err := json.Unmarshal(b, &_job) if err != nil { panic(err) } -
//Insert job into MongoDB errMongo := col.Insert(_job) if errMongo != nil { panic(errMongo) } -
fmt.Printf("Saved to MongoDB : %s", jobString) -
}
(编辑:威海站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|