分布式队列
- 格式:doc
- 大小:198.50 KB
- 文档页数:8
rabbitmq应用实例RabbitMQ是一个流行的开源消息中间件,它可以帮助我们简化分布式系统之间的通信,并提高系统的可靠性。
以下是RabbitMQ的应用实例:1. 分布式任务队列在一个大型的分布式系统中,任务通常会被分配到多个节点进行并行处理。
在这种情况下,我们可以使用RabbitMQ来实现一个分布式的任务队列。
例如,一个电商网站可能需要在每天的晚上对所有商品的价格进行重新计算。
这个任务可能需要数小时才能完成,因此我们可以将这个任务分解成许多小的子任务,并将它们分配给多个节点进行处理。
节点可以使用RabbitMQ从任务队列中获取需要处理的消息,并将处理结果发送回队列中。
2. 日志收集器在一个分布式的系统中,各个节点可能会生成大量的日志文件。
为了方便管理和分析这些日志文件,我们可以使用RabbitMQ来实现一个日志收集器。
例如,一个在线电商网站可能需要追踪用户在网站上的行为。
这个过程会产生大量的日志文件,这些日志文件可能存储在不同的节点上。
我们可以使用RabbitMQ来收集这些日志文件,将它们发送到一个中央的日志处理节点,然后进行统一的处理和分析。
3. 消息推送在一个在线系统中,我们经常需要向用户发送推送通知。
我们可以使用RabbitMQ来实现一个消息推送的系统。
例如,一个在线聊天应用程序可能需要将用户之间的消息发送到相应的用户。
通过使用RabbitMQ,我们可以将消息发送到一个中央的消息队列中,然后从队列中获取消息并将其发送给相应的用户。
4. 事件驱动架构在一个分布式的系统中,我们经常需要使用事件来触发系统内的各个流程。
我们可以使用RabbitMQ来实现一个事件驱动的架构。
例如,在一个电商网站中,当一个用户下单时,我们可以使用RabbitMQ来发布一个订单事件。
这个事件可以触发其他部分的系统响应,例如库存管理系统可以更新库存,财务系统可以生成账单等。
总之,RabbitMQ是一个功能强大的消息中间件,可以帮助我们有效地组织分布式系统之间的通信。
分布式系统中的任务队列与任务调度随着互联网的快速发展,分布式系统在各个行业中被广泛应用。
为了实现高效的任务处理和资源利用,任务队列与任务调度成为分布式系统中不可或缺的组成部分。
本文将从任务队列和任务调度的概念入手,探讨它们在分布式系统中的作用和应用。
一、任务队列任务队列是一种存储和管理任务的数据结构,用于协调分布式系统中的任务处理。
它将任务按照时间顺序排列,保证任务的顺序性。
每个任务都包含了需要执行的操作和相关的参数。
任务队列可以分为同步队列和异步队列两种形式。
同步队列是一种简单的队列模式,它依次处理每个任务。
当一个任务执行完毕后,才会执行下一个任务。
同步队列在一些对任务执行顺序有严格要求的场景中很常见,比如电商平台的订单处理。
在该平台中,每个订单必须按照下单的先后顺序进行处理,否则可能会引发支付、库存等问题。
异步队列则是一种非阻塞的队列模式,它可以同时处理多个任务。
每个任务被放入队列后,无需等待前一个任务的完成,就可以继续处理下一个任务。
异步队列适用于任务处理时间较长,可以并发执行的情况,比如视频转码、大数据分析等。
二、任务调度任务调度是一种动态分配任务资源和管理任务执行的机制。
它根据任务的优先级、资源情况和系统负载等因素,合理地分配任务给不同的节点进行处理。
任务调度能够充分利用系统资源,提高处理效率和性能。
在分布式系统中,任务调度通常分为两个层次:中心调度和节点调度。
中心调度负责全局任务调度,根据各个节点的负载情况和任务优先级,将任务分配给不同的节点。
节点调度则负责局部任务调度,根据节点自身的资源情况和任务执行情况,合理地分配和管理任务的执行。
任务调度的算法有多种,比如最短作业优先调度、先来先服务调度、最早截止时间优先调度等。
这些算法以任务的属性和系统的状态为基础,综合考虑各个因素,确定合适的任务执行顺序和调度策略。
合理的任务调度能够提高系统的负载均衡和效率。
三、任务队列与任务调度的应用任务队列与任务调度在分布式系统中有广泛的应用。
zk应用场景
Zookeeper(zk)是一个高性能的分布式协调服务,它可以用于各种分布式场景,包括但不限于以下几个方面:
1. 协调分布式应用:在分布式系统中,各个节点之间需要协调工作,Zookeeper 提供了一套原子操作,来实现分布式应用的协调和同步。
2. 统一配置管理:分布式系统中,各个节点的配置信息需要保持一致。
Zookeeper 提供了注册中心功能,可以统一管理配置信息,降低维护成本和出错概率。
3. 分布式锁:在多线程、多进程甚至多机器环境下,锁的管理非常困难。
Zookeeper 提供了分布式锁的实现,可以保证分布式环境下的数据安全性和可靠性。
4. 集群监控:Zookeeper本身就是一个分布式集群,因此它可以用来监控其它集群的状态,并根据其变化做出相应的处理。
5. 分布式队列:在分布式系统中,消息队列的使用非常广泛。
Zookeeper可以用来实现分布式队列,支持生产者、消费者模型,保证消息的可靠性和顺序性。
总之,Zookeeper可以应用于任何需要协调分布式系统的场景,是构建分布式系统的不可或缺的基础组件之一。
分布式raid原理分布式RAID原理什么是RAIDRAID(冗余阵列磁盘)是一种通过将多个磁盘组合起来,提供更高性能、更高可靠性和更高存储容量的技术。
RAID技术使用了数据分发、数据镜像和奇偶校验等方法。
分布式RAID介绍分布式RAID是在传统RAID的基础上发展起来的一种技术。
它将数据存储在多个节点上,通过数据切片和冗余存储,提高了数据的可靠性和容错性。
分布式RAID原理分布式RAID的原理主要包括数据切片、数据重组和冗余存储。
数据切片(Data Slicing)分布式RAID将数据分成多个块,并将这些数据块分散存储在不同的节点上。
每个节点只存储部分数据,通过切片技术,确保数据的分布均匀且高效。
数据重组(Data Reconstruction)当读取数据时,分布式RAID需要将分散存储的数据块重新组合,以获取完整的数据。
数据重组需要协调多个节点之间的数据交换和通信。
冗余存储(Redundant Storage)为了保证数据的可靠性和容错性,分布式RAID采用冗余存储的方式。
在数据切片的基础上,还会增加冗余数据块,用于纠正数据块的错误。
分布式RAID的优势分布式RAID相比于传统RAID有以下优势:•高可靠性:分布式RAID将数据分散存储在多个节点上,即使其中某个节点发生故障,也不会影响整个系统的可用性。
•高容错性:分布式RAID通过冗余存储的方式,可以纠正数据块的错误,保证数据的完整性。
•高性能:分布式RAID可以充分利用多个节点的计算和存储资源,提供更高的性能和吞吐量。
•高扩展性:分布式RAID可以通过增加节点来扩展存储容量,提供更大的数据存储能力。
总结分布式RAID是一种在传统RAID基础上发展起来的技术,通过数据切片、数据重组和冗余存储,提高了数据的可靠性和容错性。
它具有高可靠性、高容错性、高性能和高扩展性等优势,逐渐成为大规模数据存储和处理的重要技术。
分布式RAID的应用场景分布式RAID广泛应用于以下场景:1.大规模数据中心:在大规模数据中心中,需要存储和处理大量的数据。
Celery-分布式任务队列⼀.Celery介绍和基本使⽤1.什么是Celery:是⼀个基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要⽤到异步任务,就可以考虑使⽤celery,Celery在执⾏任务时需要通过⼀个消息中间件来接收和发送任务消息,以及存储任务结果,⼀般使⽤rabbitMQ or Redis 2.Celery有以下优点:(1)简单:⼀单熟悉了celery的⼯作流程后,配置和使⽤还是⽐较简单的(2)⾼可⽤:当任务执⾏失败或执⾏过程中发⽣连接中断,celery 会⾃动尝试重新执⾏任务(3)快速:⼀个单进程的celery每分钟可处理上百万个任务(4)灵活:⼏乎celery的各个组件都可以被扩展及⾃定制3.Celery基本⼯作流程(1)请求把任务给---celerv组件,他把任务给放到rabbitmq(broker中间商)同时⽣成⼀个任务id(2)worker(执⾏任务的节点)从rabbitmq(broker中间商)⾥取数据把执⾏好的任务放到rabbitmq(broker中间商)⾥⾯(3)请求再通过celerv组件去rabbitmq(broker中间商)⾥取结果4.Celery的基本使⽤(1)创建⼀个celery application⽤来定义你的任务列表#!/bin/env python3#_*_coding:utf-8_*_from celery import Celeryapp = Celery('tasks', #给app取得名字broker='redis://:123456@192.168.1.232', #连接rabbitmq(broker中间商)backend='redis://:123456@192.168.1.232'#把结果写到redis⾥)#加装饰器代表这是worker可以执⾏的⼀个任务@app.task#定义加法任务def add(x,y):print("running...",x,y)return x+y(2)启动Celery Worker来开始监听并执⾏任务命令:celery -A celery_test worker -l debug(3)开⼀个终端,调⽤任务>>> from celery_test import add>>> t = add.delay(18,18)Celery Worker⽇志打印:[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] running...[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] 18[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] 18(4)在执⾏>>> t.get()返回:36Celery Worker⽇志打印:[2019-06-17 12:34:09,509: INFO/ForkPoolWorker-1] Task celery_test.add[4b9e24ef-b148-4e81-a019-d9b28309b93d] succeeded in0.007139017805457115s: 36⼆.在项⽬中如何使⽤celery 1.可以把celery配置成⼀个应⽤⽬录格式如下:proj/__init__.pyproj/celery.py #celery的配置proj/tasks.py #任务1proj/tasks2.py #任务2(1)celery配置:from__future__import absolute_import, unicode_literals #from __future__ import absolute_import是从python包的绝对路径⾥取import我们安装的包⽽不是当前⽬录from celery import Celery #默认import当前⽬录的Celery,当前⽬录没有app = Celery('proj',broker='redis://:123456@192.168.1.232', #连上rabbitMQ的参数backend='redis://:123456@192.168.1.232',#连上rabbitMQ的参数include=['proj.tasks','proj.tasks2']) #定义proj⽬录下的tasks脚本和proj⽬录下的tasks2脚本#可以给app设置参数app.conf.update(result_expires=3600, #所有任务结果⼀个⼩时之内结果没被取⾛就没了)if__name__ == '__main__':app.start()(2)任务1:from__future__import absolute_import, unicode_literalsfrom .celery import app #导⼊当前⽬录下celery.py⾥的app,⽤装饰器#相加@app.taskdef add(x, y):return x + y(3)任务2:from__future__import absolute_import, unicode_literalsfrom .celery import app #导⼊当前⽬录下celery.py⾥的app,⽤装饰器#求和@app.taskdef xsum(numbers):return sum(numbers)(3)退出上层⽬录启动:celery -A proj worker -l debug(4)开⼀个终端,调⽤任务>>> from proj import tasks2,tasks>>> t2 = tasks2.xsum.delay([3,45,5,6,7,4,88,21,5])>>> t1 = tasks.add.delay(2,5)>>> t1.get()返回:7>>> t2.get()返回:1842.后台启动worker(1)启动1个w1任务:celery multi start w1 -A celery_django -l info返回:celery multi v4.3.0 (rhubarb)> Starting nodes...> w1@localhost: OK(2)启动1个w2任务:celery multi start w2 -A celery_django -l info返回:celery multi v4.3.0 (rhubarb)> Starting nodes...> w2@localhost: OK(3)停⽌w2任务:celery multi stop w2 -A proj -l info返回:celery multi v4.3.0 (rhubarb)> Stopping nodes...> w2@localhost: TERM -> 11221三.Celery定时任务celery⽀持定时任务,设定好任务的执⾏时间,celery就会定时⾃动帮你执⾏,这个定时任务模块叫celery beat1.通过计划任务实现每隔10秒,30秒,⼀周执⾏任务(1)celery配置:from__future__import absolute_import, unicode_literals #from __future__ import absolute_import是从python包的绝对路径⾥取import我们安装的包⽽不是当前⽬录from celery import Celery #默认import当前⽬录的Celery,当前⽬录没有app = Celery('proj',broker='redis://:123456@192.168.1.232', #连上rabbitMQ的参数backend='redis://:123456@192.168.1.232',#连上rabbitMQ的参数include=['proj.periodic_task']) #定义proj⽬录下的periodic_task脚本#可以给app设置参数app.conf.update(result_expires=3600, #所有任务结果⼀个⼩时之内结果没被取⾛就没了)if__name__ == '__main__':app.start()(2)定时任务:periodic_task.pyfrom__future__import absolute_import, unicode_literalsfrom celery.schedules import crontabfrom .celery import app#只要脚本⼀启动⽴刻执⾏这个函数,这个函数⾃动有两个参数sender(添加任务), **kwargs@app.on_after_configure.connectdef setup_periodic_tasks(sender, **kwargs):#每隔⼗秒钟执⾏test这个函数,.s是给test这个函数传的参数sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') #name='add every 10'任务名#每三⼗秒执⾏test这个函数,.s是给test这个函数传的参数sender.add_periodic_task(30.0, test.s('xixi'), expires=10) #expires=10任务结果保存10秒钟#每周⼀早上七点半执⾏test这个函数,.s是给test这个函数传的参数sender.add_periodic_task(crontab(hour=7, minute=30, day_of_week=1),test.s('Happy Mondays!'),)@app.taskdef test(arg):print(arg)(3)启动worker:celery -A proj worker -l debug(4)启动任务调度器:celery -A proj.periodic_task beat -l debug⼗秒返回:[2019-06-18 14:01:54,236: WARNING/ForkPoolWorker-3] hello[2019-06-18 14:01:54,243: INFO/ForkPoolWorker-3] Task proj.periodic_task.test[89b349b8-3c19-4968-a09b-214a72164cee] succeeded in 0.007367603946477175s: None[2019-06-18 14:02:14,192: INFO/MainProcess] Received task: proj.periodic_task.test[1803e491-5a41-4fe1-bfe2-18b8335ae7d5]三⼗秒返回:[2019-06-18 14:03:14,192: WARNING/ForkPoolWorker-3] xixi[2019-06-18 14:03:14,192: DEBUG/MainProcess] Task accepted: proj.periodic_task.test[8fe2e9f2-e07e-4f87-bd80-daa4d3c64605]pid:27865[2019-06-18 14:03:14,194: INFO/ForkPoolWorker-3] Task proj.periodic_task.test[8fe2e9f2-e07e-4f87-bd80-daa4d3c64605] succeeded in 0.0014079520478844643s: None[2019-06-18 14:03:34,196: INFO/MainProcess] Received task: proj.periodic_task.test[80b7fcfa-b1e6-4c11-8390-3e3a0cc68f4e]2.通过计划任务实现每五秒做⼀次相加操作(1)celery配置:celery.pyfrom__future__import absolute_import, unicode_literals #from __future__ import absolute_import是从python包的绝对路径⾥取import我们安装的包⽽不是当前⽬录from celery import Celery #默认import当前⽬录的Celery,当前⽬录没有app = Celery('proj',broker='redis://:123456@192.168.1.232', #连上rabbitMQ的参数backend='redis://:123456@192.168.1.232', #连上rabbitMQ的参数include=['proj.tasks','proj.periodic_task']) #定义proj⽬录下的periodic_task脚本#可以给app设置参数app.conf.update(result_expires=3600, #所有任务结果⼀个⼩时之内结果没被取⾛就没了)if__name__ == '__main__':app.start()(2)相加任务:tasks.pyfrom__future__import absolute_import, unicode_literalsfrom .celery import app #导⼊当前⽬录下celery.py⾥的app,⽤装饰器#相加@app.taskdef add(x, y):return x + y(3)计划任务:from__future__import absolute_import, unicode_literalsfrom celery.schedules import crontabfrom .celery import appapp.conf.beat_schedule = {'每隔五秒执⾏相加': {'task': 'proj.tasks.add','schedule': 5.0,'args': (10, 10)},}app.conf.timezone = 'UTC'@app.taskdef test(arg):print(arg)(4)启动worker:celery -A proj worker -l debug(5)启动任务调度器:celery -A proj.periodic_task beat -l debugworker输出:[2019-06-18 14:29:02,377: DEBUG/MainProcess] Task accepted: proj.tasks.add[03c85e0e-973c-4607-882f-44bb1951cd7a] pid:28339 [2019-06-18 14:29:02,378: INFO/ForkPoolWorker-3] Task proj.tasks.add[03c85e0e-973c-4607-882f-44bb1951cd7a] succeeded in 0.0011271699331700802s: 20任务调度器输出:[2019-06-18 14:28:22,392: INFO/MainProcess] Scheduler: Sending due task 每隔五秒执⾏相加 (proj.tasks.add)[2019-06-18 14:28:22,415: DEBUG/MainProcess] proj.tasks.add sent. id->083a5d92-f6ff-47ef-8bb8-be800a7edfa8四.Celery与django结合django可以轻松跟celery结合实现异步任务(1)在项⽬⽂件夹⾥创建celery_django\celery_django\celery.py#celery配置from__future__import absolute_import, unicode_literalsimport osfrom celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django.settings') #celery_django项⽬名字app = Celery('celery_task')app.config_from_object('django.conf:settings', namespace='CELERY') #必须以CELERY⼤写开头app.autodiscover_tasks() #可以⾃动刷新@app.task(bind=True)def debug_task(self): #print('Request: {0!r}'.format(self.request))(2)在项⽬⽂件夹celery_django\celery_django\__init__.py⾥写⼊from__future__import absolute_import, unicode_literalsfrom .celery import app as celery_app__all__ = ['celery_app'](3)配置⽂件写⼊:celery_django\celery_django\settings.py#连接redisCELERY_BROKER_URL = 'redis://:123456@192.168.1.232'CELERY_RESULT_BACKEND = 'redis://:123456@192.168.1.232'(4)总url写⼊:celery_django\celery_django\urls.pyfrom django.conf.urls import url,includefrom django.contrib import adminurlpatterns = [url(r'^admin/', admin.site.urls),url(r'^celery/', include('celery_task.urls')),(5)项⽬url写⼊:celery_django\celery_task\urls.pyfrom django.conf.urls import urlfrom celery_task import viewsurlpatterns = [url(r'^celery_test/', views.celery_test ), #获取任务idurl(r'^celery_res/', views.celery_res ), #通过任务id获取到执⾏结果](6)创建⼀个celery application⽤来定义你的任务列表:celery_django\celery_task\tasks.pyfrom__future__import absolute_import, unicode_literalsfrom celery import shared_task #import time@shared_taskdef add(x, y):print("running task add",x,y )time.sleep(10)return x + y(7)进⼊项⽬⽬录⾥启动Celery Worker来开始监听并执⾏任务命令:celery -A celery_django worker -l debug(8)调⽤任务:celery_django\celery_task\views.pyfrom__future__import absolute_import, unicode_literalsfrom celery import shared_task #import time@shared_taskdef add(x, y):print("running task add",x,y )time.sleep(10)return x + y(9)执⾏任务获取id和通过id取得执⾏的结果函数/celery/celery_django/celery_task/views.pyfrom django.shortcuts import HttpResponsefrom celery_task.tasks import addfrom celery.result import AsyncResult#执⾏任务获取id号def celery_test(request):task = add.delay(4,22)return HttpResponse(task.id) #返回id号#通过id好获取执⾏结果def celery_res(request):#获取id号task_id = '2edbcc88-12ad-4709-b770-d38179c67ef5'res = AsyncResult(id=task_id) #通过id号获取任务结果return HttpResponse(res.get()) #把结果返回给前端INSTALLED_APPS = ["django_celery_beat",]创建表存定时任务:python3 manage.py migrate。
分布式系统架构技术栈详解分布式系统架构是一种通过将系统的不同组件分布在不同的节点上来实现高可用性、可伸缩性和容错性的系统设计方法。
它是一种将任务分解成多个子任务,并通过网络进行通信和协作的系统架构。
在分布式系统架构中,技术栈是指用于构建和管理分布式系统的各种技术和工具的集合。
下面将介绍几个常用的技术栈。
1. 分布式存储技术:分布式存储技术是分布式系统中的核心技术之一。
它将数据分布到多个节点上,实现数据的高可用性和容错性。
常见的分布式存储技术包括分布式文件系统(如HDFS)、分布式数据库(如Cassandra和MongoDB)等。
2. 分布式计算技术:分布式计算技术用于将计算任务分布到多个节点上并进行并行计算。
常见的分布式计算技术包括MapReduce(如Hadoop)和Spark等。
这些技术通过将大规模的计算任务分解成多个小任务,并在多个节点上并行执行,从而实现高效的计算。
3. 分布式消息队列技术:分布式消息队列技术用于在分布式系统中实现异步通信和解耦。
它通过提供可靠的消息传递机制来实现系统间的解耦和异步通信。
常见的分布式消息队列技术包括Kafka和RabbitMQ等。
4. 分布式缓存技术:分布式缓存技术用于在分布式系统中提高数据访问性能。
它将数据缓存在多个节点上,以减轻数据库的负载和提高系统的响应速度。
常见的分布式缓存技术包括Redis和Memcached等。
5. 分布式服务框架技术:分布式服务框架技术用于实现分布式系统中的服务调用和管理。
它提供了服务注册、发现和负载均衡等功能,简化了分布式系统的开发和维护。
常见的分布式服务框架技术包括Dubbo和Spring Cloud等。
以上是几个常用的分布式系统架构技术栈。
在实际应用中,根据具体的需求和场景,还可以选择其他技术和工具来构建和管理分布式系统。
分布式系统架构的设计和实现是一个复杂而关键的任务,需要综合考虑系统的可靠性、性能和可扩展性等方面的需求。
MemcacheQRabbitMQAMQP 里主要要说两个组件:Exchange 和Queue (在AMQP 1.0 里还会有变动),如下图所示,绿色的X 就是Exchange ,红色的是Queue ,这两者都在Server 端,又称作Broker ,这部分是RabbitMQ 实现的,而蓝色的则是客户端,通常有Producer 和Consumer 两种类型:1、优点1)支持很多的协议:AMQP,XMPP, SMTP, STOMP2)消息发布与消费方式比较齐全消息队列中只要的两个角色是exchange和queue。
RabbitMQ中的Exchanger包括:directexchanger实现点对点传输;fanout exchanger实现一对多的广播传输;topic exchanger实现pub/sub传输。
3)有监控插件支持2、不足1)部署以及分布式支持比较复杂2)分布式的调用支持不是很好,负载均衡3)支持的协议的概念比较多,且基于Erlang实现。
4)在高并发的情况下,比较慢,内存占用较多RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。
BeanstalkdBeanstalk 是一个简单、快速的工作队列。
(轻量级)其接口是通用的,但其初衷是把一些耗时操作异步化,从而降低大访问量web应用页面访问的延迟。
Beanstalk is a simple, fast work queue.Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.一、协议Beanstalkd的协议类似memached。
二、核心概念1、Tube管道,用于job的存储。
Job的整个什么周期都是在管道中完成,管道由两个队列组成ready queue 和delay queue2、 Job任务相当于消息。
Job有4个状态 ready, reserved, delayed,buried三、优点1、轻量、高性能2、安装简单3、多种语言版本的客户端4、任务管理方便任务的各种状态,以及任务的定时处理(put 命令参数带delay),可以针对任务的各种状态进行管理。
四、缺点其分布式也由客户端来实现,java版本中客户端对于一个server没有提供类似memached客户端通过hash或其他算法来负载均衡。
多台机器的beanstalkd客户端连接不好控制显得不灵活。
五、个性需求控制任务的执行个数。
为了防止一些耗时操作的任务大量出队列(reserve),必须想办法控制任务的出队列执行。
思路:1、在pub任务时控制好任务的delay时间。
可以针对不同的tube来设定该tube的delay操作,如delay设置为任务执行时间的1/50,从而保证任务的有序执行。
这中方式控制不太灵活,如果在负载不大的情况下也进行delay,将发挥不了系统的一些性能。
2、控制任务的出队列(reserve)通过控制出队列数目来执行。
如,系统最多能承载100个此任务,可以通过设定出队列数来保证不要超过此数目。
根据stat tube信息cmd-reserve数目减去cmd-delete 数目即为尽在运行的任务数。
前提条件是在任务成功执行后才执行cmd-delete操作。
参考文章Beanstalkd 是最近出现的一个轻量级消息中间件,他的最大特点是将自己定位为基于管道 (tube) 和任务(job) 的工作队列(work-queue):“(Beanstalkd) is a simple, fast workqueue service. Its interface is generic, but was originally designed for reducing the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.”Beanstalkd 支持任务优先级(priority), 延时(delay), 超时重发(time-to-run) 和预留(buried), 能够很好的支持分布式的后台任务和定时任务处理。
它的内部实现采用libevent, 服务器-客户端之间用类似memcached 的轻量级通讯协议,因此有很高的性能:尽管是内存队列, beanstalkd 提供了binlog 机制, 当重启beanstalkd 时,当前任务状态能够从纪录的本地binlog 中恢复。
管道(tube):管道类似于消息主题(topic), 在一个Beanstalkd 中可以支持多个管道, 每个管道都有自己的发布者(producer) 和消费者(consumer). 管道之间互相不影响。
任务(job):Beanstalkd 用任务(job) 代替消息(message) 的概念。
与消息不同,任务有一系列状态:READY - 需要立即处理的任务,当延时(DELAYED) 任务到期后会自动成为当前任务;DELAYED - 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回DELAYED 队列延迟执行;RESERVED - 已经被消费者获取, 正在执行的任务。
Beanstalkd 负责检查任务是否在TTR(time-to-run) 内完成;BURIED - 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;DELETED - 消息被彻底删除。
Beanstalkd 不再维持这些消息。
任务优先级(priority):任务(job) 可以有0~2^32 个优先级, 0 代表最高优先级。
beanstalkd 采用最大最小堆(Min-max heap) 处理任务优先级排序, 任何时刻调用reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为O(logn).延时任务(delay):有两种方式可以延时执行任务(job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行(RELEASE with <delay>)。
这种机制可以实现分布式的java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发(time-to-run) 能够保证任务转移到另外的节点执行。
任务超时重发(time-to-run):Beanstalkd 把任务返回给消费者以后:消费者必须在预设的TTR (time-to-run) 时间内发送delete / release/ bury 改变任务状态;否则Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。
如果消费者预计在TTR (time-to-run) 时间内无法完成任务, 也可以发送touch 命令, 它的作用是让Beanstalkd 从系统时间重新计算TTR (time-to-run).任务预留(buried):如果任务因为某些原因无法执行, 消费者可以把任务置为buried 状态让Beanstalkd 保留这些任务。
管理员可以通过peek buried 命令查询被保留的任务,并且进行人工干预。
简单的, kick <n> 能够一次性把n 条被保留的任务踢回队列。
Beanstalkd 协议:Beanstalkd 采用类memcached 协议, 客户端通过文本命令与服务器交互。
这些命令可以简单的分成三组:生产类- use <tube> / put <priority> <delay> <ttr> [bytes]:生产者用use 选择一个管道(tube), 然后用put 命令向管道发布任务(job).消费类- watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury<id> / touch <id>消费者用watch 选择多个管道(tube), 然后用reserve 命令获取待执行的任务,这个命令是阻塞的。
客户端直到有任务可执行才返回。
当任务处理完毕后, 消费者可以彻底删除任务(DELETE), 释放任务让别人处理(RELEASE), 或者保留(BURY) 任务。
维护类- peek job / peek delayed / peek ready / peek buried / kick <n>用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。
可以用消费类命令改变这些任务的状态。
被保留(buried) 的任务可以用kick 命令"踢" 回队列。
协议文档: https:///kr/beanstalkd/master/doc/protocol.txtBeanstalkd 不足:Beanstalkd 没有提供主备同步+ 故障切换机制, 在应用中有成为单点的风险。
实际应用中,可以用数据库为任务(job) 提供持久化存储。
另外, 和memcached 类似, Beanstalkd 依赖libevent 的单线程事件分发机制, 不能有效利用多核cpu 的性能。
这一点可以通过单机部署多个实例克服。
官方网站见/beanstalkd/参考资料:/past/2010/4/24/beanstalk_a_simple_and_fast_queueing_backen d//articles/about-this-blog-beanstalk-messaging-queue/presentations/Beanstalkd-2010-05-06//blog/cs/?p=1201。