public class Send {
public static final String routingKey = "wuqidi_task_durable"; /*工作队列 也叫任务队列 目的是将任务发送到队列中 由工作者进行处理 在后台的多个工作者中 任务是共享的*/ public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //消息持久化 :在声明队列过程中 第二个参数 是设置持久化 这是为了 当rabbitmq崩溃 关闭 //消息和队列是否删除 也就是再次开机会不会保留原来信息和队列 //这里设置为true是保证队列不会消失 channel.queueDeclare(routingKey, true, false, false, null); String con = getTask(); System.out.println(con); //这里设置的是保证消息不会消失 但也不是完全保证 可能会在内存中 channel.basicPublish("", routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, con.getBytes()); channel.close(); connection.close(); } /*在这里随机生成 task 个数*/ private static String getTask(){ Random r = new Random(); int len = r.nextInt(10); StringBuffer sb = new StringBuffer(); for(int i=0; i< 15; i++){ sb.append("task "); } if(sb.length()<1){ sb.append("task"); } return sb.substring(0, sb.length()-1).toString(); }}
/*循环队列 使用工作队列的一个优点就是可以启动多个接收端 就是工作者,它可以并行工作。采用轮训的方式进行分配任务。*/
public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(Send.routingKey, true, false, false, null); //公平调度: //在同一时刻 发给同一工作者不准超过1条任务,直到处理完消息作出相应。可以先发送给空闲的工作者。 channel.basicQos(1); Consumer callback = new Consumer() { @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } @Override public void handleRecoverOk(String consumerTag) { } @Override public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { String tasks = new String(arg3, "utf-8"); String[] taskss = tasks.split(" "); for(String tem : taskss){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(tem); } //这里执行消息确认 ,为的是待任务执行完毕后才能执行删除操作,如果任务执行过程中 //任务执行失败了 rabbitmq还可以链接其他工作者 进行工作 channel.basicAck(arg1.getDeliveryTag(), false); } @Override public void handleConsumeOk(String consumerTag) { } @Override public void handleCancelOk(String consumerTag) { } @Override public void handleCancel(String consumerTag) throws IOException { } }; channel.basicConsume(Send.routingKey, false, callback); channel.close(); connection.close(); }}在实际项目中目前我还没有用到。。。