View Javadoc

1   /**
2    *Copyright [2009-2010] [dennis zhuang(killme2008@gmail.com)]
3    *Licensed under the Apache License, Version 2.0 (the "License");
4    *you may not use this file except in compliance with the License.
5    *You may obtain a copy of the License at
6    *             http://www.apache.org/licenses/LICENSE-2.0
7    *Unless required by applicable law or agreed to in writing,
8    *software distributed under the License is distributed on an "AS IS" BASIS,
9    *WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
10   *either express or implied. See the License for the specific language governing permissions and limitations under the License
11   */
12  package net.rubyeye.xmemcached;
13  
14  import java.io.IOException;
15  import java.io.UnsupportedEncodingException;
16  import java.net.InetSocketAddress;
17  import java.net.URLDecoder;
18  import java.net.URLEncoder;
19  import java.util.ArrayList;
20  import java.util.Collection;
21  import java.util.Collections;
22  import java.util.HashMap;
23  import java.util.HashSet;
24  import java.util.Iterator;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.CopyOnWriteArrayList;
31  import java.util.concurrent.CountDownLatch;
32  import java.util.concurrent.ExecutionException;
33  import java.util.concurrent.Future;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.TimeoutException;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import net.rubyeye.xmemcached.auth.AuthInfo;
39  import net.rubyeye.xmemcached.buffer.BufferAllocator;
40  import net.rubyeye.xmemcached.buffer.SimpleBufferAllocator;
41  import net.rubyeye.xmemcached.codec.MemcachedCodecFactory;
42  import net.rubyeye.xmemcached.command.Command;
43  import net.rubyeye.xmemcached.command.CommandType;
44  import net.rubyeye.xmemcached.command.ServerAddressAware;
45  import net.rubyeye.xmemcached.command.TextCommandFactory;
46  import net.rubyeye.xmemcached.exception.MemcachedException;
47  import net.rubyeye.xmemcached.exception.NoValueException;
48  import net.rubyeye.xmemcached.impl.ArrayMemcachedSessionLocator;
49  import net.rubyeye.xmemcached.impl.ClosedMemcachedTCPSession;
50  import net.rubyeye.xmemcached.impl.DefaultKeyProvider;
51  import net.rubyeye.xmemcached.impl.KeyIteratorImpl;
52  import net.rubyeye.xmemcached.impl.MemcachedClientStateListenerAdapter;
53  import net.rubyeye.xmemcached.impl.MemcachedConnector;
54  import net.rubyeye.xmemcached.impl.MemcachedHandler;
55  import net.rubyeye.xmemcached.impl.MemcachedTCPSession;
56  import net.rubyeye.xmemcached.impl.ReconnectRequest;
57  import net.rubyeye.xmemcached.monitor.Constants;
58  import net.rubyeye.xmemcached.monitor.MemcachedClientNameHolder;
59  import net.rubyeye.xmemcached.monitor.XMemcachedMbeanServer;
60  import net.rubyeye.xmemcached.networking.Connector;
61  import net.rubyeye.xmemcached.networking.MemcachedSession;
62  import net.rubyeye.xmemcached.transcoders.CachedData;
63  import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;
64  import net.rubyeye.xmemcached.transcoders.Transcoder;
65  import net.rubyeye.xmemcached.utils.AddrUtil;
66  import net.rubyeye.xmemcached.utils.ByteUtils;
67  import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;
68  import net.rubyeye.xmemcached.utils.Protocol;
69  
70  import org.slf4j.Logger;
71  import org.slf4j.LoggerFactory;
72  
73  import com.google.code.yanf4j.config.Configuration;
74  import com.google.code.yanf4j.core.Session;
75  import com.google.code.yanf4j.core.SocketOption;
76  import com.google.code.yanf4j.util.SystemUtils;
77  
78  /**
79   * Memcached Client for connecting to memcached server and do operations.
80   * 
81   * @author dennis(killme2008@gmail.com)
82   * 
83   */
84  public class XMemcachedClient implements XMemcachedClientMBean, MemcachedClient {
85  
86  	private static final Logger log = LoggerFactory
87  			.getLogger(XMemcachedClient.class);
88  	protected MemcachedSessionLocator sessionLocator;
89  	private volatile boolean shutdown;
90  	protected MemcachedConnector connector;
91  	@SuppressWarnings("unchecked")
92  	private Transcoder transcoder;
93  	private boolean sanitizeKeys;
94  	private MemcachedHandler memcachedHandler;
95  	protected CommandFactory commandFactory;
96  	private long opTimeout = DEFAULT_OP_TIMEOUT;
97  	private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
98  	protected int connectionPoolSize = DEFAULT_CONNECTION_POOL_SIZE;
99  	protected int maxQueuedNoReplyOperations = DEFAULT_MAX_QUEUED_NOPS;
100 
101 	protected final AtomicInteger serverOrderCount = new AtomicInteger();
102 
103 	private Map<InetSocketAddress, AuthInfo> authInfoMap = new HashMap<InetSocketAddress, AuthInfo>();
104 
105 	private String name; // cache name
106 
107 	private volatile boolean failureMode;
108 
109 	private int timeoutExceptionThreshold = DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD;
110 
111 	private final CopyOnWriteArrayList<MemcachedClientStateListenerAdapter> stateListenerAdapters = new CopyOnWriteArrayList<MemcachedClientStateListenerAdapter>();
112 	private Thread shutdownHookThread;
113 	private volatile boolean isHutdownHookCalled = false;
114 	// key provider for pre-processing keys before sending them to memcached
115 	// added by dennis,2012-07-14
116 	private KeyProvider keyProvider = DefaultKeyProvider.INSTANCE;
117 	/**
118 	 * namespace thread local.
119 	 */
120 	public static final ThreadLocal<String> NAMESPACE_LOCAL = new ThreadLocal<String>();
121 
122 	/*
123 	 * (non-Javadoc)
124 	 * 
125 	 * @see net.rubyeye.xmemcached.MemcachedClient#setMergeFactor(int)
126 	 */
127 	public final void setMergeFactor(final int mergeFactor) {
128 		if (mergeFactor < 0) {
129 			throw new IllegalArgumentException("mergeFactor<0");
130 		}
131 		this.connector.setMergeFactor(mergeFactor);
132 	}
133 
134 	public int getTimeoutExceptionThreshold() {
135 		return this.timeoutExceptionThreshold;
136 	}
137 
138 	public void setTimeoutExceptionThreshold(int timeoutExceptionThreshold) {
139 		if (timeoutExceptionThreshold <= 0) {
140 			throw new IllegalArgumentException(
141 					"Illegal timeoutExceptionThreshold value "
142 							+ timeoutExceptionThreshold);
143 		}
144 		if (timeoutExceptionThreshold < 100) {
145 			log.warn("Too small timeoutExceptionThreshold value may cause connections disconnect/reconnect frequently.");
146 		}
147 		this.timeoutExceptionThreshold = timeoutExceptionThreshold;
148 	}
149 
150 	public <T> T withNamespace(String ns, MemcachedClientCallable<T> callable)
151 			throws MemcachedException, InterruptedException, TimeoutException {
152 		this.beginWithNamespace(ns);
153 		try {
154 			return callable.call(this);
155 		} finally {
156 			this.endWithNamespace();
157 		}
158 	}
159 
160 	public void endWithNamespace() {
161 		NAMESPACE_LOCAL.remove();
162 	}
163 
164 	public void beginWithNamespace(String ns) {
165 		if (ns == null || ns.trim().length() == 0) {
166 			throw new IllegalArgumentException("Blank namespace");
167 		}
168 		if (NAMESPACE_LOCAL.get() != null) {
169 			throw new IllegalStateException("Previous namespace wasn't ended.");
170 		}
171 		NAMESPACE_LOCAL.set(ns);
172 	}
173 
174 	public KeyProvider getKeyProvider() {
175 		return this.keyProvider;
176 	}
177 
178 	public void setKeyProvider(KeyProvider keyProvider) {
179 		if (keyProvider == null) {
180 			throw new IllegalArgumentException("Null key provider");
181 		}
182 		this.keyProvider = keyProvider;
183 	}
184 
185 	public final MemcachedSessionLocator getSessionLocator() {
186 		return this.sessionLocator;
187 	}
188 
189 	public final CommandFactory getCommandFactory() {
190 		return this.commandFactory;
191 	}
192 
193 	public String getName() {
194 		return this.name;
195 	}
196 
197 	public void setName(String name) {
198 		this.name = name;
199 	}
200 
201 	/*
202 	 * (non-Javadoc)
203 	 * 
204 	 * @see net.rubyeye.xmemcached.MemcachedClient#getConnectTimeout()
205 	 */
206 	public long getConnectTimeout() {
207 		return this.connectTimeout;
208 	}
209 
210 	/*
211 	 * (non-Javadoc)
212 	 * 
213 	 * @see net.rubyeye.xmemcached.MemcachedClient#setConnectTimeout(long)
214 	 */
215 	public void setConnectTimeout(long connectTimeout) {
216 		if (connectTimeout < 0) {
217 			throw new IllegalArgumentException("connectTimeout<0");
218 		}
219 		this.connectTimeout = connectTimeout;
220 	}
221 
222 	public void setEnableHeartBeat(boolean enableHeartBeat) {
223 		this.memcachedHandler.setEnableHeartBeat(enableHeartBeat);
224 	}
225 
226 	/**
227 	 * get operation timeout setting
228 	 * 
229 	 * @return
230 	 */
231 	public final long getOpTimeout() {
232 		return this.opTimeout;
233 	}
234 
235 	/**
236 	 * set operation timeout,default is one second.
237 	 * 
238 	 * @param opTimeout
239 	 */
240 	public final void setOpTimeout(long opTimeout) {
241 		if (opTimeout < 0) {
242 			throw new IllegalArgumentException("opTimeout<0");
243 		}
244 		this.opTimeout = opTimeout;
245 	}
246 
247 	public void setHealSessionInterval(long healConnectionInterval) {
248 		if (healConnectionInterval <= 0) {
249 			throw new IllegalArgumentException("Invalid heal session interval:"
250 					+ healConnectionInterval);
251 		}
252 		if (null != this.connector) {
253 			this.connector.setHealSessionInterval(healConnectionInterval);
254 		} else {
255 			throw new IllegalStateException("The client hasn't been started");
256 		}
257 	}
258 
259 	public long getHealSessionInterval() {
260 		if (null != this.connector) {
261 			return this.connector.getHealSessionInterval();
262 		}
263 		return -1L;
264 	}
265 
266 	public Map<InetSocketAddress, AuthInfo> getAuthInfoMap() {
267 		return this.authInfoMap;
268 	}
269 
270 	public void setAuthInfoMap(Map<InetSocketAddress, AuthInfo> map) {
271 		this.authInfoMap = map;
272 	}
273 
274 	/*
275 	 * (non-Javadoc)
276 	 * 
277 	 * @see net.rubyeye.xmemcached.MemcachedClient#getConnector()
278 	 */
279 	public final Connector getConnector() {
280 		return this.connector;
281 	}
282 
283 	/*
284 	 * (non-Javadoc)
285 	 * 
286 	 * @see
287 	 * net.rubyeye.xmemcached.MemcachedClient#setOptimizeMergeBuffer(boolean)
288 	 */
289 	public final void setOptimizeMergeBuffer(final boolean optimizeMergeBuffer) {
290 		this.connector.setOptimizeMergeBuffer(optimizeMergeBuffer);
291 	}
292 
293 	/*
294 	 * (non-Javadoc)
295 	 * 
296 	 * @see net.rubyeye.xmemcached.MemcachedClient#isShutdown()
297 	 */
298 	public final boolean isShutdown() {
299 		return this.shutdown;
300 	}
301 
302 	@SuppressWarnings("unchecked")
303 	private final <T> GetsResponse<T> gets0(final String key,
304 			final byte[] keyBytes, final Transcoder<T> transcoder)
305 			throws MemcachedException, TimeoutException, InterruptedException {
306 		GetsResponse<T> result = (GetsResponse<T>) this.fetch0(key, keyBytes,
307 				CommandType.GETS_ONE, this.opTimeout, transcoder);
308 		return result;
309 	}
310 
311 	private final Session sendCommand(final Command cmd)
312 			throws MemcachedException {
313 		if (this.shutdown) {
314 			throw new MemcachedException("Xmemcached is stopped");
315 		}
316 		return this.connector.send(cmd);
317 	}
318 
319 	/**
320 	 * XMemcached constructor,default weight is 1
321 	 * 
322 	 * @param server
323 	 *            �����P
324 	 * @param port
325 	 *            ����ㄧ���
326 	 * @throws IOException
327 	 */
328 	public XMemcachedClient(final String server, final int port)
329 			throws IOException {
330 		this(server, port, 1);
331 	}
332 
333 	/**
334 	 * XMemcached constructor
335 	 * 
336 	 * @param host
337 	 *            server host
338 	 * @param port
339 	 *            server port
340 	 * @param weight
341 	 *            server weight
342 	 * @throws IOException
343 	 */
344 	public XMemcachedClient(final String host, final int port, int weight)
345 			throws IOException {
346 		super();
347 		if (weight <= 0) {
348 			throw new IllegalArgumentException("weight<=0");
349 		}
350 		this.checkServerPort(host, port);
351 		this.buildConnector(new ArrayMemcachedSessionLocator(),
352 				new SimpleBufferAllocator(),
353 				XMemcachedClientBuilder.getDefaultConfiguration(),
354 				XMemcachedClientBuilder.getDefaultSocketOptions(),
355 				new TextCommandFactory(), new SerializingTranscoder());
356 		this.start0();
357 		this.connect(new InetSocketAddressWrapper(this.newSocketAddress(host,
358 				port), this.serverOrderCount.incrementAndGet(), weight, null));
359 	}
360 
361 	protected InetSocketAddress newSocketAddress(final String server,
362 			final int port) {
363 		return new InetSocketAddress(server, port);
364 	}
365 
366 	private void checkServerPort(String server, int port) {
367 		if (server == null || server.length() == 0) {
368 			throw new IllegalArgumentException();
369 		}
370 		if (port <= 0) {
371 			throw new IllegalArgumentException();
372 		}
373 	}
374 
375 	/*
376 	 * (non-Javadoc)
377 	 * 
378 	 * @see net.rubyeye.xmemcached.MemcachedClient#addServer(java.lang.String,
379 	 * int)
380 	 */
381 	public final void addServer(final String server, final int port)
382 			throws IOException {
383 		this.addServer(server, port, 1);
384 	}
385 
386 	/**
387 	 * add a memcached server to MemcachedClient
388 	 * 
389 	 * @param server
390 	 * @param port
391 	 * @param weight
392 	 * @throws IOException
393 	 */
394 	public final void addServer(final String server, final int port, int weight)
395 			throws IOException {
396 		if (weight <= 0) {
397 			throw new IllegalArgumentException("weight<=0");
398 		}
399 		this.checkServerPort(server, port);
400 		this.connect(new InetSocketAddressWrapper(this.newSocketAddress(server,
401 				port), this.serverOrderCount.incrementAndGet(), weight, null));
402 	}
403 
404 	/*
405 	 * (non-Javadoc)
406 	 * 
407 	 * @see
408 	 * net.rubyeye.xmemcached.MemcachedClient#addServer(java.net.InetSocketAddress
409 	 * )
410 	 */
411 	public final void addServer(final InetSocketAddress inetSocketAddress)
412 			throws IOException {
413 		this.addServer(inetSocketAddress, 1);
414 	}
415 
416 	public final void addServer(final InetSocketAddress inetSocketAddress,
417 			int weight) throws IOException {
418 		if (inetSocketAddress == null) {
419 			throw new IllegalArgumentException("Null InetSocketAddress");
420 		}
421 		if (weight <= 0) {
422 			throw new IllegalArgumentException("weight<=0");
423 		}
424 		this.connect(new InetSocketAddressWrapper(inetSocketAddress,
425 				this.serverOrderCount.incrementAndGet(), weight, null));
426 	}
427 
428 	/*
429 	 * (non-Javadoc)
430 	 * 
431 	 * @see net.rubyeye.xmemcached.MemcachedClient#addServer(java.lang.String)
432 	 */
433 	public final void addServer(String hostList) throws IOException {
434 		Map<InetSocketAddress, InetSocketAddress> addresses = AddrUtil
435 				.getAddressMap(hostList);
436 		if (addresses != null && addresses.size() > 0) {
437 			for (Map.Entry<InetSocketAddress, InetSocketAddress> entry : addresses
438 					.entrySet()) {
439 				final InetSocketAddress mainNodeAddr = entry.getKey();
440 				final InetSocketAddress standbyNodeAddr = entry.getValue();
441 				this.connect(new InetSocketAddressWrapper(mainNodeAddr,
442 						this.serverOrderCount.incrementAndGet(), 1, null));
443 				if (standbyNodeAddr != null) {
444 					this.connect(new InetSocketAddressWrapper(standbyNodeAddr,
445 							this.serverOrderCount.incrementAndGet(), 1,
446 							mainNodeAddr));
447 				}
448 			}
449 		}
450 	}
451 
452 	public void addOneServerWithWeight(String server, int weight)
453 			throws IOException {
454 		Map<InetSocketAddress, InetSocketAddress> addresses = AddrUtil
455 				.getAddressMap(server);
456 		if (addresses == null) {
457 			throw new IllegalArgumentException("Null Server");
458 		}
459 		if (addresses.size() != 1) {
460 			throw new IllegalArgumentException(
461 					"Please add one server at one time");
462 		}
463 		if (weight <= 0) {
464 			throw new IllegalArgumentException("weight<=0");
465 		}
466 		if (addresses != null && addresses.size() > 0) {
467 			for (Map.Entry<InetSocketAddress, InetSocketAddress> entry : addresses
468 					.entrySet()) {
469 				final InetSocketAddress mainNodeAddr = entry.getKey();
470 				final InetSocketAddress standbyNodeAddr = entry.getValue();
471 				this.connect(new InetSocketAddressWrapper(mainNodeAddr,
472 						this.serverOrderCount.incrementAndGet(), 1, null));
473 				if (standbyNodeAddr != null) {
474 					this.connect(new InetSocketAddressWrapper(standbyNodeAddr,
475 							this.serverOrderCount.incrementAndGet(), 1,
476 							mainNodeAddr));
477 				}
478 			}
479 		}
480 
481 	}
482 
483 	/*
484 	 * (non-Javadoc)
485 	 * 
486 	 * @see net.rubyeye.xmemcached.MemcachedClient#getServersDescription()
487 	 */
488 	public final List<String> getServersDescription() {
489 		final List<String> result = new ArrayList<String>();
490 		for (Session session : this.connector.getSessionSet()) {
491 			InetSocketAddress socketAddress = session.getRemoteSocketAddress();
492 			int weight = ((MemcachedSession) session)
493 					.getInetSocketAddressWrapper().getWeight();
494 			result.add(SystemUtils.getRawAddress(socketAddress) + ":"
495 					+ socketAddress.getPort() + "(weight=" + weight + ")");
496 		}
497 		return result;
498 	}
499 
500 	public final void setServerWeight(String server, int weight) {
501 		InetSocketAddress socketAddress = AddrUtil.getOneAddress(server);
502 		Queue<Session> sessionQueue = this.connector
503 				.getSessionByAddress(socketAddress);
504 		if (sessionQueue == null) {
505 			throw new IllegalArgumentException("There is no server " + server);
506 		}
507 		for (Session session : sessionQueue) {
508 			if (session != null) {
509 				((MemcachedTCPSession) session).getInetSocketAddressWrapper()
510 						.setWeight(weight);
511 			}
512 		}
513 		this.connector.updateSessions();
514 	}
515 
516 	/*
517 	 * (non-Javadoc)
518 	 * 
519 	 * @see
520 	 * net.rubyeye.xmemcached.MemcachedClient#removeServer(java.lang.String)
521 	 */
522 	public final void removeServer(String hostList) {
523 		List<InetSocketAddress> addresses = AddrUtil.getAddresses(hostList);
524 		if (addresses != null && addresses.size() > 0) {
525 			for (InetSocketAddress address : addresses) {
526 				// Close main sessions
527 				Queue<Session> sessionQueue = this.connector
528 						.getSessionByAddress(address);
529 				if (sessionQueue != null) {
530 					for (Session session : sessionQueue) {
531 						if (session != null) {
532 							// Disable auto reconnection
533 							((MemcachedSession) session)
534 									.setAllowReconnect(false);
535 							// Close connection
536 							((MemcachedSession) session).quit();
537 						}
538 					}
539 				}
540 				// Close standby sessions
541 				List<Session> standBySession = this.connector
542 						.getStandbySessionListByMainNodeAddr(address);
543 				if (standBySession != null) {
544 					for (Session session : standBySession) {
545 						this.connector.removeReconnectRequest(session
546 								.getRemoteSocketAddress());
547 						if (session != null) {
548 							// Disable auto reconnection
549 							((MemcachedSession) session)
550 									.setAllowReconnect(false);
551 							// Close connection
552 							((MemcachedSession) session).quit();
553 						}
554 					}
555 				}
556 				this.connector.removeReconnectRequest(address);
557 			}
558 
559 		}
560 
561 	}
562 
563 	protected void checkSocketAddress(InetSocketAddress address) {
564 
565 	}
566 
567 	private void connect(final InetSocketAddressWrapper inetSocketAddressWrapper)
568 			throws IOException {
569 		// creat connection pool
570 		InetSocketAddress inetSocketAddress = inetSocketAddressWrapper
571 				.getInetSocketAddress();
572 		this.checkSocketAddress(inetSocketAddress);
573 		if (this.connectionPoolSize > 1) {
574 			log.warn("You are using connection pool for xmemcached client,it's not recommended unless you have test it that it can boost performance in your app.");
575 		}
576 		for (int i = 0; i < this.connectionPoolSize; i++) {
577 			Future<Boolean> future = null;
578 			boolean connected = false;
579 			Throwable throwable = null;
580 			try {
581 				future = this.connector.connect(inetSocketAddressWrapper);
582 
583 				if (!future.get(this.connectTimeout, TimeUnit.MILLISECONDS)) {
584 					log.error("connect to "
585 							+ SystemUtils.getRawAddress(inetSocketAddress)
586 							+ ":" + inetSocketAddress.getPort() + " fail");
587 				} else {
588 					connected = true;
589 				}
590 			} catch (InterruptedException e) {
591 				Thread.currentThread().interrupt();
592 			} catch (ExecutionException e) {
593 				throwable = e;
594 				log.error(
595 						"connect to "
596 								+ SystemUtils.getRawAddress(inetSocketAddress)
597 								+ ":" + inetSocketAddress.getPort() + " error",
598 						e);
599 			} catch (TimeoutException e) {
600 				throwable = e;
601 				log.error(
602 						"connect to "
603 								+ SystemUtils.getRawAddress(inetSocketAddress)
604 								+ ":" + inetSocketAddress.getPort()
605 								+ " timeout", e);
606 			} catch (Exception e) {
607 				throwable = e;
608 				log.error(
609 						"connect to "
610 								+ SystemUtils.getRawAddress(inetSocketAddress)
611 								+ ":" + inetSocketAddress.getPort() + " error",
612 						e);
613 			}
614 			// If it is not connected,it will be added to waiting queue for
615 			// reconnecting.
616 			if (!connected) {
617 				if (future != null) {
618 					future.cancel(true);
619 				}
620 				// If we use failure mode, add a mock session at first
621 				if (this.failureMode) {
622 					this.connector.addSession(new ClosedMemcachedTCPSession(
623 							inetSocketAddressWrapper));
624 				}
625 				this.connector.addToWatingQueue(new ReconnectRequest(
626 						inetSocketAddressWrapper, 0, this
627 								.getHealSessionInterval()));
628 				log.error(
629 						"Connect to "
630 								+ SystemUtils.getRawAddress(inetSocketAddress)
631 								+ ":" + inetSocketAddress.getPort() + " fail",
632 						throwable);
633 				// throw new IOException(throwable);
634 			}
635 		}
636 	}
637 
638 	@SuppressWarnings("unchecked")
639 	private final <T> Object fetch0(final String key, final byte[] keyBytes,
640 			final CommandType cmdType, final long timeout,
641 			Transcoder<T> transcoder) throws InterruptedException,
642 			TimeoutException, MemcachedException, MemcachedException {
643 		final Command command = this.commandFactory.createGetCommand(key,
644 				keyBytes, cmdType, this.transcoder);
645 		this.latchWait(command, timeout, this.sendCommand(command));
646 		command.getIoBuffer().free(); // free buffer
647 		this.checkException(command);
648 		CachedData data = (CachedData) command.getResult();
649 		if (data == null) {
650 			return null;
651 		}
652 		if (transcoder == null) {
653 			transcoder = this.transcoder;
654 		}
655 		if (cmdType == CommandType.GETS_ONE) {
656 			return new GetsResponse<T>(data.getCas(), transcoder.decode(data));
657 		} else {
658 			return transcoder.decode(data);
659 		}
660 	}
661 
662 	private final void start0() throws IOException {
663 		this.registerMBean();
664 		this.startConnector();
665 		MemcachedClientNameHolder.clear();
666 	}
667 
668 	private final void startConnector() throws IOException {
669 		if (this.shutdown) {
670 			this.shutdown = false;
671 			this.connector.start();
672 			this.memcachedHandler.start();
673 			this.shutdownHookThread = new Thread() {
674 				@Override
675 				public void run() {
676 					try {
677 						XMemcachedClient.this.isHutdownHookCalled = true;
678 						XMemcachedClient.this.shutdown();
679 					} catch (IOException e) {
680 						log.error("Shutdown XMemcachedClient error", e);
681 					}
682 				}
683 			};
684 			Runtime.getRuntime().addShutdownHook(this.shutdownHookThread);
685 		}
686 	}
687 
688 	/**
689 	 * Set max queued noreply operations number
690 	 * 
691 	 * @param maxQueuedNoReplyOperations
692 	 */
693 	void setMaxQueuedNoReplyOperations(int maxQueuedNoReplyOperations) {
694 		if (maxQueuedNoReplyOperations <= 1) {
695 			throw new IllegalArgumentException("maxQueuedNoReplyOperations<=1");
696 		}
697 		this.maxQueuedNoReplyOperations = maxQueuedNoReplyOperations;
698 	}
699 
700 	@SuppressWarnings("unchecked")
701 	private void buildConnector(MemcachedSessionLocator locator,
702 			BufferAllocator bufferAllocator, Configuration configuration,
703 			Map<SocketOption, Object> socketOptions,
704 			CommandFactory commandFactory, Transcoder transcoder) {
705 		if (locator == null) {
706 			locator = new ArrayMemcachedSessionLocator();
707 
708 		}
709 		if (bufferAllocator == null) {
710 			bufferAllocator = new SimpleBufferAllocator();
711 		}
712 		if (configuration == null) {
713 			configuration = XMemcachedClientBuilder.getDefaultConfiguration();
714 		}
715 		if (transcoder == null) {
716 			transcoder = new SerializingTranscoder();
717 		}
718 		if (commandFactory == null) {
719 			commandFactory = new TextCommandFactory();
720 		}
721 		if (this.name == null) {
722 			this.name = "MemcachedClient-"
723 					+ Constants.MEMCACHED_CLIENT_COUNTER.getAndIncrement();
724 			MemcachedClientNameHolder.setName(this.name);
725 		}
726 		this.commandFactory = commandFactory;
727 		ByteUtils.setProtocol(this.commandFactory.getProtocol());
728 		log.warn("XMemcachedClient is using "
729 				+ this.commandFactory.getProtocol().name() + " protocol");
730 		this.commandFactory.setBufferAllocator(bufferAllocator);
731 		this.shutdown = true;
732 		this.transcoder = transcoder;
733 		this.sessionLocator = locator;
734 		this.connector = this.newConnector(bufferAllocator, configuration,
735 				this.sessionLocator, this.commandFactory,
736 				this.connectionPoolSize, this.maxQueuedNoReplyOperations);
737 		this.memcachedHandler = new MemcachedHandler(this);
738 		this.connector.setHandler(this.memcachedHandler);
739 		this.connector.setCodecFactory(new MemcachedCodecFactory());
740 		this.connector.setSessionTimeout(-1);
741 		this.connector.setSocketOptions(socketOptions);
742 		if (this.isFailureMode()) {
743 			log.warn("XMemcachedClient in failure mode.");
744 		}
745 		this.connector.setFailureMode(this.failureMode);
746 		this.sessionLocator.setFailureMode(this.failureMode);
747 	}
748 
749 	protected MemcachedConnector newConnector(BufferAllocator bufferAllocator,
750 			Configuration configuration,
751 			MemcachedSessionLocator memcachedSessionLocator,
752 			CommandFactory commandFactory, int poolSize,
753 			int maxQueuedNoReplyOperations) {
754 		// make sure dispatch message thread count is zero
755 		configuration.setDispatchMessageThreadCount(0);
756 		return new MemcachedConnector(configuration, memcachedSessionLocator,
757 				bufferAllocator, commandFactory, poolSize,
758 				maxQueuedNoReplyOperations);
759 	}
760 
761 	private final void registerMBean() {
762 		if (this.shutdown) {
763 			XMemcachedMbeanServer.getInstance().registMBean(
764 					this,
765 					this.getClass().getPackage().getName() + ":type="
766 							+ this.getClass().getSimpleName() + "-"
767 							+ MemcachedClientNameHolder.getName());
768 		}
769 	}
770 
771 	public void setOptimizeGet(boolean optimizeGet) {
772 		this.connector.setOptimizeGet(optimizeGet);
773 	}
774 
775 	/*
776 	 * (non-Javadoc)
777 	 * 
778 	 * @see
779 	 * net.rubyeye.xmemcached.MemcachedClient#setBufferAllocator(net.rubyeye
780 	 * .xmemcached.buffer.BufferAllocator)
781 	 */
782 	public final void setBufferAllocator(final BufferAllocator bufferAllocator) {
783 		this.connector.setBufferAllocator(bufferAllocator);
784 	}
785 
786 	/**
787 	 * XMemcached Constructor.
788 	 * 
789 	 * @param inetSocketAddress
790 	 * @param weight
791 	 * @throws IOException
792 	 */
793 	public XMemcachedClient(final InetSocketAddress inetSocketAddress,
794 			int weight) throws IOException {
795 		super();
796 		if (inetSocketAddress == null) {
797 			throw new IllegalArgumentException("Null InetSocketAddress");
798 
799 		}
800 		if (weight <= 0) {
801 			throw new IllegalArgumentException("weight<=0");
802 		}
803 		this.buildConnector(new ArrayMemcachedSessionLocator(),
804 				new SimpleBufferAllocator(),
805 				XMemcachedClientBuilder.getDefaultConfiguration(),
806 				XMemcachedClientBuilder.getDefaultSocketOptions(),
807 				new TextCommandFactory(), new SerializingTranscoder());
808 		this.start0();
809 		this.connect(new InetSocketAddressWrapper(inetSocketAddress,
810 				this.serverOrderCount.incrementAndGet(), weight, null));
811 	}
812 
813 	public XMemcachedClient(final InetSocketAddress inetSocketAddress)
814 			throws IOException {
815 		this(inetSocketAddress, 1);
816 	}
817 
818 	public XMemcachedClient() throws IOException {
819 		super();
820 		this.buildConnector(new ArrayMemcachedSessionLocator(),
821 				new SimpleBufferAllocator(),
822 				XMemcachedClientBuilder.getDefaultConfiguration(),
823 				XMemcachedClientBuilder.getDefaultSocketOptions(),
824 				new TextCommandFactory(), new SerializingTranscoder());
825 		this.start0();
826 	}
827 
828 	/**
829 	 * XMemcachedClient constructor.Every server's weight is one by default.
830 	 * 
831 	 * @param locator
832 	 * @param allocator
833 	 * @param conf
834 	 * @param commandFactory
835 	 * @param transcoder
836 	 * @param addressList
837 	 * @param stateListeners
838 	 * @throws IOException
839 	 */
840 	@SuppressWarnings("unchecked")
841 	XMemcachedClient(MemcachedSessionLocator locator,
842 			BufferAllocator allocator, Configuration conf,
843 			Map<SocketOption, Object> socketOptions,
844 			CommandFactory commandFactory, Transcoder transcoder,
845 			Map<InetSocketAddress, InetSocketAddress> addressMap,
846 			List<MemcachedClientStateListener> stateListeners,
847 			Map<InetSocketAddress, AuthInfo> map, int poolSize,
848 			long connectTimeout, String name, boolean failureMode)
849 			throws IOException {
850 		super();
851 		this.setConnectTimeout(connectTimeout);
852 		this.setFailureMode(failureMode);
853 		this.setName(name);
854 		this.optimiezeSetReadThreadCount(conf, addressMap == null ? 0
855 				: addressMap.size());
856 		this.buildConnector(locator, allocator, conf, socketOptions,
857 				commandFactory, transcoder);
858 		if (stateListeners != null) {
859 			for (MemcachedClientStateListener stateListener : stateListeners) {
860 				this.addStateListener(stateListener);
861 			}
862 		}
863 		this.setAuthInfoMap(map);
864 		this.setConnectionPoolSize(poolSize);
865 		this.start0();
866 		if (addressMap != null) {
867 			for (Map.Entry<InetSocketAddress, InetSocketAddress> entry : addressMap
868 					.entrySet()) {
869 				final InetSocketAddress mainNodeAddr = entry.getKey();
870 				final InetSocketAddress standbyNodeAddr = entry.getValue();
871 				this.connect(new InetSocketAddressWrapper(mainNodeAddr,
872 						this.serverOrderCount.incrementAndGet(), 1, null));
873 				if (standbyNodeAddr != null) {
874 					this.connect(new InetSocketAddressWrapper(standbyNodeAddr,
875 							this.serverOrderCount.incrementAndGet(), 1,
876 							mainNodeAddr));
877 				}
878 			}
879 		}
880 	}
881 
882 	/**
883 	 * XMemcachedClient constructor.
884 	 * 
885 	 * @param locator
886 	 * @param allocator
887 	 * @param conf
888 	 * @param commandFactory
889 	 * @param transcoder
890 	 * @param addressList
891 	 * @param weights
892 	 * @param stateListeners
893 	 *            weight array for address list
894 	 * @throws IOException
895 	 */
896 	@SuppressWarnings("unchecked")
897 	XMemcachedClient(MemcachedSessionLocator locator,
898 			BufferAllocator allocator, Configuration conf,
899 			Map<SocketOption, Object> socketOptions,
900 			CommandFactory commandFactory, Transcoder transcoder,
901 			Map<InetSocketAddress, InetSocketAddress> addressMap,
902 			int[] weights, List<MemcachedClientStateListener> stateListeners,
903 			Map<InetSocketAddress, AuthInfo> infoMap, int poolSize,
904 			long connectTimeout, final String name, boolean failureMode)
905 			throws IOException {
906 		super();
907 		this.setConnectTimeout(connectTimeout);
908 		this.setFailureMode(failureMode);
909 		this.setName(name);
910 		if (weights == null && addressMap != null) {
911 			throw new IllegalArgumentException("Null weights");
912 		}
913 		if (weights != null && addressMap == null) {
914 			throw new IllegalArgumentException("Null addressList");
915 		}
916 
917 		if (weights != null) {
918 			for (int weight : weights) {
919 				if (weight <= 0) {
920 					throw new IllegalArgumentException("Some weights<=0");
921 				}
922 			}
923 		}
924 		if (weights != null && addressMap != null
925 				&& weights.length < addressMap.size()) {
926 			throw new IllegalArgumentException(
927 					"weights.length is less than addressList.size()");
928 		}
929 		this.optimiezeSetReadThreadCount(conf, addressMap == null ? 0
930 				: addressMap.size());
931 		this.buildConnector(locator, allocator, conf, socketOptions,
932 				commandFactory, transcoder);
933 		if (stateListeners != null) {
934 			for (MemcachedClientStateListener stateListener : stateListeners) {
935 				this.addStateListener(stateListener);
936 			}
937 		}
938 		this.setAuthInfoMap(infoMap);
939 		this.setConnectionPoolSize(poolSize);
940 		this.start0();
941 		if (addressMap != null && weights != null) {
942 			int i = 0;
943 			for (Map.Entry<InetSocketAddress, InetSocketAddress> entry : addressMap
944 					.entrySet()) {
945 				final InetSocketAddress mainNodeAddr = entry.getKey();
946 				final InetSocketAddress standbyNodeAddr = entry.getValue();
947 				this.connect(new InetSocketAddressWrapper(mainNodeAddr,
948 						this.serverOrderCount.incrementAndGet(), weights[i],
949 						null));
950 				if (standbyNodeAddr != null) {
951 					this.connect(new InetSocketAddressWrapper(standbyNodeAddr,
952 							this.serverOrderCount.incrementAndGet(),
953 							weights[i], mainNodeAddr));
954 				}
955 				i++;
956 			}
957 		}
958 	}
959 
960 	private final void optimiezeSetReadThreadCount(Configuration conf,
961 			int addressCount) {
962 		if (conf != null && addressCount > 1) {
963 			if (!this.isWindowsPlatform()
964 					&& conf.getReadThreadCount() == DEFAULT_READ_THREAD_COUNT) {
965 				int threadCount = 2 * SystemUtils.getSystemThreadCount();
966 				conf.setReadThreadCount(addressCount > threadCount ? threadCount
967 						: addressCount);
968 			}
969 		}
970 	}
971 
972 	private final boolean isWindowsPlatform() {
973 		String osName = System.getProperty("os.name");
974 		if (osName != null && osName.toLowerCase().indexOf("windows") >= 0) {
975 			return true;
976 		} else {
977 			return false;
978 		}
979 	}
980 
	/**
	 * XMemcached constructor. Every server's weight is one by default.
	 * 
	 * @param addressList
	 *            addresses of the servers to connect to; must not be null or
	 *            empty
	 * @throws IOException
	 */
987 	public XMemcachedClient(List<InetSocketAddress> addressList)
988 			throws IOException {
989 		super();
990 		if (addressList == null || addressList.isEmpty()) {
991 			throw new IllegalArgumentException("Empty address list");
992 		}
993 		BufferAllocator simpleBufferAllocator = new SimpleBufferAllocator();
994 		this.buildConnector(new ArrayMemcachedSessionLocator(),
995 				simpleBufferAllocator,
996 				XMemcachedClientBuilder.getDefaultConfiguration(),
997 				XMemcachedClientBuilder.getDefaultSocketOptions(),
998 				new TextCommandFactory(), new SerializingTranscoder());
999 		this.start0();
1000 		for (InetSocketAddress inetSocketAddress : addressList) {
1001 			this.connect(new InetSocketAddressWrapper(inetSocketAddress,
1002 					this.serverOrderCount.incrementAndGet(), 1, null));
1003 
1004 		}
1005 	}
1006 
1007 	/*
1008 	 * (non-Javadoc)
1009 	 * 
1010 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.lang.String, long,
1011 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1012 	 */
1013 	@SuppressWarnings("unchecked")
1014 	public final <T> T get(final String key, final long timeout,
1015 			final Transcoder<T> transcoder) throws TimeoutException,
1016 			InterruptedException, MemcachedException {
1017 		return (T) this.get0(key, timeout, CommandType.GET_ONE, transcoder);
1018 	}
1019 
1020 	/*
1021 	 * (non-Javadoc)
1022 	 * 
1023 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.lang.String, long)
1024 	 */
1025 	@SuppressWarnings("unchecked")
1026 	public final <T> T get(final String key, final long timeout)
1027 			throws TimeoutException, InterruptedException, MemcachedException {
1028 		return (T) this.get(key, timeout, this.transcoder);
1029 	}
1030 
1031 	/*
1032 	 * (non-Javadoc)
1033 	 * 
1034 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.lang.String,
1035 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1036 	 */
1037 	public final <T> T get(final String key, final Transcoder<T> transcoder)
1038 			throws TimeoutException, InterruptedException, MemcachedException {
1039 		return this.get(key, this.opTimeout, transcoder);
1040 	}
1041 
1042 	/*
1043 	 * (non-Javadoc)
1044 	 * 
1045 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.lang.String)
1046 	 */
1047 	@SuppressWarnings("unchecked")
1048 	public final <T> T get(final String key) throws TimeoutException,
1049 			InterruptedException, MemcachedException {
1050 		return (T) this.get(key, this.opTimeout);
1051 	}
1052 
1053 	private <T> Object get0(String key, final long timeout,
1054 			final CommandType cmdType, final Transcoder<T> transcoder)
1055 			throws TimeoutException, InterruptedException, MemcachedException {
1056 		key = this.preProcessKey(key);
1057 		byte[] keyBytes = ByteUtils.getBytes(key);
1058 		ByteUtils.checkKey(keyBytes);
1059 		return this.fetch0(key, keyBytes, cmdType, timeout, transcoder);
1060 	}
1061 
1062 	/*
1063 	 * (non-Javadoc)
1064 	 * 
1065 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.lang.String, long,
1066 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1067 	 */
1068 	@SuppressWarnings("unchecked")
1069 	public final <T> GetsResponse<T> gets(final String key, final long timeout,
1070 			final Transcoder<T> transcoder) throws TimeoutException,
1071 			InterruptedException, MemcachedException {
1072 		return (GetsResponse<T>) this.get0(key, timeout, CommandType.GETS_ONE,
1073 				transcoder);
1074 	}
1075 
1076 	/*
1077 	 * (non-Javadoc)
1078 	 * 
1079 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.lang.String)
1080 	 */
1081 	public final <T> GetsResponse<T> gets(final String key)
1082 			throws TimeoutException, InterruptedException, MemcachedException {
1083 		return this.gets(key, this.opTimeout);
1084 	}
1085 
1086 	/*
1087 	 * (non-Javadoc)
1088 	 * 
1089 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.lang.String, long)
1090 	 */
1091 	@SuppressWarnings("unchecked")
1092 	public final <T> GetsResponse<T> gets(final String key, final long timeout)
1093 			throws TimeoutException, InterruptedException, MemcachedException {
1094 		return this.gets(key, timeout, this.transcoder);
1095 	}
1096 
1097 	/*
1098 	 * (non-Javadoc)
1099 	 * 
1100 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.lang.String,
1101 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1102 	 */
1103 	@SuppressWarnings("unchecked")
1104 	public final <T> GetsResponse<T> gets(final String key,
1105 			final Transcoder transcoder) throws TimeoutException,
1106 			InterruptedException, MemcachedException {
1107 		return this.gets(key, this.opTimeout, transcoder);
1108 	}
1109 
1110 	/*
1111 	 * (non-Javadoc)
1112 	 * 
1113 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.util.Collection,
1114 	 * long, net.rubyeye.xmemcached.transcoders.Transcoder)
1115 	 */
1116 	public final <T> Map<String, T> get(
1117 			final Collection<String> keyCollections, final long timeout,
1118 			final Transcoder<T> transcoder) throws TimeoutException,
1119 			InterruptedException, MemcachedException {
1120 		return this.getMulti0(keyCollections, timeout, CommandType.GET_MANY,
1121 				transcoder);
1122 	}
1123 
1124 	/*
1125 	 * (non-Javadoc)
1126 	 * 
1127 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.util.Collection,
1128 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1129 	 */
1130 	public final <T> Map<String, T> get(
1131 			final Collection<String> keyCollections,
1132 			final Transcoder<T> transcoder) throws TimeoutException,
1133 			InterruptedException, MemcachedException {
1134 		return this.getMulti0(keyCollections, this.opTimeout,
1135 				CommandType.GET_MANY, transcoder);
1136 	}
1137 
1138 	/*
1139 	 * (non-Javadoc)
1140 	 * 
1141 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.util.Collection)
1142 	 */
1143 	public final <T> Map<String, T> get(final Collection<String> keyCollections)
1144 			throws TimeoutException, InterruptedException, MemcachedException {
1145 		return this.get(keyCollections, this.opTimeout);
1146 	}
1147 
1148 	/*
1149 	 * (non-Javadoc)
1150 	 * 
1151 	 * @see net.rubyeye.xmemcached.MemcachedClient#get(java.util.Collection,
1152 	 * long)
1153 	 */
1154 	@SuppressWarnings("unchecked")
1155 	public final <T> Map<String, T> get(
1156 			final Collection<String> keyCollections, final long timeout)
1157 			throws TimeoutException, InterruptedException, MemcachedException {
1158 		return this.get(keyCollections, timeout, this.transcoder);
1159 	}
1160 
1161 	/*
1162 	 * (non-Javadoc)
1163 	 * 
1164 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.util.Collection,
1165 	 * long, net.rubyeye.xmemcached.transcoders.Transcoder)
1166 	 */
1167 	@SuppressWarnings("unchecked")
1168 	public final <T> Map<String, GetsResponse<T>> gets(
1169 			final Collection<String> keyCollections, final long timeout,
1170 			final Transcoder<T> transcoder) throws TimeoutException,
1171 			InterruptedException, MemcachedException {
1172 		return (Map<String, GetsResponse<T>>) this.getMulti0(keyCollections,
1173 				timeout, CommandType.GETS_MANY, transcoder);
1174 	}
1175 
1176 	/*
1177 	 * (non-Javadoc)
1178 	 * 
1179 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.util.Collection)
1180 	 */
1181 	public final <T> Map<String, GetsResponse<T>> gets(
1182 			final Collection<String> keyCollections) throws TimeoutException,
1183 			InterruptedException, MemcachedException {
1184 		return this.gets(keyCollections, this.opTimeout);
1185 	}
1186 
1187 	/*
1188 	 * (non-Javadoc)
1189 	 * 
1190 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.util.Collection,
1191 	 * long)
1192 	 */
1193 	@SuppressWarnings("unchecked")
1194 	public final <T> Map<String, GetsResponse<T>> gets(
1195 			final Collection<String> keyCollections, final long timeout)
1196 			throws TimeoutException, InterruptedException, MemcachedException {
1197 		return this.gets(keyCollections, timeout, this.transcoder);
1198 	}
1199 
1200 	/*
1201 	 * (non-Javadoc)
1202 	 * 
1203 	 * @see net.rubyeye.xmemcached.MemcachedClient#gets(java.util.Collection,
1204 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1205 	 */
1206 	public final <T> Map<String, GetsResponse<T>> gets(
1207 			final Collection<String> keyCollections,
1208 			final Transcoder<T> transcoder) throws TimeoutException,
1209 			InterruptedException, MemcachedException {
1210 		return this.gets(keyCollections, this.opTimeout, transcoder);
1211 	}
1212 
1213 	private final <T> Map<String, T> getMulti0(final Collection<String> keys,
1214 			final long timeout, final CommandType cmdType,
1215 			final Transcoder<T> transcoder) throws TimeoutException,
1216 			InterruptedException, MemcachedException {
1217 		if (keys == null || keys.size() == 0) {
1218 			return null;
1219 		}
1220 		Collection<String> keyCollections = new ArrayList<String>(keys.size());
1221 		for (String key : keys) {
1222 			keyCollections.add(this.preProcessKey(key));
1223 		}
1224 		final CountDownLatch latch;
1225 		final List<Command> commands;
1226 		if (this.connector.getSessionSet().size() <= 1) {
1227 			commands = new ArrayList<Command>(1);
1228 			latch = new CountDownLatch(1);
1229 			commands.add(this.sendGetMultiCommand(keyCollections, latch,
1230 					cmdType, transcoder));
1231 
1232 		} else {
1233 			Collection<List<String>> catalogKeys = this
1234 					.catalogKeys(keyCollections);
1235 			commands = new ArrayList<Command>(catalogKeys.size());
1236 			latch = new CountDownLatch(catalogKeys.size());
1237 			for (List<String> catalogKeyCollection : catalogKeys) {
1238 				commands.add(this.sendGetMultiCommand(catalogKeyCollection,
1239 						latch, cmdType, transcoder));
1240 			}
1241 		}
1242 		if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
1243 			for (Command getCmd : commands) {
1244 				getCmd.cancel();
1245 			}
1246 			throw new TimeoutException("Timed out waiting for operation");
1247 		}
1248 		return this.reduceResult(cmdType, transcoder, commands);
1249 	}
1250 
1251 	@SuppressWarnings("unchecked")
1252 	private <T> Map<String, T> reduceResult(final CommandType cmdType,
1253 			final Transcoder<T> transcoder, final List<Command> commands)
1254 			throws MemcachedException,InterruptedException,TimeoutException {
1255 		final Map<String, T> result = new HashMap<String, T>(commands.size());
1256 		for (Command getCmd : commands) {
1257 			getCmd.getIoBuffer().free();
1258 			this.checkException(getCmd);
1259 			Map<String, CachedData> map = (Map<String, CachedData>) getCmd
1260 					.getResult();
1261 			if (cmdType == CommandType.GET_MANY) {
1262 				Iterator<Map.Entry<String, CachedData>> it = map.entrySet()
1263 						.iterator();
1264 				while (it.hasNext()) {
1265 					Map.Entry<String, CachedData> entry = it.next();
1266 					String decodeKey = this.decodeKey(entry.getKey());
1267 					if (decodeKey != null) {
1268 						result.put(decodeKey,
1269 								transcoder.decode(entry.getValue()));
1270 					}
1271 				}
1272 
1273 			} else {
1274 				Iterator<Map.Entry<String, CachedData>> it = map.entrySet()
1275 						.iterator();
1276 				while (it.hasNext()) {
1277 					Map.Entry<String, CachedData> entry = it.next();
1278 					GetsResponse getsResponse = new GetsResponse(entry
1279 							.getValue().getCas(), transcoder.decode(entry
1280 							.getValue()));
1281 					String decodeKey = this.decodeKey(entry.getKey());
1282 					if (decodeKey != null) {
1283 						result.put(decodeKey, (T) getsResponse);
1284 					}
1285 				}
1286 
1287 			}
1288 
1289 		}
1290 		return result;
1291 	}
1292 
	/**
	 * Groups the keys by the session that the session locator selects for
	 * each key.
	 * 
	 * @param keyCollections
	 *            keys to group
	 * @return one list of keys per selected session
	 */
