久久精品精选,精品九九视频,www久久只有这里有精品,亚洲熟女乱色综合一区
    分享

    docker快速安裝rabbitmq

     bylele 2021-09-27

     


    一、獲取鏡像

    #指定版本,該版本包含了web控制頁面
    docker pull rabbitmq:management

    二、運行鏡像

    #方式一:默認guest 用戶,密碼也是 guest
    docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
    
    #方式二:設(shè)置用戶名和密碼
    docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

    三、訪問ui頁面

    http://localhost:15672/

    四、golang案例

    復(fù)制代碼
    #producer生產(chǎn)者代碼
    package main
    
    import (
        "fmt"
    
        "log"
    
        "github.com/streadway/amqp"
    )
    
    const (
        //AMQP URI
    
        uri = "amqp://guest:guest@10.0.0.11:5672/" // 10.0.0.11為主機ip
    
        //Durable AMQP exchange name
    
        exchangeName = ""
    
        //Durable AMQP queue name
    
        queueName = "test-queues"
    
        //Body of message
    
        bodyMsg string = "hello angel"
    )
    
    //如果存在錯誤,則輸出
    
    func failOnError(err error, msg string) {
    
        if err != nil {
    
            log.Fatalf("%s: %s", msg, err)
    
            panic(fmt.Sprintf("%s: %s", msg, err))
    
        }
    
    }
    
    func main() {
    
        //調(diào)用發(fā)布消息函數(shù)
    
        publish(uri, exchangeName, queueName, bodyMsg)
    
        log.Printf("published %dB OK", len(bodyMsg))
    
    }
    
    //發(fā)布者的方法
    
    //@amqpURI, amqp的地址
    
    //@exchange, exchange的名稱
    
    //@queue, queue的名稱
    
    //@body, 主體內(nèi)容
    
    func publish(amqpURI string, exchange string, queue string, body string) {
    
        //建立連接
    
        log.Printf("dialing %q", amqpURI)
    
        connection, err := amqp.Dial(amqpURI)
    
        failOnError(err, "Failed to connect to RabbitMQ")
    
        defer connection.Close()
    
        //創(chuàng)建一個Channel
    
        log.Printf("got Connection, getting Channel")
    
        channel, err := connection.Channel()
    
        failOnError(err, "Failed to open a channel")
    
        defer channel.Close()
    
        log.Printf("got queue, declaring %q", queue)
    
        //創(chuàng)建一個queue
    
        q, err := channel.QueueDeclare(
    
            queueName, // name
    
            false, // durable
    
            false, // delete when unused
    
            false, // exclusive
    
            false, // no-wait
    
            nil, // arguments
    
        )
    
        failOnError(err, "Failed to declare a queue")
    
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
    
        // Producer只能發(fā)送到exchange,它是不能直接發(fā)送到queue的
    
        // 現(xiàn)在我們使用默認的exchange(名字是空字符)這個默認的exchange允許我們發(fā)送給指定的queue
    
        // routing_key就是指定的queue名字
    
        err = channel.Publish(
    
            exchange, // exchange
    
            q.Name, // routing key
    
            false, // mandatory
    
            false, // immediate
    
            amqp.Publishing{
    
                Headers: amqp.Table{},
    
                ContentType: "text/plain",
    
                ContentEncoding: "",
    
                Body: []byte(body),
            })
    
        failOnError(err, "Failed to publish a message")
    
    }
    復(fù)制代碼
    復(fù)制代碼
    #consumer消費者代碼
    package main
    
    import (
        "fmt"
    
        "log"
    
        "github.com/streadway/amqp"
    )
    
    const (
        //AMQP URI
    
        uri = "amqp://guest:guest@10.0.0.11:5672/"
    
        //Durable AMQP exchange nam
    
        exchangeName = ""
    
        //Durable AMQP queue name
    
        queueName = "test-queues"
    )
    
    //如果存在錯誤,則輸出
    
    func failOnError(err error, msg string) {
    
        if err != nil {
    
            log.Fatalf("%s: %s", msg, err)
    
            panic(fmt.Sprintf("%s: %s", msg, err))
    
        }
    
    }
    
    func main() {
    
        //調(diào)用消息接收者
    
        consumer(uri, exchangeName, queueName)
    
    }
    
    //接收者方法
    
    //@amqpURI, amqp的地址
    
    //@exchange, exchange的名稱
    
    //@queue, queue的名稱
    
    func consumer(amqpURI string, exchange string, queue string) {
    
        //建立連接
    
        log.Printf("dialing %q", amqpURI)
    
        connection, err := amqp.Dial(amqpURI)
    
        failOnError(err, "Failed to connect to RabbitMQ")
    
        defer connection.Close()
    
        //創(chuàng)建一個Channel
    
        log.Printf("got Connection, getting Channel")
    
        channel, err := connection.Channel()
    
        failOnError(err, "Failed to open a channel")
    
        defer channel.Close()
    
        log.Printf("got queue, declaring %q", queue)
    
        //創(chuàng)建一個queue
    
        q, err := channel.QueueDeclare(
    
            queueName, // name
    
            false, // durable
    
            false, // delete when unused
    
            false, // exclusive
    
            false, // no-wait
    
            nil, // arguments
    
        )
    
        failOnError(err, "Failed to declare a queue")
    
        log.Printf("Queue bound to Exchange, starting Consume")
    
        //訂閱消息
    
        msgs, err := channel.Consume(
    
            q.Name, // queue
    
            "", // consumer
    
            true, // auto-ack
    
            false, // exclusive
    
            false, // no-local
    
            false, // no-wait
    
            nil, // args
    
        )
    
        failOnError(err, "Failed to register a consumer")
    
        //創(chuàng)建一個channel
    
        forever := make(chan bool)
    
        //調(diào)用gorountine
    
        go func() {
    
            for d := range msgs {
    
                log.Printf("Received a message: %s", d.Body)
    
            }
    
        }()
    
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    
        //沒有寫入數(shù)據(jù),一直等待讀,阻塞當前線程,目的是讓線程不退出
    
        <-forever
    
    }
    復(fù)制代碼

     

    五、擁有消息確認的代碼

    復(fù)制代碼
    #producer
    package main
    
    import (
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "os"
        "strings"
    )
    
    const (
        //AMQP URI
        uri = "amqp://guest:guest@10.0.0.11:5672/"
        //Durable AMQP exchange name
        exchangeName = ""
        //Durable AMQP queue name
        queueName = "test-queues-acknowledgments"
    )
    
    //如果存在錯誤,則輸出
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
        }
    }
    
    func main() {
        bodyMsg := bodyFrom(os.Args)
        //調(diào)用發(fā)布消息函數(shù)
        publish(uri, exchangeName, queueName, bodyMsg)
        log.Printf("published %dB OK", len(bodyMsg))
    }
    
    func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
            s = "hello angel"
        } else {
            s = strings.Join(args[1:], " ")
        }
        return s
    }
    
    //發(fā)布者的方法
    //@amqpURI, amqp的地址
    //@exchange, exchange的名稱
    //@queue, queue的名稱
    //@body, 主體內(nèi)容
    func publish(amqpURI string, exchange string, queue string, body string) {
        //建立連接
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
    
        //創(chuàng)建一個Channel
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
    
        log.Printf("got queue, declaring %q", queue)
    
        //創(chuàng)建一個queue
        q, err := channel.QueueDeclare(
            queueName, // name
            false,     // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        log.Printf("declared queue, publishing %dB body (%q)", len(body), body)
    
        // Producer只能發(fā)送到exchange,它是不能直接發(fā)送到queue的。
        // 現(xiàn)在我們使用默認的exchange(名字是空字符)。這個默認的exchange允許我們發(fā)送給指定的queue。
        // routing_key就是指定的queue名字。
        err = channel.Publish(
            exchange, // exchange
            q.Name,   // routing key
            false,    // mandatory
            false,    // immediate
            amqp.Publishing{
                Headers:         amqp.Table{},
                ContentType:     "text/plain",
                ContentEncoding: "",
                Body:            []byte(body),
            })
        failOnError(err, "Failed to publish a message")
    }
    復(fù)制代碼
    復(fù)制代碼
    #consumer
    package main
    
    import (
        "bytes"
        "fmt"
        "github.com/streadway/amqp"
        "log"
        "time"
    )
    
    const (
        //AMQP URI
        uri = "amqp://guest:guest@10.0.0.11:5672/"
        //Durable AMQP exchange nam
        exchangeName = ""
        //Durable AMQP queue name
        queueName = "test-queues-acknowledgments"
    )
    
    //如果存在錯誤,則輸出
    func failOnError(err error, msg string) {
        if err != nil {
            log.Fatalf("%s: %s", msg, err)
            panic(fmt.Sprintf("%s: %s", msg, err))
        }
    }
    
    func main() {
        //調(diào)用消息接收者
        consumer(uri, exchangeName, queueName)
    }
    
    //接收者方法
    //@amqpURI, amqp的地址
    //@exchange, exchange的名稱
    //@queue, queue的名稱
    func consumer(amqpURI string, exchange string, queue string) {
        //建立連接
        log.Printf("dialing %q", amqpURI)
        connection, err := amqp.Dial(amqpURI)
        failOnError(err, "Failed to connect to RabbitMQ")
        defer connection.Close()
    
        //創(chuàng)建一個Channel
        log.Printf("got Connection, getting Channel")
        channel, err := connection.Channel()
        failOnError(err, "Failed to open a channel")
        defer channel.Close()
    
        log.Printf("got queue, declaring %q", queue)
    
        //創(chuàng)建一個queue
        q, err := channel.QueueDeclare(
            queueName, // name
            false,     // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        failOnError(err, "Failed to declare a queue")
    
        log.Printf("Queue bound to Exchange, starting Consume")
        //訂閱消息
        msgs, err := channel.Consume(
            q.Name, // queue
            "",     // consumer
            false,  // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
    
        //創(chuàng)建一個channel
        forever := make(chan bool)
    
        //調(diào)用gorountine
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %s", d.Body)
                dot_count := bytes.Count(d.Body, []byte("."))
                t := time.Duration(dot_count)
                time.Sleep(t * time.Second)
                log.Printf("Done")
                d.Ack(false)
            }
        }()
    
        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    
        //沒有寫入數(shù)據(jù),一直等待讀,阻塞當前線程,目的是讓線程不退出
        <-forever
    }
    復(fù)制代碼

     

      本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
      轉(zhuǎn)藏 分享 獻花(0

      0條評論

      發(fā)表

      請遵守用戶 評論公約

      類似文章 更多

      主站蜘蛛池模板: 日韩AV片无码一区二区不卡电影 | 桃子视频在线播放WWW| 久久婷婷国产综合精品| 亚洲日韩性欧美中文字幕| 久久大香伊蕉在人线免费AV| 亚洲欧洲中文日韩久久AV乱码| 丰满无码人妻热妇无码区| 久久婷婷五月综合色国产免费观看 | 国产猛男猛女超爽免费视频| 日韩人妻一区中文字幕| 女人喷液抽搐高潮视频 | 日韩精品无码区免费专区| 一卡二卡三卡四卡视频区| 内射女校花一区二区三区| 国产69精品久久久久99尤物| 在厨房被C到高潮A毛片奶水| 亚洲成人资源在线观看| 人人妻人人澡人人爽欧美精品 | 国产精品久久久久影院| 亚洲精品一区二区区别| 欧美成人精品高清在线观看| 免费现黄频在线观看国产| 97人妻人人揉人人躁人人| 国产香蕉尹人综合在线观看| 狠狠久久亚洲欧美专区| 无码人妻丰满熟妇啪啪网不卡| 久久精品中文闷骚内射| 乱妇乱女熟妇熟女网站| 老司机午夜精品视频资源| 少妇扒开毛茸茸的B自慰| 人人超人人超碰超国产 | 狠狠婷婷色五月中文字幕| 一本色道久久东京热| 丰满人妻被黑人猛烈进入| 40岁大乳的熟妇在线观看| 日韩欧国产精品一区综合无码| 久久国产成人亚洲精品影院老金| 中文字幕日韩精品人妻| 精品国产AV无码一道| 久久99国产精品久久99小说| 嫩草成人AV影院在线观看|