博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq学习——队列
阅读量:6250 次
发布时间:2019-06-22

本文共 2511 字,大约阅读时间需要 8 分钟。

public class Send {

public static final String routingKey = "wuqidi_task_durable";
/*工作队列 也叫任务队列 目的是将任务发送到队列中 由工作者进行处理 在后台的多个工作者中 任务是共享的*/
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//消息持久化 :在声明队列过程中 第二个参数 是设置持久化 这是为了 当rabbitmq崩溃 关闭
//消息和队列是否删除 也就是再次开机会不会保留原来信息和队列
//这里设置为true是保证队列不会消失
channel.queueDeclare(routingKey, true, false, false, null);
String con = getTask();
System.out.println(con);
//这里设置的是保证消息不会消失 但也不是完全保证 可能会在内存中
channel.basicPublish("", routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, con.getBytes());
channel.close();
connection.close();
}
/*在这里随机生成 task 个数*/
private static String getTask(){
Random r = new Random();
int len = r.nextInt(10);
StringBuffer sb = new StringBuffer();
for(int i=0; i< 15; i++){
sb.append("task ");
}
if(sb.length()<1){
sb.append("task");
}
return sb.substring(0, sb.length()-1).toString();
}
}

 

 

/*循环队列 使用工作队列的一个优点就是可以启动多个接收端 就是工作者,它可以并行工作。采用轮训的方式进行分配任务。*/

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(Send.routingKey, true, false, false, null);
//公平调度:
//在同一时刻 发给同一工作者不准超过1条任务,直到处理完消息作出相应。可以先发送给空闲的工作者。
channel.basicQos(1);
Consumer callback = new Consumer() {
@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
}
@Override
public void handleRecoverOk(String consumerTag) {
}
@Override
public void handleDelivery(String arg0, Envelope arg1,
BasicProperties arg2, byte[] arg3) throws IOException {
String tasks = new String(arg3, "utf-8");
String[] taskss = tasks.split(" ");
for(String tem : taskss){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(tem);
}
//这里执行消息确认 ,为的是待任务执行完毕后才能执行删除操作,如果任务执行过程中
//任务执行失败了 rabbitmq还可以链接其他工作者 进行工作
channel.basicAck(arg1.getDeliveryTag(), false);
}
@Override
public void handleConsumeOk(String consumerTag) {
}
@Override
public void handleCancelOk(String consumerTag) {
}
@Override
public void handleCancel(String consumerTag) throws IOException {
}
};
channel.basicConsume(Send.routingKey, false, callback);
channel.close();
connection.close();
}
}

在实际项目中目前我还没有用到。。。

转载于:https://www.cnblogs.com/core404/p/7644904.html

你可能感兴趣的文章
HDU_1398 Square Coins(生成函数)
查看>>
margin-left是做边距,是宽度 left是定位盒子左上角左边位置的一个点
查看>>
VBS基础篇 - 数据类型
查看>>
逃离CSDN
查看>>
47、删除vector中重复元素
查看>>
C# 多线程传参数
查看>>
DFS算法的实现
查看>>
初探React与D3的结合-或许是visualization的新突破?
查看>>
tcpdump抓包以及port查看的一些操作
查看>>
VTK GetScalarPointer() and GetScalarComponentAsFloat() not work
查看>>
&lt;Android&gt;从窗口泄漏谈android:configChanges属性
查看>>
MySQL运行原理与基础架构
查看>>
动物产生式识别系统
查看>>
python *args **kwargs
查看>>
Jquery UI - DatePicker 在Dialog中无法自动隐藏的解决思路
查看>>
Docker Swarm 让你事半功倍
查看>>
jQuery选择器之子元素过滤选择器Demo
查看>>
LogBoy运行截图
查看>>
string.Format字符串格式说明
查看>>
关于配置Tomcat的URIEncoding
查看>>