1299 	private final Collection<List<String>> catalogKeys(
1300 			final Collection<String> keyCollections) {
1301 		final Map<Session, List<String>> catalogMap = new HashMap<Session, List<String>>();
1302 
1303 		for (String key : keyCollections) {
1304 			Session index = this.sessionLocator.getSessionByKey(key);
1305 			if (!catalogMap.containsKey(index)) {
1306 				List<String> tmpKeys = new ArrayList<String>(100);
1307 				tmpKeys.add(key);
1308 				catalogMap.put(index, tmpKeys);
1309 			} else {
1310 				catalogMap.get(index).add(key);
1311 			}
1312 		}
1313 
1314 		Collection<List<String>> catalogKeys = catalogMap.values();
1315 		return catalogKeys;
1316 	}
1317 
1318 	private final <T> Command sendGetMultiCommand(
1319 			final Collection<String> keys, final CountDownLatch latch,
1320 			final CommandType cmdType, final Transcoder<T> transcoder)
1321 			throws InterruptedException, TimeoutException, MemcachedException {
1322 		final Command command = this.commandFactory.createGetMultiCommand(keys,
1323 				latch, cmdType, transcoder);
1324 		this.sendCommand(command);
1325 		return command;
1326 	}
1327 
1328 	/*
1329 	 * (non-Javadoc)
1330 	 * 
1331 	 * @see net.rubyeye.xmemcached.MemcachedClient#set(java.lang.String, int, T,
1332 	 * net.rubyeye.xmemcached.transcoders.Transcoder, long)
1333 	 */
1334 	public final <T> boolean set(String key, final int exp, final T value,
1335 			final Transcoder<T> transcoder, final long timeout)
1336 			throws TimeoutException, InterruptedException, MemcachedException {
1337 		key = this.preProcessKey(key);
1338 		byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1339 		return this.sendStoreCommand(this.commandFactory.createSetCommand(key,
1340 				keyBytes, exp, value, false, transcoder), timeout);
1341 	}
1342 
1343 	@SuppressWarnings("unchecked")
1344 	public void setWithNoReply(String key, int exp, Object value)
1345 			throws InterruptedException, MemcachedException {
1346 		this.setWithNoReply(key, exp, value, this.transcoder);
1347 	}
1348 
1349 	public <T> void setWithNoReply(String key, int exp, T value,
1350 			Transcoder<T> transcoder) throws InterruptedException,
1351 			MemcachedException {
1352 		key = this.preProcessKey(key);
1353 		byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1354 		try {
1355 			this.sendStoreCommand(this.commandFactory.createSetCommand(key,
1356 					keyBytes, exp, value, true, transcoder), this.opTimeout);
1357 		} catch (TimeoutException e) {
1358 			throw new MemcachedException(e);
1359 		}
1360 	}
1361 
1362 	private final <T> byte[] checkStoreArguments(final String key,
1363 			final int exp, final T value) {
1364 		byte[] keyBytes = ByteUtils.getBytes(key);
1365 		ByteUtils.checkKey(keyBytes);
1366 		if (value == null) {
1367 			throw new IllegalArgumentException("value could not be null");
1368 		}
1369 		if (exp < 0) {
1370 			throw new IllegalArgumentException(
1371 					"Expire time must be greater than or equal to 0");
1372 		}
1373 		return keyBytes;
1374 	}
1375 
1376 	/*
1377 	 * (non-Javadoc)
1378 	 * 
1379 	 * @see net.rubyeye.xmemcached.MemcachedClient#set(java.lang.String, int,
1380 	 * java.lang.Object)
1381 	 */
1382 	public final boolean set(final String key, final int exp, final Object value)
1383 			throws TimeoutException, InterruptedException, MemcachedException {
1384 		return this.set(key, exp, value, this.opTimeout);
1385 	}
1386 
1387 	/*
1388 	 * (non-Javadoc)
1389 	 * 
1390 	 * @see net.rubyeye.xmemcached.MemcachedClient#set(java.lang.String, int,
1391 	 * java.lang.Object, long)
1392 	 */
1393 	@SuppressWarnings("unchecked")
1394 	public final boolean set(final String key, final int exp,
1395 			final Object value, final long timeout) throws TimeoutException,
1396 			InterruptedException, MemcachedException {
1397 		return this.set(key, exp, value, this.transcoder, timeout);
1398 	}
1399 
1400 	/*
1401 	 * (non-Javadoc)
1402 	 * 
1403 	 * @see net.rubyeye.xmemcached.MemcachedClient#set(java.lang.String, int, T,
1404 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1405 	 */
1406 	public final <T> boolean set(final String key, final int exp,
1407 			final T value, final Transcoder<T> transcoder)
1408 			throws TimeoutException, InterruptedException, MemcachedException {
1409 		return this.set(key, exp, value, transcoder, this.opTimeout);
1410 	}
1411 
1412 	/*
1413 	 * (non-Javadoc)
1414 	 * 
1415 	 * @see net.rubyeye.xmemcached.MemcachedClient#add(java.lang.String, int, T,
1416 	 * net.rubyeye.xmemcached.transcoders.Transcoder, long)
1417 	 */
1418 	public final <T> boolean add(String key, final int exp, final T value,
1419 			final Transcoder<T> transcoder, final long timeout)
1420 			throws TimeoutException, InterruptedException, MemcachedException {
1421 		key = this.preProcessKey(key);
1422 		return this.add0(key, exp, value, transcoder, timeout);
1423 	}
1424 
1425 	private <T> boolean add0(String key, int exp, T value,
1426 			Transcoder<T> transcoder, long timeout)
1427 			throws InterruptedException, TimeoutException, MemcachedException {
1428 		byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1429 		return this.sendStoreCommand(this.commandFactory.createAddCommand(key,
1430 				keyBytes, exp, value, false, transcoder), timeout);
1431 	}
1432 
1433 	/*
1434 	 * (non-Javadoc)
1435 	 * 
1436 	 * @see net.rubyeye.xmemcached.MemcachedClient#add(java.lang.String, int,
1437 	 * java.lang.Object)
1438 	 */
1439 	public final boolean add(final String key, final int exp, final Object value)
1440 			throws TimeoutException, InterruptedException, MemcachedException {
1441 		return this.add(key, exp, value, this.opTimeout);
1442 	}
1443 
1444 	/*
1445 	 * (non-Javadoc)
1446 	 * 
1447 	 * @see net.rubyeye.xmemcached.MemcachedClient#add(java.lang.String, int,
1448 	 * java.lang.Object, long)
1449 	 */
1450 	@SuppressWarnings("unchecked")
1451 	public final boolean add(final String key, final int exp,
1452 			final Object value, final long timeout) throws TimeoutException,
1453 			InterruptedException, MemcachedException {
1454 		return this.add(key, exp, value, this.transcoder, timeout);
1455 	}
1456 
1457 	/*
1458 	 * (non-Javadoc)
1459 	 * 
1460 	 * @see net.rubyeye.xmemcached.MemcachedClient#add(java.lang.String, int, T,
1461 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1462 	 */
1463 	public final <T> boolean add(final String key, final int exp,
1464 			final T value, final Transcoder<T> transcoder)
1465 			throws TimeoutException, InterruptedException, MemcachedException {
1466 		return this.add(key, exp, value, transcoder, this.opTimeout);
1467 	}
1468 
1469 	@SuppressWarnings("unchecked")
1470 	public void addWithNoReply(String key, int exp, Object value)
1471 			throws InterruptedException, MemcachedException {
1472 		this.addWithNoReply(key, exp, value, this.transcoder);
1473 
1474 	}
1475 
1476 	public <T> void addWithNoReply(String key, int exp, T value,
1477 			Transcoder<T> transcoder) throws InterruptedException,
1478 			MemcachedException {
1479 		key = this.preProcessKey(key);
1480 		byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1481 		try {
1482 			this.sendStoreCommand(this.commandFactory.createAddCommand(key,
1483 					keyBytes, exp, value, true, transcoder), this.opTimeout);
1484 		} catch (TimeoutException e) {
1485 			throw new MemcachedException(e);
1486 		}
1487 
1488 	}
1489 
1490 	@SuppressWarnings("unchecked")
1491 	public void replaceWithNoReply(String key, int exp, Object value)
1492 			throws InterruptedException, MemcachedException {
1493 		this.replaceWithNoReply(key, exp, value, this.transcoder);
1494 
1495 	}
1496 
1497 	public <T> void replaceWithNoReply(String key, int exp, T value,
1498 			Transcoder<T> transcoder) throws InterruptedException,
1499 			MemcachedException {
1500 		key = this.preProcessKey(key);
1501 		byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1502 		try {
1503 			this.sendStoreCommand(this.commandFactory.createReplaceCommand(key,
1504 					keyBytes, exp, value, true, transcoder), this.opTimeout);
1505 		} catch (TimeoutException e) {
1506 			throw new MemcachedException(e);
1507 		}
1508 
1509 	}
1510 
1511 	/*
1512 	 * (non-Javadoc)
1513 	 * 
1514 	 * @see net.rubyeye.xmemcached.MemcachedClient#replace(java.lang.String,
1515 	 * int, T, net.rubyeye.xmemcached.transcoders.Transcoder, long)
1516 	 */
1517 	public final <T> boolean replace(String key, final int exp, final T value,
1518 			final Transcoder<T> transcoder, final long timeout)
1519 			throws TimeoutException, InterruptedException, MemcachedException {
1520 		key = this.preProcessKey(key);
1521 		byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1522 		return this.sendStoreCommand(this.commandFactory.createReplaceCommand(
1523 				key, keyBytes, exp, value, false, transcoder), timeout);
1524 	}
1525 
1526 	/*
1527 	 * (non-Javadoc)
1528 	 * 
1529 	 * @see net.rubyeye.xmemcached.MemcachedClient#replace(java.lang.String,
1530 	 * int, java.lang.Object)
1531 	 */
1532 	public final boolean replace(final String key, final int exp,
1533 			final Object value) throws TimeoutException, InterruptedException,
1534 			MemcachedException {
1535 		return this.replace(key, exp, value, this.opTimeout);
1536 	}
1537 
1538 	/*
1539 	 * (non-Javadoc)
1540 	 * 
1541 	 * @see net.rubyeye.xmemcached.MemcachedClient#replace(java.lang.String,
1542 	 * int, java.lang.Object, long)
1543 	 */
1544 	@SuppressWarnings("unchecked")
1545 	public final boolean replace(final String key, final int exp,
1546 			final Object value, final long timeout) throws TimeoutException,
1547 			InterruptedException, MemcachedException {
1548 		return this.replace(key, exp, value, this.transcoder, timeout);
1549 	}
1550 
1551 	/*
1552 	 * (non-Javadoc)
1553 	 * 
1554 	 * @see net.rubyeye.xmemcached.MemcachedClient#replace(java.lang.String,
1555 	 * int, T, net.rubyeye.xmemcached.transcoders.Transcoder)
1556 	 */
1557 	public final <T> boolean replace(final String key, final int exp,
1558 			final T value, final Transcoder<T> transcoder)
1559 			throws TimeoutException, InterruptedException, MemcachedException {
1560 		return this.replace(key, exp, value, transcoder, this.opTimeout);
1561 	}
1562 
1563 	/*
1564 	 * (non-Javadoc)
1565 	 * 
1566 	 * @see net.rubyeye.xmemcached.MemcachedClient#append(java.lang.String,
1567 	 * java.lang.Object)
1568 	 */
1569 	public final boolean append(final String key, final Object value)
1570 			throws TimeoutException, InterruptedException, MemcachedException {
1571 		return this.append(key, value, this.opTimeout);
1572 	}
1573 
1574 	/*
1575 	 * (non-Javadoc)
1576 	 * 
1577 	 * @see net.rubyeye.xmemcached.MemcachedClient#append(java.lang.String,
1578 	 * java.lang.Object, long)
1579 	 */
1580 	public final boolean append(String key, final Object value,
1581 			final long timeout) throws TimeoutException, InterruptedException,
1582 			MemcachedException {
1583 		key = this.preProcessKey(key);
1584 		byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1585 		return this.sendStoreCommand(this.commandFactory.createAppendCommand(
1586 				key, keyBytes, value, false, this.transcoder), timeout);
1587 	}
1588 
1589 	public void appendWithNoReply(String key, Object value)
1590 			throws InterruptedException, MemcachedException {
1591 		key = this.preProcessKey(key);
1592 		byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1593 		try {
1594 			this.sendStoreCommand(this.commandFactory.createAppendCommand(key,
1595 					keyBytes, value, true, this.transcoder), this.opTimeout);
1596 		} catch (TimeoutException e) {
1597 			throw new MemcachedException(e);
1598 		}
1599 
1600 	}
1601 
1602 	/*
1603 	 * (non-Javadoc)
1604 	 * 
1605 	 * @see net.rubyeye.xmemcached.MemcachedClient#prepend(java.lang.String,
1606 	 * java.lang.Object)
1607 	 */
1608 	public final boolean prepend(final String key, final Object value)
1609 			throws TimeoutException, InterruptedException, MemcachedException {
1610 		return this.prepend(key, value, this.opTimeout);
1611 	}
1612 
1613 	/*
1614 	 * (non-Javadoc)
1615 	 * 
1616 	 * @see net.rubyeye.xmemcached.MemcachedClient#prepend(java.lang.String,
1617 	 * java.lang.Object, long)
1618 	 */
1619 	public final boolean prepend(String key, final Object value,
1620 			final long timeout) throws TimeoutException, InterruptedException,
1621 			MemcachedException {
1622 		key = this.preProcessKey(key);
1623 		byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1624 		return this.sendStoreCommand(this.commandFactory.createPrependCommand(
1625 				key, keyBytes, value, false, this.transcoder), timeout);
1626 	}
1627 
1628 	public void prependWithNoReply(String key, Object value)
1629 			throws InterruptedException, MemcachedException {
1630 		key = this.preProcessKey(key);
1631 		byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1632 		try {
1633 			this.sendStoreCommand(this.commandFactory.createPrependCommand(key,
1634 					keyBytes, value, true, this.transcoder), this.opTimeout);
1635 		} catch (TimeoutException e) {
1636 			throw new MemcachedException(e);
1637 		}
1638 	}
1639 
1640 	/*
1641 	 * (non-Javadoc)
1642 	 * 
1643 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
1644 	 * java.lang.Object, long)
1645 	 */
1646 	public final boolean cas(final String key, final int exp,
1647 			final Object value, final long cas) throws TimeoutException,
1648 			InterruptedException, MemcachedException {
1649 		return this.cas(key, exp, value, this.opTimeout, cas);
1650 	}
1651 
1652 	/*
1653 	 * (non-Javadoc)
1654 	 * 
1655 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int, T,
1656 	 * net.rubyeye.xmemcached.transcoders.Transcoder, long, long)
1657 	 */
1658 	public final <T> boolean cas(String key, final int exp, final T value,
1659 			final Transcoder<T> transcoder, final long timeout, final long cas)
1660 			throws TimeoutException, InterruptedException, MemcachedException {
1661 		key = this.preProcessKey(key);
1662 		byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1663 		return this.sendStoreCommand(this.commandFactory.createCASCommand(key,
1664 				keyBytes, exp, value, cas, false, transcoder), timeout);
1665 	}
1666 
1667 	/*
1668 	 * (non-Javadoc)
1669 	 * 
1670 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
1671 	 * java.lang.Object, long, long)
1672 	 */
1673 	@SuppressWarnings("unchecked")
1674 	public final boolean cas(final String key, final int exp,
1675 			final Object value, final long timeout, final long cas)
1676 			throws TimeoutException, InterruptedException, MemcachedException {
1677 		return this.cas(key, exp, value, this.transcoder, timeout, cas);
1678 	}
1679 
1680 	/*
1681 	 * (non-Javadoc)
1682 	 * 
1683 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int, T,
1684 	 * net.rubyeye.xmemcached.transcoders.Transcoder, long)
1685 	 */
1686 	public final <T> boolean cas(final String key, final int exp,
1687 			final T value, final Transcoder<T> transcoder, final long cas)
1688 			throws TimeoutException, InterruptedException, MemcachedException {
1689 		return this.cas(key, exp, value, transcoder, this.opTimeout, cas);
1690 	}
1691 
1692 	private final <T> boolean cas0(final String key, final int exp,
1693 			GetsResponse<T> getsResponse, final CASOperation<T> operation,
1694 			final Transcoder<T> transcoder, byte[] keyBytes, boolean noreply)
1695 			throws TimeoutException, InterruptedException, MemcachedException {
1696 		if (operation == null) {
1697 			throw new IllegalArgumentException("CASOperation could not be null");
1698 		}
1699 		if (operation.getMaxTries() < 0) {
1700 			throw new IllegalArgumentException(
1701 					"max tries must be greater than 0");
1702 		}
1703 		int tryCount = 0;
1704 		GetsResponse<T> result = getsResponse;
1705 		if (result == null) {
1706 			throw new NoValueException("Null GetsResponse for key=" + key);
1707 		}
1708 		while (tryCount <= operation.getMaxTries()
1709 				&& result != null
1710 				&& !this.sendStoreCommand(this.commandFactory.createCASCommand(
1711 						key,
1712 						keyBytes,
1713 						exp,
1714 						operation.getNewValue(result.getCas(),
1715 								result.getValue()), result.getCas(), noreply,
1716 						transcoder), this.opTimeout) && !noreply) {
1717 			tryCount++;
1718 			result = this.gets0(key, keyBytes, transcoder);
1719 			if (result == null) {
1720 				throw new NoValueException(
1721 						"could not gets the value for Key=" + key + " for cas");
1722 			}
1723 			if (tryCount > operation.getMaxTries()) {
1724 				throw new TimeoutException("CAS try times is greater than max");
1725 			}
1726 		}
1727 		return true;
1728 	}
1729 
1730 	/*
1731 	 * (non-Javadoc)
1732 	 * 
1733 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
1734 	 * net.rubyeye.xmemcached.CASOperation,
1735 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1736 	 */
1737 	public final <T> boolean cas(String key, final int exp,
1738 			final CASOperation<T> operation, final Transcoder<T> transcoder)
1739 			throws TimeoutException, InterruptedException, MemcachedException {
1740 		key = this.preProcessKey(key);
1741 		byte[] keyBytes = ByteUtils.getBytes(key);
1742 		ByteUtils.checkKey(keyBytes);
1743 		GetsResponse<T> result = this.gets0(key, keyBytes, transcoder);
1744 		return this.cas0(key, exp, result, operation, transcoder, keyBytes,
1745 				false);
1746 	}
1747 
1748 	/*
1749 	 * (non-Javadoc)
1750 	 * 
1751 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
1752 	 * net.rubyeye.xmemcached.GetsResponse, net.rubyeye.xmemcached.CASOperation,
1753 	 * net.rubyeye.xmemcached.transcoders.Transcoder)
1754 	 */
1755 	public final <T> boolean cas(String key, final int exp,
1756 			GetsResponse<T> getsReponse, final CASOperation<T> operation,
1757 			final Transcoder<T> transcoder) throws TimeoutException,
1758 			InterruptedException, MemcachedException {
1759 		key = this.preProcessKey(key);
1760 		byte[] keyBytes = ByteUtils.getBytes(key);
1761 		ByteUtils.checkKey(keyBytes);
1762 		return this.cas0(key, exp, getsReponse, operation, transcoder,
1763 				keyBytes, false);
1764 	}
1765 
1766 	/*
1767 	 * (non-Javadoc)
1768 	 * 
1769 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
1770 	 * net.rubyeye.xmemcached.GetsResponse, net.rubyeye.xmemcached.CASOperation)
1771 	 */
1772 	@SuppressWarnings("unchecked")
1773 	public final <T> boolean cas(final String key, final int exp,
1774 			GetsResponse<T> getsReponse, final CASOperation<T> operation)
1775 			throws TimeoutException, InterruptedException, MemcachedException {
1776 
1777 		return this.cas(key, exp, getsReponse, operation, this.transcoder);
1778 	}
1779 
1780 	public <T> void casWithNoReply(String key, CASOperation<T> operation)
1781 			throws TimeoutException, InterruptedException, MemcachedException {
1782 		this.casWithNoReply(key, 0, operation);
1783 	}
1784 
1785 	public <T> void casWithNoReply(String key, GetsResponse<T> getsResponse,
1786 			CASOperation<T> operation) throws TimeoutException,
1787 			InterruptedException, MemcachedException {
1788 		this.casWithNoReply(key, 0, getsResponse, operation);
1789 
1790 	}
1791 
1792 	@SuppressWarnings("unchecked")
1793 	public <T> void casWithNoReply(String key, int exp,
1794 			CASOperation<T> operation) throws TimeoutException,
1795 			InterruptedException, MemcachedException {
1796 		key = this.preProcessKey(key);
1797 		byte[] keyBytes = ByteUtils.getBytes(key);
1798 		GetsResponse<T> result = this.gets0(key, keyBytes, this.transcoder);
1799 		this.casWithNoReply(key, exp, result, operation);
1800 
1801 	}
1802 
1803 	@SuppressWarnings("unchecked")
1804 	public <T> void casWithNoReply(String key, int exp,
1805 			GetsResponse<T> getsReponse, CASOperation<T> operation)
1806 			throws TimeoutException, InterruptedException, MemcachedException {
1807 		key = this.preProcessKey(key);
1808 		byte[] keyBytes = ByteUtils.getBytes(key);
1809 		ByteUtils.checkKey(keyBytes);
1810 		this.cas0(key, exp, getsReponse, operation, this.transcoder, keyBytes,
1811 				true);
1812 
1813 	}
1814 
1815 	/*
1816 	 * (non-Javadoc)
1817 	 * 
1818 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String,
1819 	 * net.rubyeye.xmemcached.GetsResponse, net.rubyeye.xmemcached.CASOperation)
1820 	 */
1821 	public final <T> boolean cas(final String key, GetsResponse<T> getsReponse,
1822 			final CASOperation<T> operation) throws TimeoutException,
1823 			InterruptedException, MemcachedException {
1824 		return this.cas(key, 0, getsReponse, operation);
1825 	}
1826 
1827 	/*
1828 	 * (non-Javadoc)
1829 	 * 
1830 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String, int,
1831 	 * net.rubyeye.xmemcached.CASOperation)
1832 	 */
1833 	@SuppressWarnings("unchecked")
1834 	public final <T> boolean cas(final String key, final int exp,
1835 			final CASOperation<T> operation) throws TimeoutException,
1836 			InterruptedException, MemcachedException {
1837 		return this.cas(key, exp, operation, this.transcoder);
1838 	}
1839 
1840 	/*
1841 	 * (non-Javadoc)
1842 	 * 
1843 	 * @see net.rubyeye.xmemcached.MemcachedClient#cas(java.lang.String,
1844 	 * net.rubyeye.xmemcached.CASOperation)
1845 	 */
1846 	public final <T> boolean cas(final String key,
1847 			final CASOperation<T> operation) throws TimeoutException,
1848 			InterruptedException, MemcachedException {
1849 		return this.cas(key, 0, operation);
1850 	}
1851 
1852 	/*
1853 	 * (non-Javadoc)
1854 	 * 
1855 	 * @see net.rubyeye.xmemcached.MemcachedClient#delete(java.lang.String, int)
1856 	 */
1857 	public final boolean delete(final String key, final int time)
1858 			throws TimeoutException, InterruptedException, MemcachedException {
1859 		return this.delete0(key, time, 0, false, this.opTimeout);
1860 	}
1861 
1862 	public boolean delete(String key, long opTimeout) throws TimeoutException,
1863 			InterruptedException, MemcachedException {
1864 		return this.delete0(key, 0, 0, false, opTimeout);
1865 	}
1866 
1867 	public boolean delete(String key, long cas, long opTimeout)
1868 			throws TimeoutException, InterruptedException, MemcachedException {
1869 		return this.delete0(key, 0, cas, false, opTimeout);
1870 	}
1871 
1872 	/**
1873 	 * Delete key's data item from memcached.This method doesn't wait for reply
1874 	 * 
1875 	 * @param key
1876 	 * @param time
1877 	 * @throws InterruptedException
1878 	 * @throws MemcachedException
1879 	 */
1880 	public final void deleteWithNoReply(final String key, final int time)
1881 			throws InterruptedException, MemcachedException {
1882 		try {
1883 			this.delete0(key, time, 0, true, this.opTimeout);
1884 		} catch (TimeoutException e) {
1885 			throw new MemcachedException(e);
1886 		}
1887 	}
1888 
1889 	public final void deleteWithNoReply(final String key)
1890 			throws InterruptedException, MemcachedException {
1891 		this.deleteWithNoReply(key, 0);
1892 	}
1893 
1894 	private boolean delete0(String key, final int time, long cas,
1895 			boolean noreply, long opTimeout) throws MemcachedException,
1896 			InterruptedException, TimeoutException {
1897 		key = this.preProcessKey(key);
1898 		final byte[] keyBytes = ByteUtils.getBytes(key);
1899 		ByteUtils.checkKey(keyBytes);
1900 		final Command command = this.commandFactory.createDeleteCommand(key,
1901 				keyBytes, time, cas, noreply);
1902 		final Session session = this.sendCommand(command);
1903 		if (!command.isNoreply()) {
1904 			this.latchWait(command, opTimeout, session);
1905 			command.getIoBuffer().free();
1906 			this.checkException(command);
1907 			if (command.getResult() == null) {
1908 				throw new MemcachedException(
1909 						"Operation fail,may be caused by networking or timeout");
1910 			}
1911 		} else {
1912 			return false;
1913 		}
1914 		return (Boolean) command.getResult();
1915 	}
1916 
1917 	void checkException(final Command command) throws MemcachedException {
1918 		if (command.getException() != null) {
1919 			if (command.getException() instanceof MemcachedException) {
1920 				throw (MemcachedException) command.getException();
1921 			} else {
1922 				throw new MemcachedException(command.getException());
1923 			}
1924 		}
1925 	}
1926 
1927 	public boolean touch(String key, int exp, long opTimeout)
1928 			throws TimeoutException, InterruptedException, MemcachedException {
1929 		key = this.preProcessKey(key);
1930 		final byte[] keyBytes = ByteUtils.getBytes(key);
1931 		ByteUtils.checkKey(keyBytes);
1932 		CountDownLatch latch = new CountDownLatch(1);
1933 		final Command command = this.commandFactory.createTouchCommand(key,
1934 				keyBytes, latch, exp, false);
1935 		this.latchWait(command, opTimeout, this.sendCommand(command));
1936 		command.getIoBuffer().free();
1937 		this.checkException(command);
1938 		if (command.getResult() == null) {
1939 			throw new MemcachedException(
1940 					"Operation fail,may be caused by networking or timeout");
1941 		}
1942 		return (Boolean) command.getResult();
1943 	}
1944 
1945 	public boolean touch(String key, int exp) throws TimeoutException,
1946 			InterruptedException, MemcachedException {
1947 		return this.touch(key, exp, this.opTimeout);
1948 	}
1949 
1950 	@SuppressWarnings("unchecked")
1951 	public <T> T getAndTouch(String key, int newExp, long opTimeout)
1952 			throws TimeoutException, InterruptedException, MemcachedException {
1953 		key = this.preProcessKey(key);
1954 		final byte[] keyBytes = ByteUtils.getBytes(key);
1955 		ByteUtils.checkKey(keyBytes);
1956 		CountDownLatch latch = new CountDownLatch(1);
1957 		final Command command = this.commandFactory.createGetAndTouchCommand(
1958 				key, keyBytes, latch, newExp, false);
1959 		this.latchWait(command, opTimeout, this.sendCommand(command));
1960 		command.getIoBuffer().free();
1961 		this.checkException(command);
1962 		CachedData data = (CachedData) command.getResult();
1963 		if (data == null) {
1964 			return null;
1965 		}
1966 		return (T) this.transcoder.decode(data);
1967 	}
1968 
1969 	@SuppressWarnings("unchecked")
1970 	public <T> T getAndTouch(String key, int newExp) throws TimeoutException,
1971 			InterruptedException, MemcachedException {
1972 		return (T) this.getAndTouch(key, newExp, this.opTimeout);
1973 	}
1974 
1975 	/*
1976 	 * (non-Javadoc)
1977 	 * 
1978 	 * @see net.rubyeye.xmemcached.MemcachedClient#incr(java.lang.String, int)
1979 	 */
1980 	public final long incr(String key, final long delta)
1981 			throws TimeoutException, InterruptedException, MemcachedException {
1982 		key = this.preProcessKey(key);
1983 		return this.sendIncrOrDecrCommand(key, delta, 0, CommandType.INCR,
1984 				false, this.opTimeout, 0);
1985 	}
1986 
1987 	public long incr(String key, long delta, long initValue)
1988 			throws TimeoutException, InterruptedException, MemcachedException {
1989 		key = this.preProcessKey(key);
1990 		return this.sendIncrOrDecrCommand(key, delta, initValue,
1991 				CommandType.INCR, false, this.opTimeout, 0);
1992 	}
1993 
1994 	public long incr(String key, long delta, long initValue, long timeout)
1995 			throws TimeoutException, InterruptedException, MemcachedException {
1996 		key = this.preProcessKey(key);
1997 		return this.sendIncrOrDecrCommand(key, delta, initValue,
1998 				CommandType.INCR, false, timeout, 0);
1999 	}
2000 
2001 	public long incr(String key, long delta, long initValue, long timeout,
2002 			int exp) throws TimeoutException, InterruptedException,
2003 			MemcachedException {
2004 		key = this.preProcessKey(key);
2005 		return this.sendIncrOrDecrCommand(key, delta, initValue,
2006 				CommandType.INCR, false, timeout, exp);
2007 	}
2008 
2009 	public final void incrWithNoReply(String key, long delta)
2010 			throws InterruptedException, MemcachedException {
2011 		key = this.preProcessKey(key);
2012 		try {
2013 			this.sendIncrOrDecrCommand(key, delta, 0, CommandType.INCR, true,
2014 					this.opTimeout, 0);
2015 		} catch (TimeoutException e) {
2016 			throw new MemcachedException(e);
2017 		}
2018 	}
2019 
2020 	public final void decrWithNoReply(String key, final long delta)
2021 			throws InterruptedException, MemcachedException {
2022 		key = this.preProcessKey(key);
2023 		try {
2024 			this.sendIncrOrDecrCommand(key, delta, 0, CommandType.DECR, true,
2025 					this.opTimeout, 0);
2026 		} catch (TimeoutException e) {
2027 			throw new MemcachedException(e);
2028 		}
2029 	}
2030 
2031 	/*
2032 	 * (non-Javadoc)
2033 	 * 
2034 	 * @see net.rubyeye.xmemcached.MemcachedClient#decr(java.lang.String, int)
2035 	 */
2036 	public final long decr(String key, final long delta)
2037 			throws TimeoutException, InterruptedException, MemcachedException {
2038 		key = this.preProcessKey(key);
2039 		return this.sendIncrOrDecrCommand(key, delta, 0, CommandType.DECR,
2040 				false, this.opTimeout, 0);
2041 	}
2042 
2043 	public long decr(String key, long delta, long initValue)
2044 			throws TimeoutException, InterruptedException, MemcachedException {
2045 		key = this.preProcessKey(key);
2046 		return this.sendIncrOrDecrCommand(key, delta, initValue,
2047 				CommandType.DECR, false, this.opTimeout, 0);
2048 	}
2049 
2050 	public long decr(String key, long delta, long initValue, long timeout)
2051 			throws TimeoutException, InterruptedException, MemcachedException {
2052 		key = this.preProcessKey(key);
2053 		return this.sendIncrOrDecrCommand(key, delta, initValue,
2054 				CommandType.DECR, false, timeout, 0);
2055 	}
2056 
2057 	public long decr(String key, long delta, long initValue, long timeout,
2058 			int exp) throws TimeoutException, InterruptedException,
2059 			MemcachedException {
2060 		key = this.preProcessKey(key);
2061 		return this.sendIncrOrDecrCommand(key, delta, initValue,
2062 				CommandType.DECR, false, timeout, exp);
2063 	}
2064 
2065 	/*
2066 	 * (non-Javadoc)
2067 	 * 
2068 	 * @see net.rubyeye.xmemcached.MemcachedClient#flushAll()
2069 	 */
2070 	public final void flushAll() throws TimeoutException, InterruptedException,
2071 			MemcachedException {
2072 		this.flushAll(this.opTimeout);
2073 	}
2074 
2075 	public void flushAllWithNoReply() throws InterruptedException,
2076 			MemcachedException {
2077 		try {
2078 			this.flushAllMemcachedServers(this.opTimeout, true, 0);
2079 		} catch (TimeoutException e) {
2080 			throw new MemcachedException(e);
2081 		}
2082 	}
2083 
2084 	public void flushAllWithNoReply(int exptime) throws InterruptedException,
2085 			MemcachedException {
2086 		try {
2087 			this.flushAllMemcachedServers(this.opTimeout, true, exptime);
2088 		} catch (TimeoutException e) {
2089 			throw new MemcachedException(e);
2090 		}
2091 	}
2092 
2093 	public void flushAllWithNoReply(InetSocketAddress address)
2094 			throws MemcachedException, InterruptedException {
2095 		try {
2096 			this.flushSpecialMemcachedServer(address, this.opTimeout, true, 0);
2097 		} catch (TimeoutException e) {
2098 			throw new MemcachedException(e);
2099 		}
2100 	}
2101 
2102 	public void flushAllWithNoReply(InetSocketAddress address, int exptime)
2103 			throws MemcachedException, InterruptedException {
2104 		try {
2105 			this.flushSpecialMemcachedServer(address, this.opTimeout, true,
2106 					exptime);
2107 		} catch (TimeoutException e) {
2108 			throw new MemcachedException(e);
2109 		}
2110 	}
2111 
2112 	public final void flushAll(int exptime, long timeout)
2113 			throws TimeoutException, InterruptedException, MemcachedException {
2114 		this.flushAllMemcachedServers(timeout, false, exptime);
2115 	}
2116 
2117 	/*
2118 	 * (non-Javadoc)
2119 	 * 
2120 	 * @see net.rubyeye.xmemcached.MemcachedClient#flushAll(long)
2121 	 */
2122 	public final void flushAll(long timeout) throws TimeoutException,
2123 			InterruptedException, MemcachedException {
2124 		this.flushAllMemcachedServers(timeout, false, 0);
2125 	}
2126 
2127 	private void flushAllMemcachedServers(long timeout, boolean noreply,
2128 			int exptime) throws MemcachedException, InterruptedException,
2129 			TimeoutException {
2130 		final Collection<Session> sessions = this.connector.getSessionSet();
2131 		CountDownLatch latch = new CountDownLatch(sessions.size());
2132 		List<Command> commands = new ArrayList<Command>(sessions.size());
2133 		for (Session session : sessions) {
2134 			if (session != null && !session.isClosed()) {
2135 				Command command = this.commandFactory.createFlushAllCommand(
2136 						latch, exptime, noreply);
2137 
2138 				session.write(command);
2139 			} else {
2140 				latch.countDown();
2141 			}
2142 		}
2143 		if (!noreply) {
2144 			if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
2145 				for (Command cmd : commands) {
2146 					cmd.cancel();
2147 				}
2148 				throw new TimeoutException("Timed out waiting for operation");
2149 			}
2150 		}
2151 	}
2152 
2153 	public void setLoggingLevelVerbosity(InetSocketAddress address, int level)
2154 			throws TimeoutException, InterruptedException, MemcachedException {
2155 		this.setMemcachedLoggingLevel(address, level, false);
2156 
2157 	}
2158 
2159 	private void setMemcachedLoggingLevel(InetSocketAddress address, int level,
2160 			boolean noreply) throws MemcachedException, InterruptedException,
2161 			TimeoutException {
2162 		if (address == null) {
2163 			throw new IllegalArgumentException("Null adderss");
2164 		}
2165 		CountDownLatch latch = new CountDownLatch(1);
2166 
2167 		Queue<Session> sessionQueue = this.connector
2168 				.getSessionByAddress(address);
2169 		if (sessionQueue == null || sessionQueue.peek() == null) {
2170 			throw new MemcachedException("could not find session for "
2171 					+ SystemUtils.getRawAddress(address) + ":"
2172 					+ address.getPort() + ",maybe it have not been connected");
2173 		}
2174 
2175 		Command command = this.commandFactory.createVerbosityCommand(latch,
2176 				level, noreply);
2177 		final Session session = sessionQueue.peek();
2178 		session.write(command);
2179 		if (!noreply) {
2180 			this.latchWait(command, this.opTimeout, session);
2181 		}
2182 	}
2183 
2184 	public void setLoggingLevelVerbosityWithNoReply(InetSocketAddress address,
2185 			int level) throws InterruptedException, MemcachedException {
2186 		try {
2187 			this.setMemcachedLoggingLevel(address, level, true);
2188 		} catch (TimeoutException e) {
2189 			throw new MemcachedException(e);
2190 		}
2191 
2192 	}
2193 
2194 	/*
2195 	 * (non-Javadoc)
2196 	 * 
2197 	 * @see
2198 	 * net.rubyeye.xmemcached.MemcachedClient#flushAll(java.net.InetSocketAddress
2199 	 * )
2200 	 */
2201 	public final void flushAll(InetSocketAddress address)
2202 			throws MemcachedException, InterruptedException, TimeoutException {
2203 		this.flushAll(address, this.opTimeout);
2204 	}
2205 
2206 	/*
2207 	 * (non-Javadoc)
2208 	 * 
2209 	 * @see
2210 	 * net.rubyeye.xmemcached.MemcachedClient#flushAll(java.net.InetSocketAddress
2211 	 * , long)
2212 	 */
2213 	public final void flushAll(InetSocketAddress address, long timeout)
2214 			throws MemcachedException, InterruptedException, TimeoutException {
2215 		this.flushSpecialMemcachedServer(address, timeout, false, 0);
2216 	}
2217 
2218 	public final void flushAll(InetSocketAddress address, long timeout,
2219 			int exptime) throws MemcachedException, InterruptedException,
2220 			TimeoutException {
2221 		this.flushSpecialMemcachedServer(address, timeout, false, exptime);
2222 	}
2223 
2224 	private void flushSpecialMemcachedServer(InetSocketAddress address,
2225 			long timeout, boolean noreply, int exptime)
2226 			throws MemcachedException, InterruptedException, TimeoutException {
2227 		if (address == null) {
2228 			throw new IllegalArgumentException("Null adderss");
2229 		}
2230 		CountDownLatch latch = new CountDownLatch(1);
2231 
2232 		Queue<Session> sessionQueue = this.connector
2233 				.getSessionByAddress(address);
2234 		if (sessionQueue == null || sessionQueue.peek() == null) {
2235 			throw new MemcachedException("could not find session for "
2236 					+ SystemUtils.getRawAddress(address) + ":"
2237 					+ address.getPort() + ",maybe it have not been connected");
2238 		}
2239 		Command command = this.commandFactory.createFlushAllCommand(latch,
2240 				exptime, noreply);
2241 		final Session session = sessionQueue.peek();
2242 		session.write(command);
2243 		if (!noreply) {
2244 			this.latchWait(command, timeout, session);
2245 		}
2246 	}
2247 
2248 	/*
2249 	 * (non-Javadoc)
2250 	 * 
2251 	 * @see net.rubyeye.xmemcached.MemcachedClient#flushAll(java.lang.String)
2252 	 */
2253 	public final void flushAll(String host) throws TimeoutException,
2254 			InterruptedException, MemcachedException {
2255 		this.flushAll(AddrUtil.getOneAddress(host), this.opTimeout);
2256 	}
2257 
2258 	/*
2259 	 * (non-Javadoc)
2260 	 * 
2261 	 * @see
2262 	 * net.rubyeye.xmemcached.MemcachedClient#stats(java.net.InetSocketAddress)
2263 	 */
2264 	public final Map<String, String> stats(InetSocketAddress address)
2265 			throws MemcachedException, InterruptedException, TimeoutException {
2266 		return this.stats(address, this.opTimeout);
2267 	}
2268 
2269 	/*
2270 	 * (non-Javadoc)
2271 	 * 
2272 	 * @see
2273 	 * net.rubyeye.xmemcached.MemcachedClient#stats(java.net.InetSocketAddress,
2274 	 * long)
2275 	 */
2276 	@SuppressWarnings("unchecked")
2277 	public final Map<String, String> stats(InetSocketAddress address,
2278 			long timeout) throws MemcachedException, InterruptedException,
2279 			TimeoutException {
2280 		if (address == null) {
2281 			throw new IllegalArgumentException("Null inetSocketAddress");
2282 		}
2283 		CountDownLatch latch = new CountDownLatch(1);
2284 
2285 		Queue<Session> sessionQueue = this.connector
2286 				.getSessionByAddress(address);
2287 		if (sessionQueue == null || sessionQueue.peek() == null) {
2288 			throw new MemcachedException("could not find session for "
2289 					+ SystemUtils.getRawAddress(address) + ":"
2290 					+ address.getPort() + ",maybe it have not been connected");
2291 		}
2292 		Command command = this.commandFactory.createStatsCommand(address,
2293 				latch, null);
2294 		final Session session = sessionQueue.peek();
2295 		session.write(command);
2296 		this.latchWait(command, timeout, session);
2297 		return (Map<String, String>) command.getResult();
2298 	}
2299 
2300 	public final Map<InetSocketAddress, Map<String, String>> getStats()
2301 			throws MemcachedException, InterruptedException, TimeoutException {
2302 		return this.getStats(this.opTimeout);
2303 	}
2304 
2305 	public final Map<InetSocketAddress, Map<String, String>> getStatsByItem(
2306 			String itemName) throws MemcachedException, InterruptedException,
2307 			TimeoutException {
2308 		return this.getStatsByItem(itemName, this.opTimeout);
2309 	}
2310 
2311 	@SuppressWarnings("unchecked")
2312 	public final Map<InetSocketAddress, Map<String, String>> getStatsByItem(
2313 			String itemName, long timeout) throws MemcachedException,
2314 			InterruptedException, TimeoutException {
2315 		final Set<Session> sessionSet = this.connector.getSessionSet();
2316 		final Map<InetSocketAddress, Map<String, String>> collectResult = new HashMap<InetSocketAddress, Map<String, String>>();
2317 		if (sessionSet.size() == 0) {
2318 			return collectResult;
2319 		}
2320 		final CountDownLatch latch = new CountDownLatch(sessionSet.size());
2321 		List<Command> commands = new ArrayList<Command>(sessionSet.size());
2322 		for (Session session : sessionSet) {
2323 			Command command = this.commandFactory.createStatsCommand(
2324 					session.getRemoteSocketAddress(), latch, itemName);
2325 
2326 			session.write(command);
2327 			commands.add(command);
2328 
2329 		}
2330 		if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
2331 			for (Command command : commands) {
2332 				command.cancel();
2333 			}
2334 			throw new TimeoutException("Timed out waiting for operation");
2335 		}
2336 		for (Command command : commands) {
2337 			this.checkException(command);
2338 			collectResult.put(((ServerAddressAware) command).getServer(),
2339 					(Map<String, String>) command.getResult());
2340 		}
2341 		return collectResult;
2342 	}
2343 
2344 	public final Map<InetSocketAddress, String> getVersions()
2345 			throws TimeoutException, InterruptedException, MemcachedException {
2346 		return this.getVersions(this.opTimeout);
2347 	}
2348 
2349 	public final Map<InetSocketAddress, String> getVersions(long timeout)
2350 			throws TimeoutException, InterruptedException, MemcachedException {
2351 		final Set<Session> sessionSet = this.connector.getSessionSet();
2352 		Map<InetSocketAddress, String> collectResult = new HashMap<InetSocketAddress, String>();
2353 		if (sessionSet.size() == 0) {
2354 			return collectResult;
2355 		}
2356 		final CountDownLatch latch = new CountDownLatch(sessionSet.size());
2357 		List<Command> commands = new ArrayList<Command>(sessionSet.size());
2358 		for (Session session : sessionSet) {
2359 			Command command = this.commandFactory.createVersionCommand(latch,
2360 					session.getRemoteSocketAddress());
2361 			session.write(command);
2362 			commands.add(command);
2363 
2364 		}
2365 
2366 		if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
2367 			for (Command command : commands) {
2368 				command.cancel();
2369 			}
2370 			throw new TimeoutException("Timed out waiting for operation");
2371 		}
2372 		for (Command command : commands) {
2373 			this.checkException(command);
2374 			collectResult.put(((ServerAddressAware) command).getServer(),
2375 					(String) command.getResult());
2376 		}
2377 		return collectResult;
2378 	}
2379 
2380 	public Map<InetSocketAddress, Map<String, String>> getStats(long timeout)
2381 			throws MemcachedException, InterruptedException, TimeoutException {
2382 		return this.getStatsByItem(null, timeout);
2383 	}
2384 
2385 	/*
2386 	 * (non-Javadoc)
2387 	 * 
2388 	 * @see net.rubyeye.xmemcached.MemcachedClient#shutdown()
2389 	 */
2390 	public final void shutdown() throws IOException {
2391 		if (this.shutdown) {
2392 			return;
2393 		}
2394 		this.shutdown = true;
2395 		this.connector.quitAllSessions();
2396 		this.connector.stop();
2397 		this.memcachedHandler.stop();
2398 		XMemcachedMbeanServer.getInstance().shutdown();
2399 		if (!this.isHutdownHookCalled) {
2400 			try {
2401 				Runtime.getRuntime()
2402 						.removeShutdownHook(this.shutdownHookThread);
2403 			} catch (Exception e) {
2404 				// ignore;
2405 			}
2406 		}
2407 	}
2408 
2409 	private long sendIncrOrDecrCommand(final String key, final long delta,
2410 			long initValue, final CommandType cmdType, boolean noreply,
2411 			long operationTimeout, int exp) throws InterruptedException,
2412 			TimeoutException, MemcachedException {
2413 		final byte[] keyBytes = ByteUtils.getBytes(key);
2414 		ByteUtils.checkKey(keyBytes);
2415 		final Command command = this.commandFactory.createIncrDecrCommand(key,
2416 				keyBytes, delta, initValue, exp, cmdType, noreply);
2417 		final Session session = this.sendCommand(command);
2418 		if (!command.isNoreply()) {
2419 			this.latchWait(command, operationTimeout, session);
2420 			command.getIoBuffer().free();
2421 			this.checkException(command);
2422 			if (command.getResult() == null) {
2423 				throw new MemcachedException(
2424 						"Operation fail,may be caused by networking or timeout");
2425 			}
2426 			final Object result = command.getResult();
2427 			if (result instanceof String) {
2428 				if (((String) result).equals("NOT_FOUND")) {
2429 					if (this.add0(key, exp, String.valueOf(initValue),
2430 							this.transcoder, this.opTimeout)) {
2431 						return initValue;
2432 					} else {
2433 						return this.sendIncrOrDecrCommand(key, delta,
2434 								initValue, cmdType, noreply, operationTimeout,
2435 								exp);
2436 					}
2437 				} else {
2438 					throw new MemcachedException(
2439 							"Unknown result type for incr/decr:"
2440 									+ result.getClass() + ",result=" + result);
2441 				}
2442 			} else {
2443 				return (Long) command.getResult();
2444 			}
2445 		} else {
2446 			return -1;
2447 		}
2448 	}
2449 
2450 	public void setConnectionPoolSize(int poolSize) {
2451 		if (!this.shutdown && this.getAvaliableServers().size() > 0) {
2452 			throw new IllegalStateException(
2453 					"Xmemcached client has been started");
2454 		}
2455 		if (poolSize <= 0) {
2456 			throw new IllegalArgumentException("poolSize<=0");
2457 		}
2458 		this.connectionPoolSize = poolSize;
2459 		this.connector.setConnectionPoolSize(poolSize);
2460 	}
2461 
2462 	/*
2463 	 * (non-Javadoc)
2464 	 * 
2465 	 * @see net.rubyeye.xmemcached.MemcachedClient#delete(java.lang.String)
2466 	 */
2467 	public final boolean delete(final String key) throws TimeoutException,
2468 			InterruptedException, MemcachedException {
2469 		return this.delete(key, 0);
2470 	}
2471 
2472 	/*
2473 	 * (non-Javadoc)
2474 	 * 
2475 	 * @see net.rubyeye.xmemcached.MemcachedClient#getTranscoder()
2476 	 */
	// Returns the transcoder currently configured on this client.
	@SuppressWarnings("unchecked")
	public final Transcoder getTranscoder() {
		return this.transcoder;
	}
