HelloWorld
はじめに
RabbitMQ: メッセージを受信して??配信するもので、「郵便局」とみなすことができます。送信者と受信者はキューを介して対話します。キューのサイズは無制限であると考えられます。複數(shù)の送信者がキューにメッセージを送信でき、複數(shù)の受信者がキューからメッセージを受信することもできます。
コード
rabbitmq で使用されるプロトコルは amqp で、Python の推奨クライアントは pika
pip?install?pika?-i?https://pypi.douban.com/simple/
send.py
#?coding:?utf8 import?pika #?建立一個連接 connection?=?pika.BlockingConnection(pika.ConnectionParameters( ???????????'localhost'))??#?連接本地的RabbitMQ服務器 channel?=?connection.channel()??#?獲得channel
他のマシン上のサーバーに接続したい場合は、ここにあるリンクがあります。 just fill in アドレスまたはホスト名を入力するだけです。
次にメッセージの送信を開始します。メッセージを受け入れるキューが存在することを確認してください。存在しない場合、rabbitMQ はメッセージを破棄します
channel.queue_declare(queue='hello')??#?在RabbitMQ中創(chuàng)建hello這個隊列 channel.basic_publish(exchange='',??#?使用默認的exchange來發(fā)送消息到隊列 ??????????????????routing_key='hello',??#?發(fā)送到該隊列?hello?中 ??????????????????body='Hello?World!')??#?消息內(nèi)容 connection.close()??#?關閉?同時flush
RabbitMQ はデフォルトで 1GB の空きディスク容量を必要とし、それ以外の場合は送信が失敗します。
このとき、ローカルキューhelloにメッセージが格納されています。rabbitmqctl list_queuesを使用すると、helloキューにメッセージが格納されていることを示す
receive.py
hello?1
が表示されます。最初にサーバーに接続した方が良いです。前に送信したときと同じです
#?coding:?utf8 import?pika connection?=?pika.BlockingConnection(pika.ConnectionParameters( ???????????????'localhost')) channel?=?connection.channel()
ワークキュー(タスクキュー)
ワークキューは、時間のかかるタスクを複數(shù)のワーカープロセスに分散するために使用されます。リソースを消費するタスクをすぐに実行するのではなく (これらのタスクが完了するまで待つ必要があります)、後で実行するようにこれらのタスクをスケジュールします。たとえば、タスクをメッセージとしてキューに送信し、ワーカー プロセスを開始してタスクを受け入れ、最終的に実行します。また、複數(shù)のワーカー プロセスを開始して動作させることもできます。これは、http リクエストの処理ウィンドウ內(nèi)で複雑なタスクを完了すべきではない Web アプリケーションに當てはまります。
channel.queue_declare(queue='hello')??#?此處就是聲明了?來確保該隊列?hello?存在?可以多次聲明?這里主要是為了防止接受程序先運行時出錯 def?callback(ch,?method,?properties,?body):??#?用于接收到消息后的回調(diào) ????print("?[x]?Received?%r"?%?body) channel.basic_consume(callback, ??????????????????????queue='hello',??#?收指定隊列hello的消息 ??????????????????????no_ack=True)??#在處理完消息后不發(fā)送ack給服務器 channel.start_consuming()??#?啟動消息接受?這會進入一個死循環(huán)
メッセージの配布方法はポーリングです。つまり、各ワーカー プロセスは同じ數(shù)のメッセージを取得します。
メッセージ確認
メッセージがワーカー プロセスに割り當てられているが、処理が完了する前にワーカー プロセスがクラッシュした場合、メッセージは失われる可能性があります。これは、rabbitmq がメッセージをワーカー プロセスに配布すると、メッセージが削除されるためです。
メッセージの損失を防ぐために、rabbitmq は ack を提供します。つまり、ワーカー プロセスがメッセージを受信して??処理した後、rabbitmq に ack を送信して、現(xiàn)時點でメッセージをキューから削除できることを Rabbitmq に通知します。ワーカー プロセスが停止し、rabbitmq が ack を受信しない場合、メッセージは他のワーカー プロセスに再配布されます。タイムアウトを設定する必要がなく、長時間かかるタスクでも処理できます。
ack はデフォルトで有効になっています。以前、ワーカー プロセスは ack:
channel.basic_publish(exchange='', ??????????????????routing_key='task_queue', ??????????????????body=message, ??????????????????properties=pika.BasicProperties( ?????????????????????delivery_mode?=?2,?#?使得消息持久化 ??????????????????))
メッセージ永続性
を使用して no_ack=True
channel.basic_consume(callback,?queue='hello')??#?會啟用ack
コールバックを指定していましたが、場合によっては RabbitMQ が再起動され、メッセージが失われます。永続性はキューの作成時に設定できます:
(キューの性質(zhì)は一度決定すると変更できません)def?callback(ch,?method,?properties,?body): ????print?"?[x]?Received?%r"?%?(body,) ????time.sleep(?body.count('.')?) ????print?"?[x]?Done" ????ch.basic_ack(delivery_tag?=?method.delivery_tag)??#?發(fā)送ack
同時に、メッセージの送信時にメッセージの永続性屬性も設定する必要があります:
channel.basic_publish (exchange='',
channel.queue_declare(queue='task_queue',?durable=True)
ただし、RabbitMQ がメッセージを受信したばかりで、それを保存する時間がなかった場合でも、メッセージは失われます。同時に、RabbitMQ は受信したすべてのメッセージを保存しません。より完全な保証が必要な場合は、発行者確認を使用する必要があります。
公平なメッセージ配信 ポーリング モードでのメッセージ配信は公平ではない可能性があります。たとえば、奇數(shù)のメッセージが重いタスクである場合、一部のプロセスは常に重いタスクを?qū)g行します。たとえば、特定のワーカー プロセスにバックログのメッセージがある場合でも、多くの ack が送信されませんが、RabbitMQ は引き続きメッセージを受信プロセスに追加できます:??????????????????routing_key="task_queue",
??????????????????body=message,
??????????????????properties=pika.BasicProperties(
?????????????????????delivery_mode?=?2,?#?make?message?persistent
??????????????????))
を RabbitMQ に通知します。ワーカー プロセスが ack を返さない場合、それ以上のメッセージは割り當てられないことを示します。 グループ 通常、メッセージは複數(shù)のプロセスに送信されてから完了することがあります。同時に: exchange 送信者はメッセージをキューに直接送信しますか? 実際、送信者はメッセージを交換に送信することしかできません。一方では、交換機はプロデューサーからメッセージを受信し、他方ではメッセージをキューにプッシュします。メッセージを受信したときに何をする必要があるか、メッセージを特別なメッセージに追加する必要があるかどうかを知る必要があります。 Exchange には、直接、トピック、ヘッダー、ファンアウトなどのタイプがあり、メッセージを発行するときに使用されるのはファンアウトです。これは、Exchange の値を意味します。デフォルトの Exchange を使用しますchannel.basic_qos(prefetch_count=1)
一時キュー
channel.exchange_declare(exchange='logs',?type='fanout')??#?該exchange會把消息發(fā)送給所有它知道的隊列中このように、result.method.queue はキュー名であり、 Exchange と queue
result?=?channel.queue_declare()??#?創(chuàng)建一個隨機隊列 result?=?channel.queue_declare(exclusive=True)??#?創(chuàng)建一個隨機隊列,同時在沒有接收者連接該隊列后則銷毀它 queue_name?=?result.method.queueのログをバインドしてコピーを送信します。メッセージを送信するときは、こんにちはメッセージを送信するときは、新しく作成されたログ交換を使用します
channel.queue_bind(exchange='logs', ???????????????queue='hello')ルーティング以前はバインドを使用して、交換とキューの関係を確立しました(キューは交換からのメッセージに関心があります) )、バインドするときに routing_key オプションを指定することもできます直接交換を使用して、ルーティング キーに対応するメッセージを同じルーティング キーにバインドされたキューに送信します
???channel.basic_publish(exchange='logs',
??????????????????routing_key='',
??????????????????body=message)
。異なる重大度のメッセージをパブリッシュする送信関數(shù): 。
channel.exchange_declare(exchange='direct_logs', ?????????????????????type='direct')受信関數(shù)で対応する重大度をバインドします:
channel.basic_publish(exchange='direct_logs', ??????????????????routing_key=severity, ??????????????????body=message)トピック交換を使用します前に使用した直接交換は 1 つのルーティング キーのみをバインドできます。これを使用して、ルーティング キーのトピック交換を開くことができます。例:
"stock.usd.nyse"?"nyse.vmw"
和direct exchange一樣,在接受者那邊綁定的key與發(fā)送時指定的routing key相同即可,另外有些特殊的值:
*?代表1個單詞 #?代表0個或多個單詞
如果發(fā)送者發(fā)出的routing key都是3個部分的,如:celerity.colour.species。
Q1: *.orange.*??對應的是中間的colour都為orange的 Q2: *.*.rabbit??對應的是最后部分的species為rabbit的 lazy.#??????對應的是第一部分是lazy的
qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,對于lazy.pink.rabbit雖然匹配到了Q2兩次,但是只會發(fā)送一次。如果綁定時直接綁定#,則會收到所有的。
RPC
在遠程機器上運行一個函數(shù)然后獲得結(jié)果。
1、客戶端啟動 同時設置一個臨時隊列用于接受回調(diào),綁定該隊列
????self.connection?=?pika.BlockingConnection(pika.ConnectionParameters( ????????????host='localhost')) ????self.channel?=?self.connection.channel() ????result?=?self.channel.queue_declare(exclusive=True) ????self.callback_queue?=?result.method.queue ????self.channel.basic_consume(self.on_response,?no_ack=True, ???????????????????????????????queue=self.callback_queue)
2、客戶端發(fā)送rpc請求,同時附帶reply_to對應回調(diào)隊列,correlation_id設置為每個請求的唯一id(雖然說可以為每一次RPC請求都創(chuàng)建一個回調(diào)隊列,但是這樣效率不高,如果一個客戶端只使用一個隊列,則需要使用correlation_id來匹配是哪個請求),之后阻塞在回調(diào)隊列直到收到回復
注意:如果收到了非法的correlation_id直接丟棄即可,因為有這種情況--服務器已經(jīng)發(fā)了響應但是還沒發(fā)ack就掛了,等一會服務器重啟了又會重新處理該任務,又發(fā)了一遍相應,但是這時那個請求已經(jīng)被處理掉了
channel.basic_publish(exchange='', ???????????????????????routing_key='rpc_queue', ???????????????????????properties=pika.BasicProperties( ?????????????????????????????reply_to?=?self.callback_queue, ?????????????????????????????correlation_id?=?self.corr_id, ?????????????????????????????), ???????????????????????body=str(n))??#?發(fā)出調(diào)用 while?self.response?is?None:??#?這邊就相當于阻塞了 ????self.connection.process_data_events()??#?查看回調(diào)隊列 return?int(self.response)
3、請求會發(fā)送到rpc_queue隊列
4、RPC服務器從rpc_queue中取出,執(zhí)行,發(fā)送回復
channel.basic_consume(on_request,?queue='rpc_queue')??#?綁定?等待請求 #?處理之后: ch.basic_publish(exchange='', ?????????????????routing_key=props.reply_to, ?????????????????properties=pika.BasicProperties(correlation_id?=?\ ?????????????????????????????????????????????????????props.correlation_id), ?????????????????body=str(response))??#?發(fā)送回復到回調(diào)隊列 ch.basic_ack(delivery_tag?=?method.delivery_tag)??#?發(fā)送ack
5、客戶端從回調(diào)隊列中取出數(shù)據(jù),檢查correlation_id,執(zhí)行相應操作
if?self.corr_id?==?props.correlation_id: ????????self.response?=?body
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?
以上がRabbitMQ クイック スタート Python チュートリアルの詳細內(nèi)容です。詳細については、PHP 中國語 Web サイトの他の関連記事を參照してください。

