序
本文主要聊一下jesque的几个dao
dao列表
- FailureDAO
- KeysDAO
- QueueInfoDAO
- WorkerInfoDAO
FailureDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/FailureDAO.java
/** * FailureDAO provides access to job failures. * * @author Greg Haines */public interface FailureDAO { /** * @return total number of failures */ long getCount(); /** * @param offset offset into the failures * @param count number of failures to return * @return a sub-list of the failures */ ListgetFailures(long offset, long count); /** * Clear the list of failures. */ void clear(); /** * Re-queue a job for execution. * @param index the index into the failure list * @return the date the job was re-queued */ Date requeue(long index); /** * Remove a failure from the list. * @param index the index of the failure to remove */ void remove(long index);}
主要操纵的是namespace:failed,是一个list类型
- count
使用llen方法获取队列长度
- clear
使用del删除namespace:failed队列
- getFailures
使用lrange命令查询
- requeue
根据index取出failed job,重新设定retry时间,放到入队列中
- remove
根据index删,使用lrem,这里是先lset一个随机值,再根据这个随机值lrem
KeysDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/KeysDAO.java
/** * KeysDAO provides access to available keys. * * @author Greg Haines */public interface KeysDAO { /** * Get basic key info. * @param key the key name * @return the key information or null if the key did not exist */ KeyInfo getKeyInfo(String key); /** * Get basic key info plus a sub-list of the array value for the key, if applicable. * @param key the key name * @param offset the offset into the array * @param count the number of values to return * @return the key information or null if the key did not exist */ KeyInfo getKeyInfo(String key, int offset, int count); /** * Get basic info on all keys. * @return a list of key informations */ ListgetKeyInfos(); /** * @return information about the backing Redis database */ Map getRedisInfo();}
- getKeyInfo
使用type获取类型
- getKeyInfos
使用keys *方法
- getRedisInfo
使用info
QueueInfoDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/QueueInfoDAO.java
/** * QueueInfoDAO provides access to the queues in use by Jesque. * * @author Greg Haines */public interface QueueInfoDAO { /** * @return the list of queue names */ ListgetQueueNames(); /** * @return total number of jobs pending in all queues */ long getPendingCount(); /** * @return total number of jobs processed */ long getProcessedCount(); /** * @return the list of queue informations */ List getQueueInfos(); /** * @param name the queue name * @param jobOffset the offset into the queue * @param jobCount the number of jobs to return * @return the queue information or null if the queue does not exist */ QueueInfo getQueueInfo(String name, long jobOffset, long jobCount); /** * Delete the given queue. * @param name the name of the queue */ void removeQueue(String name);}
- getQueueNames
使用smembers方法操作namespace:queues
- getPendingCount
对每个queue计算大小,分queue类型
private long size(final Jedis jedis, final String queueName) { final String key = key(QUEUE, queueName); final long size; if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZCARD size = jedis.zcard(key); } else { // Else, use LLEN size = jedis.llen(key); } return size; }
延时队列使用的是zcard操作SortSet 非延时队列使用llen操作list
- getProcessedCount
直接查询stat的string对象
- getQueueInfos
顺带计算每个queue的大小
- removeQueue
public void removeQueue(final String name) { PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork() { /** * {@inheritDoc} */ @Override public Void doWork(final Jedis jedis) throws Exception { jedis.srem(key(QUEUES), name); jedis.del(key(QUEUE, name)); return null; } }); }
操作了queues以及queue两个对象
WorkerInfoDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/WorkerInfoDAO.java
/** * WorkerInfoDAO provides access to information about workers. * * @author Greg Haines */public interface WorkerInfoDAO { /** * @return total number of workers known */ long getWorkerCount(); /** * @return number of active workers */ long getActiveWorkerCount(); /** * @return number of paused workers */ long getPausedWorkerCount(); /** * @return information about all active workers */ ListgetActiveWorkers(); /** * @return information about all paused workers */ List getPausedWorkers(); /** * @return information about all workers */ List getAllWorkers(); /** * @param workerName the name of the worker * @return information about the given worker or null if that worker does not exist */ WorkerInfo getWorker(String workerName); /** * @return a map of worker informations by hostname */ Map > getWorkerHostMap(); /** * Removes the metadata about a worker. * * @param workerName * The worker name to remove */ void removeWorker(String workerName);}
- getAllWorkers
smembers操作namespace:workers
- getActiveWorkers
smembers操作namespace:workers,然后过来出来state是working的
- getPausedWorkers
smembers操作namespace:workers,然后过来出来state是paused的
- getWorkerCount
直接scard操作namespace:workers
- getActiveWorkerCount
smembers操作namespace:workers,然后过来出来state是working的
- getPausedWorkerCount
smembers操作namespace:workers,然后过来出来state是paused的
- getWorkerHostMap
smembers操作namespace:workers,然后按照host来分map 这个基本是万能的,其他的count基本是这个衍生出来
- removeWorker
public void removeWorker(final String workerName) { PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork() { /** * {@inheritDoc} */ @Override public Void doWork(final Jedis jedis) throws Exception { jedis.srem(key(WORKERS), workerName); jedis.del(key(WORKER, workerName), key(WORKER, workerName, STARTED), key(STAT, FAILED, workerName), key(STAT, PROCESSED, workerName)); return null; } }); }
操作works以及其他相关的对象