2481 
2482 	/*
2483 	 * (non-Javadoc)
2484 	 * 
2485 	 * @see
2486 	 * net.rubyeye.xmemcached.MemcachedClient#setTranscoder(net.rubyeye.xmemcached
2487 	 * .transcoders.Transcoder)
2488 	 */
	// Replaces the transcoder used by this client; no null check is done.
	@SuppressWarnings("unchecked")
	public final void setTranscoder(final Transcoder transcoder) {
		this.transcoder = transcoder;
	}
2493 
2494 	private final <T> boolean sendStoreCommand(Command command, long timeout)
2495 			throws InterruptedException, TimeoutException, MemcachedException {
2496 
2497 		final Session session = this.sendCommand(command);
2498 		if (!command.isNoreply()) {
2499 			this.latchWait(command, timeout, session);
2500 			command.getIoBuffer().free();
2501 			this.checkException(command);
2502 			if (command.getResult() == null) {
2503 				throw new MemcachedException(
2504 						"Operation fail,may be caused by networking or timeout");
2505 			}
2506 		} else {
2507 			return false;
2508 		}
2509 		return (Boolean) command.getResult();
2510 	}
2511 
	// Session attribute key under which the per-session counter of
	// consecutive operation timeouts (an AtomicInteger) is stored.
	private static final String CONTINUOUS_TIMEOUT_COUNTER = "ContinuousTimeouts";