ホットAIツール

Undress AI Tool
脫衣畫像を無料で

Undresser.AI Undress
リアルなヌード寫真を作成する AI 搭載アプリ

AI Clothes Remover
寫真から衣服を削除するオンライン AI ツール。

Clothoff.io
AI衣類リムーバー

Video Face Swap
完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中國語版
中國語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統(tǒng)合開発環(huán)境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ユーザー音聲入力がキャプチャされ、フロントエンドJavaScriptのMediareCorder APIを介してPHPバックエンドに送信されます。 2。PHPはオーディオを一時ファイルとして保存し、STTAPI(GoogleやBaiduの音聲認識など)を呼び出してテキストに変換します。 3。PHPは、テキストをAIサービス(Openaigptなど)に送信して、インテリジェントな返信を取得します。 4。PHPは、TTSAPI(BaiduやGoogle Voice Synthesisなど)を呼び出して音聲ファイルに返信します。 5。PHPは、音聲ファイルをフロントエンドに戻し、相互作用を完了します。プロセス全體は、すべてのリンク間のシームレスな接続を確保するためにPHPによって支配されています。

AIによるテキストエラーの修正と構(gòu)文最適化を?qū)g現(xiàn)するには、次の手順に従う必要があります。1。Baidu、Tencent API、またはオープンソースNLPライブラリなどの適切なAIモデルまたはAPIを選択します。 2。PHPのカールまたはガズルを介してAPIを呼び出し、返品結(jié)果を処理します。 3.アプリケーションにエラー修正情報を表示し、ユーザーが採用するかどうかを選択できるようにします。 4.構(gòu)文の検出とコードの最適化には、PHP-LとPHP_CODESNIFFERを使用します。 5.フィードバックを継続的に収集し、モデルまたはルールを更新して効果を改善します。 AIAPIを選択するときは、PHPの精度、応答速度、価格、サポートの評価に焦點を當てます。コードの最適化は、PSR仕様に従い、キャッシュを合理的に使用し、円形クエリを避け、定期的にコードを確認し、Xを使用する必要があります。

Seabornのジョイントプロットを使用して、2つの変數(shù)間の関係と分布をすばやく視覚化します。 2?;镜膜噬⒉紘恧?、sns.jointplot(data = tips、x = "total_bill"、y = "tip"、dind = "scatter")によって実裝され、中心は散布図であり、ヒストグラムは上部と右側(cè)と右側(cè)に表示されます。 3.回帰線と密度情報をdind = "reg"に追加し、marminal_kwsを組み合わせてエッジプロットスタイルを設定します。 4。データ量が大きい場合は、「ヘックス」を使用することをお勧めします。

AIセンチメントコンピューティングテクノロジーをPHPアプリケーションに統(tǒng)合するために、COREはセンチメント分析にクラウドサービスAIAPI(Google、AWS、Azureなど)を使用し、HTTPリクエストを介してテキストを送信し、JSON結(jié)果を返し、データベースに感情的なデータを保存し、それによって自動化された処理とユーザーフィードバックのデータ検査を?qū)g現(xiàn)することです。特定の手順には次のものが含まれます。1。正確性、コスト、言語サポート、統(tǒng)合の複雑さを考慮して、適切なAIセンチメント分析APIを選択します。 2。ガズルまたはカールを使用してリクエストを送信し、センチメントスコア、ラベル、および強度情報を保存します。 3。優(yōu)先順位の並べ替え、トレンド分析、製品の反復方向、ユーザーセグメンテーションをサポートする視覚的なダッシュボードを構(gòu)築します。 4。APIコールの制限や數(shù)などの技術的課題に対応する