2513 
2514 	private void latchWait(final Command cmd, final long timeout,
2515 			final Session session) throws InterruptedException,
2516 			TimeoutException {
2517 		if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
2518 			AtomicInteger counter = this.getContinuousTimeoutCounter(session);
2519 			// reset counter.
2520 			if (counter.get() > 0) {
2521 				counter.set(0);
2522 			}
2523 		} else {
2524 			cmd.cancel();
2525 			AtomicInteger counter = this.getContinuousTimeoutCounter(session);
2526 			if (counter.incrementAndGet() > this.timeoutExceptionThreshold) {
2527 				log.warn(session
2528 						+ " exceeded continuous timeout threshold,we will close it.");
2529 				try {
2530 					// reset counter.
2531 					counter.set(0);
2532 					session.close();
2533 				} catch (Exception e) {
2534 					// ignore it.
2535 				}
2536 			}
2537 			throw new TimeoutException(
2538 					"Timed out("
2539 							+ timeout
2540 							+ " milliseconds) waiting for operation while connected to "
2541 							+ session);
2542 		}
2543 	}
2544 
2545 	private AtomicInteger getContinuousTimeoutCounter(final Session session) {
2546 		AtomicInteger counter = (AtomicInteger) session
2547 				.getAttribute(CONTINUOUS_TIMEOUT_COUNTER);
2548 		if (counter == null) {
2549 			counter = new AtomicInteger(0);
2550 			AtomicInteger oldCounter = (AtomicInteger) session
2551 					.setAttributeIfAbsent(CONTINUOUS_TIMEOUT_COUNTER, counter);
2552 			if (oldCounter != null) {
2553 				counter = oldCounter;
2554 			}
2555 		}
2556 		return counter;
2557 	}
2558 
2559 	/**
2560 	 * Use getAvailableServers() instead
2561 	 * 
2562 	 * @deprecated
2563 	 * @see MemcachedClient#getAvailableServers()
2564 	 */
2565 	@Deprecated
2566 	public final Collection<InetSocketAddress> getAvaliableServers() {
2567 		return this.getAvailableServers();
2568 	}
2569 
2570 	public Collection<InetSocketAddress> getAvailableServers() {
2571 		Set<Session> sessionSet = this.connector.getSessionSet();
2572 		Set<InetSocketAddress> result = new HashSet<InetSocketAddress>();
2573 		for (Session session : sessionSet) {
2574 			result.add(session.getRemoteSocketAddress());
2575 		}
2576 		return Collections.unmodifiableSet(result);
2577 	}
2578 
2579 	public final int getConnectionSizeBySocketAddress(InetSocketAddress address) {
2580 		Queue<Session> sessionList = this.connector
2581 				.getSessionByAddress(address);
2582 		return sessionList == null ? 0 : sessionList.size();
2583 	}
2584 
2585 	public void addStateListener(MemcachedClientStateListener listener) {
2586 		MemcachedClientStateListenerAdapter adapter = new MemcachedClientStateListenerAdapter(
2587 				listener, this);
2588 		this.stateListenerAdapters.add(adapter);
2589 		this.connector.addStateListener(adapter);
2590 	}
2591 
2592 	public Collection<MemcachedClientStateListener> getStateListeners() {
2593 		final List<MemcachedClientStateListener> result = new ArrayList<MemcachedClientStateListener>(
2594 				this.stateListenerAdapters.size());
2595 		for (MemcachedClientStateListenerAdapter adapter : this.stateListenerAdapters) {
2596 			result.add(adapter.getMemcachedClientStateListener());
2597 		}
2598 		return result;
2599 	}
2600 
2601 	public void setPrimitiveAsString(boolean primitiveAsString) {
2602 		this.transcoder.setPrimitiveAsString(primitiveAsString);
2603 	}
2604 
2605 	public void removeStateListener(MemcachedClientStateListener listener) {
2606 		for (MemcachedClientStateListenerAdapter adapter : this.stateListenerAdapters) {
2607 			if (adapter.getMemcachedClientStateListener().equals(listener)) {
2608 				this.stateListenerAdapters.remove(adapter);
2609 				this.connector.removeStateListener(adapter);
2610 			}
2611 		}
2612 	}
2613 
2614 	public Protocol getProtocol() {
2615 		return this.commandFactory.getProtocol();
2616 	}
2617 
2618 	public boolean isSanitizeKeys() {
2619 		return this.sanitizeKeys;
2620 	}
2621 
2622 	public void setSanitizeKeys(boolean sanitizeKeys) {
2623 		this.sanitizeKeys = sanitizeKeys;
2624 	}
2625 
2626 	private String decodeKey(String key) throws MemcachedException,
2627 			InterruptedException, TimeoutException {
2628 		try {
2629 			key = this.sanitizeKeys ? URLDecoder.decode(key, "UTF-8") : key;
2630 		} catch (UnsupportedEncodingException e) {
2631 			throw new MemcachedException(
2632 					"Unsupport encoding utf-8 when decodeKey", e);
2633 		}
2634 		String ns = NAMESPACE_LOCAL.get();
2635 		if (ns != null && ns.trim().length() > 0) {
2636 			String nsValue = this.getNamespace(ns);
2637 			try {
2638 				if (nsValue != null && key.startsWith(nsValue)) {
2639 					//The extra length of ':'
2640 					key = key.substring(nsValue.length() + 1);
2641 				} else {
2642 					return null;
2643 				}
2644 			} catch (Exception e) {
2645 				throw new MemcachedException(
2646 						"Exception occured when decode key.", e);
2647 			}
2648 		}
2649 		return key;
2650 	}
2651 
2652 	private String preProcessKey(String key) throws MemcachedException,
2653 			InterruptedException {
2654 		key = this.keyProvider.process(key);
2655 		try {
2656 			key = this.sanitizeKeys ? URLEncoder.encode(key, "UTF-8") : key;
2657 		} catch (UnsupportedEncodingException e) {
2658 			throw new MemcachedException(
2659 					"Unsupport encoding utf-8 when sanitize key", e);
2660 		}
2661 		String ns = NAMESPACE_LOCAL.get();
2662 		if (ns != null && ns.trim().length() > 0) {
2663 			try {
2664 				key = this.getNamespace(ns) + ":" + key;
2665 			} catch (TimeoutException e) {
2666 				throw new MemcachedException(
2667 						"Timeout occured when gettting namespace value.", e);
2668 			}
2669 		}
2670 		return key;
2671 	}
2672 
2673 	public void invalidateNamespace(String ns, long opTimeout)
2674 			throws MemcachedException, InterruptedException, TimeoutException {
2675 		String key = this.getNSKey(ns);
2676 		this.incr(key, 1, System.currentTimeMillis(), opTimeout);
2677 	}
2678 
2679 	public void invalidateNamespace(String ns) throws MemcachedException,
2680 			InterruptedException, TimeoutException {
2681 		this.invalidateNamespace(ns, this.opTimeout);
2682 	}
2683 
2684 	/**
2685 	 * Returns the real namespace of ns.
2686 	 * 
2687 	 * @param ns
2688 	 * @return
2689 	 * @throws TimeoutException
2690 	 * @throws InterruptedException
2691 	 * @throws MemcachedException
2692 	 */
2693 	public String getNamespace(String ns) throws TimeoutException,
2694 			InterruptedException, MemcachedException {
2695 		String key = this.keyProvider.process(this.getNSKey(ns));
2696 		byte[] keyBytes = ByteUtils.getBytes(key);
2697 		ByteUtils.checkKey(keyBytes);
2698 		Object item = this.fetch0(key, keyBytes, CommandType.GET_ONE,
2699 				this.opTimeout, this.transcoder);
2700 		while (item == null) {
2701 			item = String.valueOf(System.nanoTime());
2702 			boolean added = this.add0(key, 0, item, this.transcoder,
2703 					this.opTimeout);
2704 			if (!added) {
2705 				item = this.fetch0(key, keyBytes, CommandType.GET_ONE,
2706 						this.opTimeout, this.transcoder);
2707 			}
2708 		}
2709 		String namespace = item.toString();
2710 		if (!ByteUtils.isNumber(namespace)) {
2711 			throw new IllegalStateException(
2712 					"Namespace key already has value.The key is:" + key
2713 							+ ",and the value is:" + namespace);
2714 		}
2715 		return namespace;
2716 	}
2717 
2718 	private String getNSKey(String ns) {
2719 		String key = "namespace:" + ns;
2720 		return key;
2721 	}
2722 
2723 	public Counter getCounter(String key, long initialValue) {
2724 		return new Counter(this, key, initialValue);
2725 	}
2726 
2727 	public Counter getCounter(String key) {
2728 		return new Counter(this, key, 0);
2729 	}
2730 
2731 	/**
2732 	 * @deprecated memcached 1.6.x will remove cachedump stats command,so this
2733 	 *             method will be removed in the future
2734 	 */
2735 	@Deprecated
2736 	@SuppressWarnings("unchecked")
2737 	public KeyIterator getKeyIterator(InetSocketAddress address)
2738 			throws MemcachedException, TimeoutException, InterruptedException {
2739 		if (address == null) {
2740 			throw new IllegalArgumentException("null address");
2741 		}
2742 		Queue<Session> sessions = this.connector.getSessionByAddress(address);
2743 		if (sessions == null || sessions.size() == 0) {
2744 			throw new MemcachedException(
2745 					"The special memcached server has not been connected,"
2746 							+ address);
2747 		}
2748 		Session session = sessions.peek();
2749 		CountDownLatch latch = new CountDownLatch(1);
2750 		Command command = this.commandFactory.createStatsCommand(
2751 				session.getRemoteSocketAddress(), latch, "items");
2752 		session.write(command);
2753 		if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
2754 			throw new TimeoutException("Operation timeout");
2755 		}
2756 		if (command.getException() != null) {
2757 			if (command.getException() instanceof MemcachedException) {
2758 				throw (MemcachedException) command.getException();
2759 			} else {
2760 				throw new MemcachedException("stats items failed",
2761 						command.getException());
2762 			}
2763 		}
2764 		Map<String, String> result = (Map<String, String>) command.getResult();
2765 		LinkedList<Integer> itemNumberList = new LinkedList<Integer>();
2766 		for (Map.Entry<String, String> entry : result.entrySet()) {
2767 			final String key = entry.getKey();
2768 			final String[] keys = key.split(":");
2769 			if (keys.length == 3 && keys[2].equals("number")
2770 					&& keys[0].equals("items")) {
2771 				// has items,then add it to itemNumberList
2772 				if (Integer.parseInt(entry.getValue()) > 0) {
2773 					itemNumberList.add(Integer.parseInt(keys[1]));
2774 				}
2775 			}
2776 		}
2777 		return new KeyIteratorImpl(itemNumberList, this, address);
2778 	}
2779 
2780 	public void setEnableHealSession(boolean enableHealSession) {
2781 		if (this.connector != null) {
2782 			this.connector.setEnableHealSession(enableHealSession);
2783 		} else {
2784 			throw new IllegalStateException("The client has not been started.");
2785 		}
2786 	}
2787 
2788 	public void setFailureMode(boolean failureMode) {
2789 		this.failureMode = failureMode;
2790 		if (this.sessionLocator != null) {
2791 			this.sessionLocator.setFailureMode(failureMode);
2792 		}
2793 		if (this.connector != null) {
2794 			this.connector.setFailureMode(failureMode);
2795 		}
2796 	}
2797 
2798 	public boolean isFailureMode() {
2799 		return this.failureMode;
2800 	}
2801 
2802 	public Queue<ReconnectRequest> getReconnectRequestQueue() {
2803 		return this.connector != null ? this.connector
2804 				.getReconnectRequestQueue() : null;
2805 	}
2806 
2807 }