文字列リストは、 '' .join(words)などのJoIn()メソッドとマージして、「Helloworldfrompython」を取得できます。 2。NUMBERリストは、參加する前にMAP(STR、數(shù)字)または[STR(x)forxinNumbers]を備えた文字列に変換する必要があります。 3.任意のタイプリストは、デバッグに適したブラケットと引用符のある文字列に直接変換できます。 4。カスタム形式は、 '|' .join(f "[{item}]" foriteminitems)output "[a] | [などのjoin()と組み合わせたジェネレーター式によって実裝できます。

Pyodbcのインストール:Pipinstallpyodbcコマンドを使用してライブラリをインストールします。 2.接続sqlserver:pyodbc.connect()メソッドを介して、ドライバー、サーバー、データベース、uid/pwdまたはtrusted_connectionを含む接続文字列を使用し、それぞれSQL認証またはWindows認証をサポートします。 3.インストールされているドライバーを確認します:pyodbc.drivers()を?qū)g行し、「sqlserver」を含むドライバー名をフィルタリングして、「sqlserverのodbcdriver17」などの正しいドライバー名が使用されるようにします。 4.接続文字列の重要なパラメーター

pandas.melt()は、幅広い形式データを長い形式に変換するために使用されます。答えは、ID_VARSを識別列を保持し、value_varsを溶かしてvar_nameおよびvalue_nameを選択する列を選択して、新しい列名を定義することです。列は1.id_vars = 'name'を意味します。 4.Value_Name = 'スコア'元の値の新しい列名を設定し、最後に名前、件名、スコアを含む3つの列を生成します。

Pythoncanbeoptimizedformemory-boundoperationsは、ヘッドゲネレーター、EfficientDataStructures、およびManagingObjectlifetimes.first、Usegeneratoratoratoratoratoratoraturatussを使用していることを確認してください
