1
2
3
4
5
6
7
8
9
10
11
12 package net.rubyeye.xmemcached;
13
14 import java.io.IOException;
15 import java.io.UnsupportedEncodingException;
16 import java.net.InetSocketAddress;
17 import java.net.URLDecoder;
18 import java.net.URLEncoder;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.HashSet;
24 import java.util.Iterator;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Queue;
29 import java.util.Set;
30 import java.util.concurrent.CopyOnWriteArrayList;
31 import java.util.concurrent.CountDownLatch;
32 import java.util.concurrent.ExecutionException;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import net.rubyeye.xmemcached.auth.AuthInfo;
39 import net.rubyeye.xmemcached.buffer.BufferAllocator;
40 import net.rubyeye.xmemcached.buffer.SimpleBufferAllocator;
41 import net.rubyeye.xmemcached.codec.MemcachedCodecFactory;
42 import net.rubyeye.xmemcached.command.Command;
43 import net.rubyeye.xmemcached.command.CommandType;
44 import net.rubyeye.xmemcached.command.ServerAddressAware;
45 import net.rubyeye.xmemcached.command.TextCommandFactory;
46 import net.rubyeye.xmemcached.exception.MemcachedException;
47 import net.rubyeye.xmemcached.exception.NoValueException;
48 import net.rubyeye.xmemcached.impl.ArrayMemcachedSessionLocator;
49 import net.rubyeye.xmemcached.impl.ClosedMemcachedTCPSession;
50 import net.rubyeye.xmemcached.impl.DefaultKeyProvider;
51 import net.rubyeye.xmemcached.impl.KeyIteratorImpl;
52 import net.rubyeye.xmemcached.impl.MemcachedClientStateListenerAdapter;
53 import net.rubyeye.xmemcached.impl.MemcachedConnector;
54 import net.rubyeye.xmemcached.impl.MemcachedHandler;
55 import net.rubyeye.xmemcached.impl.MemcachedTCPSession;
56 import net.rubyeye.xmemcached.impl.ReconnectRequest;
57 import net.rubyeye.xmemcached.monitor.Constants;
58 import net.rubyeye.xmemcached.monitor.MemcachedClientNameHolder;
59 import net.rubyeye.xmemcached.monitor.XMemcachedMbeanServer;
60 import net.rubyeye.xmemcached.networking.Connector;
61 import net.rubyeye.xmemcached.networking.MemcachedSession;
62 import net.rubyeye.xmemcached.transcoders.CachedData;
63 import net.rubyeye.xmemcached.transcoders.SerializingTranscoder;
64 import net.rubyeye.xmemcached.transcoders.Transcoder;
65 import net.rubyeye.xmemcached.utils.AddrUtil;
66 import net.rubyeye.xmemcached.utils.ByteUtils;
67 import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;
68 import net.rubyeye.xmemcached.utils.Protocol;
69
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 import com.google.code.yanf4j.config.Configuration;
74 import com.google.code.yanf4j.core.Session;
75 import com.google.code.yanf4j.core.SocketOption;
76 import com.google.code.yanf4j.util.SystemUtils;
77
78
79
80
81
82
83
84 public class XMemcachedClient implements XMemcachedClientMBean, MemcachedClient {
85
/** SLF4J logger for this class. */
private static final Logger log = LoggerFactory
.getLogger(XMemcachedClient.class);
/** Session locator; assigned in buildConnector and exposed through getSessionLocator(). */
protected MemcachedSessionLocator sessionLocator;
/**
 * Set to true by buildConnector and cleared by startConnector; sendCommand
 * rejects commands while it is true.
 */
private volatile boolean shutdown;
/** Connector created by newConnector(...) inside buildConnector. */
protected MemcachedConnector connector;
/** Client-wide transcoder, used when a caller does not pass one explicitly. */
@SuppressWarnings("unchecked")
private Transcoder transcoder;
/** NOTE(review): not read or written in the visible part of the file - confirm usage. */
private boolean sanitizeKeys;
/** Handler created in buildConnector and installed on the connector. */
private MemcachedHandler memcachedHandler;
/** Command factory assigned in buildConnector. */
protected CommandFactory commandFactory;
/** Timeout applied to operations that are called without an explicit timeout. */
private long opTimeout = DEFAULT_OP_TIMEOUT;
/** Connect timeout; connect() waits this many milliseconds for each connection. */
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
/** Number of connections connect() opens for each server address. */
protected int connectionPoolSize = DEFAULT_CONNECTION_POOL_SIZE;
/** Passed to newConnector(...) when the connector is built. */
protected int maxQueuedNoReplyOperations = DEFAULT_MAX_QUEUED_NOPS;

/** Incremented once for every InetSocketAddressWrapper this client creates. */
protected final AtomicInteger serverOrderCount = new AtomicInteger();

/** Authentication info keyed by server address; see getAuthInfoMap()/setAuthInfoMap(). */
private Map<InetSocketAddress, AuthInfo> authInfoMap = new HashMap<InetSocketAddress, AuthInfo>();

/** Client name; buildConnector generates one when it is still null. */
private String name;

/**
 * When true, connect() registers a ClosedMemcachedTCPSession for a server it
 * could not reach; the flag is also passed to the connector and the locator.
 */
private volatile boolean failureMode;

/** See getTimeoutExceptionThreshold()/setTimeoutExceptionThreshold(int). */
private int timeoutExceptionThreshold = DEFAULT_MAX_TIMEOUTEXCEPTION_THRESHOLD;

/** Presumably filled by addStateListener(...), which is outside this view - TODO confirm. */
private final CopyOnWriteArrayList<MemcachedClientStateListenerAdapter> stateListenerAdapters = new CopyOnWriteArrayList<MemcachedClientStateListenerAdapter>();
/** JVM shutdown hook registered by startConnector. */
private Thread shutdownHookThread;
/** Set to true by the shutdown hook thread just before it calls shutdown(). */
private volatile boolean isHutdownHookCalled = false;


/** Key provider; defaults to DefaultKeyProvider.INSTANCE. */
private KeyProvider keyProvider = DefaultKeyProvider.INSTANCE;



/** Per-thread namespace, set by beginWithNamespace and removed by endWithNamespace. */
public static final ThreadLocal<String> NAMESPACE_LOCAL = new ThreadLocal<String>();
121
122
123
124
125
126
127 public final void setMergeFactor(final int mergeFactor) {
128 if (mergeFactor < 0) {
129 throw new IllegalArgumentException("mergeFactor<0");
130 }
131 this.connector.setMergeFactor(mergeFactor);
132 }
133
134 public int getTimeoutExceptionThreshold() {
135 return this.timeoutExceptionThreshold;
136 }
137
138 public void setTimeoutExceptionThreshold(int timeoutExceptionThreshold) {
139 if (timeoutExceptionThreshold <= 0) {
140 throw new IllegalArgumentException(
141 "Illegal timeoutExceptionThreshold value "
142 + timeoutExceptionThreshold);
143 }
144 if (timeoutExceptionThreshold < 100) {
145 log.warn("Too small timeoutExceptionThreshold value may cause connections disconnect/reconnect frequently.");
146 }
147 this.timeoutExceptionThreshold = timeoutExceptionThreshold;
148 }
149
150 public <T> T withNamespace(String ns, MemcachedClientCallable<T> callable)
151 throws MemcachedException, InterruptedException, TimeoutException {
152 this.beginWithNamespace(ns);
153 try {
154 return callable.call(this);
155 } finally {
156 this.endWithNamespace();
157 }
158 }
159
160 public void endWithNamespace() {
161 NAMESPACE_LOCAL.remove();
162 }
163
164 public void beginWithNamespace(String ns) {
165 if (ns == null || ns.trim().length() == 0) {
166 throw new IllegalArgumentException("Blank namespace");
167 }
168 if (NAMESPACE_LOCAL.get() != null) {
169 throw new IllegalStateException("Previous namespace wasn't ended.");
170 }
171 NAMESPACE_LOCAL.set(ns);
172 }
173
174 public KeyProvider getKeyProvider() {
175 return this.keyProvider;
176 }
177
178 public void setKeyProvider(KeyProvider keyProvider) {
179 if (keyProvider == null) {
180 throw new IllegalArgumentException("Null key provider");
181 }
182 this.keyProvider = keyProvider;
183 }
184
185 public final MemcachedSessionLocator getSessionLocator() {
186 return this.sessionLocator;
187 }
188
189 public final CommandFactory getCommandFactory() {
190 return this.commandFactory;
191 }
192
193 public String getName() {
194 return this.name;
195 }
196
197 public void setName(String name) {
198 this.name = name;
199 }
200
201
202
203
204
205
206 public long getConnectTimeout() {
207 return this.connectTimeout;
208 }
209
210
211
212
213
214
215 public void setConnectTimeout(long connectTimeout) {
216 if (connectTimeout < 0) {
217 throw new IllegalArgumentException("connectTimeout<0");
218 }
219 this.connectTimeout = connectTimeout;
220 }
221
222 public void setEnableHeartBeat(boolean enableHeartBeat) {
223 this.memcachedHandler.setEnableHeartBeat(enableHeartBeat);
224 }
225
226
227
228
229
230
231 public final long getOpTimeout() {
232 return this.opTimeout;
233 }
234
235
236
237
238
239
240 public final void setOpTimeout(long opTimeout) {
241 if (opTimeout < 0) {
242 throw new IllegalArgumentException("opTimeout<0");
243 }
244 this.opTimeout = opTimeout;
245 }
246
247 public void setHealSessionInterval(long healConnectionInterval) {
248 if (healConnectionInterval <= 0) {
249 throw new IllegalArgumentException("Invalid heal session interval:"
250 + healConnectionInterval);
251 }
252 if (null != this.connector) {
253 this.connector.setHealSessionInterval(healConnectionInterval);
254 } else {
255 throw new IllegalStateException("The client hasn't been started");
256 }
257 }
258
259 public long getHealSessionInterval() {
260 if (null != this.connector) {
261 return this.connector.getHealSessionInterval();
262 }
263 return -1L;
264 }
265
266 public Map<InetSocketAddress, AuthInfo> getAuthInfoMap() {
267 return this.authInfoMap;
268 }
269
270 public void setAuthInfoMap(Map<InetSocketAddress, AuthInfo> map) {
271 this.authInfoMap = map;
272 }
273
274
275
276
277
278
279 public final Connector getConnector() {
280 return this.connector;
281 }
282
283
284
285
286
287
288
289 public final void setOptimizeMergeBuffer(final boolean optimizeMergeBuffer) {
290 this.connector.setOptimizeMergeBuffer(optimizeMergeBuffer);
291 }
292
293
294
295
296
297
298 public final boolean isShutdown() {
299 return this.shutdown;
300 }
301
302 @SuppressWarnings("unchecked")
303 private final <T> GetsResponse<T> gets0(final String key,
304 final byte[] keyBytes, final Transcoder<T> transcoder)
305 throws MemcachedException, TimeoutException, InterruptedException {
306 GetsResponse<T> result = (GetsResponse<T>) this.fetch0(key, keyBytes,
307 CommandType.GETS_ONE, this.opTimeout, transcoder);
308 return result;
309 }
310
311 private final Session sendCommand(final Command cmd)
312 throws MemcachedException {
313 if (this.shutdown) {
314 throw new MemcachedException("Xmemcached is stopped");
315 }
316 return this.connector.send(cmd);
317 }
318
319
320
321
322
323
324
325
326
327
/**
 * Creates a client connected to a single server with weight 1.
 *
 * @param server host name of the server
 * @param port   port of the server
 * @throws IOException declared by the delegated constructor
 */
public XMemcachedClient(final String server, final int port)
        throws IOException {
    this(server, port, 1);
}
332
333
334
335
336
337
338
339
340
341
342
343
344 public XMemcachedClient(final String host, final int port, int weight)
345 throws IOException {
346 super();
347 if (weight <= 0) {
348 throw new IllegalArgumentException("weight<=0");
349 }
350 this.checkServerPort(host, port);
351 this.buildConnector(new ArrayMemcachedSessionLocator(),
352 new SimpleBufferAllocator(),
353 XMemcachedClientBuilder.getDefaultConfiguration(),
354 XMemcachedClientBuilder.getDefaultSocketOptions(),
355 new TextCommandFactory(), new SerializingTranscoder());
356 this.start0();
357 this.connect(new InetSocketAddressWrapper(this.newSocketAddress(host,
358 port), this.serverOrderCount.incrementAndGet(), weight, null));
359 }
360
361 protected InetSocketAddress newSocketAddress(final String server,
362 final int port) {
363 return new InetSocketAddress(server, port);
364 }
365
366 private void checkServerPort(String server, int port) {
367 if (server == null || server.length() == 0) {
368 throw new IllegalArgumentException();
369 }
370 if (port <= 0) {
371 throw new IllegalArgumentException();
372 }
373 }
374
375
376
377
378
379
380
381 public final void addServer(final String server, final int port)
382 throws IOException {
383 this.addServer(server, port, 1);
384 }
385
386
387
388
389
390
391
392
393
394 public final void addServer(final String server, final int port, int weight)
395 throws IOException {
396 if (weight <= 0) {
397 throw new IllegalArgumentException("weight<=0");
398 }
399 this.checkServerPort(server, port);
400 this.connect(new InetSocketAddressWrapper(this.newSocketAddress(server,
401 port), this.serverOrderCount.incrementAndGet(), weight, null));
402 }
403
404
405
406
407
408
409
410
411 public final void addServer(final InetSocketAddress inetSocketAddress)
412 throws IOException {
413 this.addServer(inetSocketAddress, 1);
414 }
415
416 public final void addServer(final InetSocketAddress inetSocketAddress,
417 int weight) throws IOException {
418 if (inetSocketAddress == null) {
419 throw new IllegalArgumentException("Null InetSocketAddress");
420 }
421 if (weight <= 0) {
422 throw new IllegalArgumentException("weight<=0");
423 }
424 this.connect(new InetSocketAddressWrapper(inetSocketAddress,
425 this.serverOrderCount.incrementAndGet(), weight, null));
426 }
427
428
429
430
431
432
433 public final void addServer(String hostList) throws IOException {
434 Map<InetSocketAddress, InetSocketAddress> addresses = AddrUtil
435 .getAddressMap(hostList);
436 if (addresses != null && addresses.size() > 0) {
437 for (Map.Entry<InetSocketAddress, InetSocketAddress> entry : addresses
438 .entrySet()) {
439 final InetSocketAddress mainNodeAddr = entry.getKey();
440 final InetSocketAddress standbyNodeAddr = entry.getValue();
441 this.connect(new InetSocketAddressWrapper(mainNodeAddr,
442 this.serverOrderCount.incrementAndGet(), 1, null));
443 if (standbyNodeAddr != null) {
444 this.connect(new InetSocketAddressWrapper(standbyNodeAddr,
445 this.serverOrderCount.incrementAndGet(), 1,
446 mainNodeAddr));
447 }
448 }
449 }
450 }
451
452 public void addOneServerWithWeight(String server, int weight)
453 throws IOException {
454 Map<InetSocketAddress, InetSocketAddress> addresses = AddrUtil
455 .getAddressMap(server);
456 if (addresses == null) {
457 throw new IllegalArgumentException("Null Server");
458 }
459 if (addresses.size() != 1) {
460 throw new IllegalArgumentException(
461 "Please add one server at one time");
462 }
463 if (weight <= 0) {
464 throw new IllegalArgumentException("weight<=0");
465 }
466 if (addresses != null && addresses.size() > 0) {
467 for (Map.Entry<InetSocketAddress, InetSocketAddress> entry : addresses
468 .entrySet()) {
469 final InetSocketAddress mainNodeAddr = entry.getKey();
470 final InetSocketAddress standbyNodeAddr = entry.getValue();
471 this.connect(new InetSocketAddressWrapper(mainNodeAddr,
472 this.serverOrderCount.incrementAndGet(), 1, null));
473 if (standbyNodeAddr != null) {
474 this.connect(new InetSocketAddressWrapper(standbyNodeAddr,
475 this.serverOrderCount.incrementAndGet(), 1,
476 mainNodeAddr));
477 }
478 }
479 }
480
481 }
482
483
484
485
486
487
488 public final List<String> getServersDescription() {
489 final List<String> result = new ArrayList<String>();
490 for (Session session : this.connector.getSessionSet()) {
491 InetSocketAddress socketAddress = session.getRemoteSocketAddress();
492 int weight = ((MemcachedSession) session)
493 .getInetSocketAddressWrapper().getWeight();
494 result.add(SystemUtils.getRawAddress(socketAddress) + ":"
495 + socketAddress.getPort() + "(weight=" + weight + ")");
496 }
497 return result;
498 }
499
500 public final void setServerWeight(String server, int weight) {
501 InetSocketAddress socketAddress = AddrUtil.getOneAddress(server);
502 Queue<Session> sessionQueue = this.connector
503 .getSessionByAddress(socketAddress);
504 if (sessionQueue == null) {
505 throw new IllegalArgumentException("There is no server " + server);
506 }
507 for (Session session : sessionQueue) {
508 if (session != null) {
509 ((MemcachedTCPSession) session).getInetSocketAddressWrapper()
510 .setWeight(weight);
511 }
512 }
513 this.connector.updateSessions();
514 }
515
516
517
518
519
520
521
522 public final void removeServer(String hostList) {
523 List<InetSocketAddress> addresses = AddrUtil.getAddresses(hostList);
524 if (addresses != null && addresses.size() > 0) {
525 for (InetSocketAddress address : addresses) {
526
527 Queue<Session> sessionQueue = this.connector
528 .getSessionByAddress(address);
529 if (sessionQueue != null) {
530 for (Session session : sessionQueue) {
531 if (session != null) {
532
533 ((MemcachedSession) session)
534 .setAllowReconnect(false);
535
536 ((MemcachedSession) session).quit();
537 }
538 }
539 }
540
541 List<Session> standBySession = this.connector
542 .getStandbySessionListByMainNodeAddr(address);
543 if (standBySession != null) {
544 for (Session session : standBySession) {
545 this.connector.removeReconnectRequest(session
546 .getRemoteSocketAddress());
547 if (session != null) {
548
549 ((MemcachedSession) session)
550 .setAllowReconnect(false);
551
552 ((MemcachedSession) session).quit();
553 }
554 }
555 }
556 this.connector.removeReconnectRequest(address);
557 }
558
559 }
560
561 }
562
/**
 * Hook called by connect(...) with the server address before any connection
 * attempt is made. This implementation does nothing; the method is
 * protected, so a subclass can override it.
 *
 * @param address the address about to be connected to
 */
protected void checkSocketAddress(InetSocketAddress address) {

}
566
567 private void connect(final InetSocketAddressWrapper inetSocketAddressWrapper)
568 throws IOException {
569
570 InetSocketAddress inetSocketAddress = inetSocketAddressWrapper
571 .getInetSocketAddress();
572 this.checkSocketAddress(inetSocketAddress);
573 if (this.connectionPoolSize > 1) {
574 log.warn("You are using connection pool for xmemcached client,it's not recommended unless you have test it that it can boost performance in your app.");
575 }
576 for (int i = 0; i < this.connectionPoolSize; i++) {
577 Future<Boolean> future = null;
578 boolean connected = false;
579 Throwable throwable = null;
580 try {
581 future = this.connector.connect(inetSocketAddressWrapper);
582
583 if (!future.get(this.connectTimeout, TimeUnit.MILLISECONDS)) {
584 log.error("connect to "
585 + SystemUtils.getRawAddress(inetSocketAddress)
586 + ":" + inetSocketAddress.getPort() + " fail");
587 } else {
588 connected = true;
589 }
590 } catch (InterruptedException e) {
591 Thread.currentThread().interrupt();
592 } catch (ExecutionException e) {
593 throwable = e;
594 log.error(
595 "connect to "
596 + SystemUtils.getRawAddress(inetSocketAddress)
597 + ":" + inetSocketAddress.getPort() + " error",
598 e);
599 } catch (TimeoutException e) {
600 throwable = e;
601 log.error(
602 "connect to "
603 + SystemUtils.getRawAddress(inetSocketAddress)
604 + ":" + inetSocketAddress.getPort()
605 + " timeout", e);
606 } catch (Exception e) {
607 throwable = e;
608 log.error(
609 "connect to "
610 + SystemUtils.getRawAddress(inetSocketAddress)
611 + ":" + inetSocketAddress.getPort() + " error",
612 e);
613 }
614
615
616 if (!connected) {
617 if (future != null) {
618 future.cancel(true);
619 }
620
621 if (this.failureMode) {
622 this.connector.addSession(new ClosedMemcachedTCPSession(
623 inetSocketAddressWrapper));
624 }
625 this.connector.addToWatingQueue(new ReconnectRequest(
626 inetSocketAddressWrapper, 0, this
627 .getHealSessionInterval()));
628 log.error(
629 "Connect to "
630 + SystemUtils.getRawAddress(inetSocketAddress)
631 + ":" + inetSocketAddress.getPort() + " fail",
632 throwable);
633
634 }
635 }
636 }
637
638 @SuppressWarnings("unchecked")
639 private final <T> Object fetch0(final String key, final byte[] keyBytes,
640 final CommandType cmdType, final long timeout,
641 Transcoder<T> transcoder) throws InterruptedException,
642 TimeoutException, MemcachedException, MemcachedException {
643 final Command command = this.commandFactory.createGetCommand(key,
644 keyBytes, cmdType, this.transcoder);
645 this.latchWait(command, timeout, this.sendCommand(command));
646 command.getIoBuffer().free();
647 this.checkException(command);
648 CachedData data = (CachedData) command.getResult();
649 if (data == null) {
650 return null;
651 }
652 if (transcoder == null) {
653 transcoder = this.transcoder;
654 }
655 if (cmdType == CommandType.GETS_ONE) {
656 return new GetsResponse<T>(data.getCas(), transcoder.decode(data));
657 } else {
658 return transcoder.decode(data);
659 }
660 }
661
662 private final void start0() throws IOException {
663 this.registerMBean();
664 this.startConnector();
665 MemcachedClientNameHolder.clear();
666 }
667
668 private final void startConnector() throws IOException {
669 if (this.shutdown) {
670 this.shutdown = false;
671 this.connector.start();
672 this.memcachedHandler.start();
673 this.shutdownHookThread = new Thread() {
674 @Override
675 public void run() {
676 try {
677 XMemcachedClient.this.isHutdownHookCalled = true;
678 XMemcachedClient.this.shutdown();
679 } catch (IOException e) {
680 log.error("Shutdown XMemcachedClient error", e);
681 }
682 }
683 };
684 Runtime.getRuntime().addShutdownHook(this.shutdownHookThread);
685 }
686 }
687
688
689
690
691
692
693 void setMaxQueuedNoReplyOperations(int maxQueuedNoReplyOperations) {
694 if (maxQueuedNoReplyOperations <= 1) {
695 throw new IllegalArgumentException("maxQueuedNoReplyOperations<=1");
696 }
697 this.maxQueuedNoReplyOperations = maxQueuedNoReplyOperations;
698 }
699
700 @SuppressWarnings("unchecked")
701 private void buildConnector(MemcachedSessionLocator locator,
702 BufferAllocator bufferAllocator, Configuration configuration,
703 Map<SocketOption, Object> socketOptions,
704 CommandFactory commandFactory, Transcoder transcoder) {
705 if (locator == null) {
706 locator = new ArrayMemcachedSessionLocator();
707
708 }
709 if (bufferAllocator == null) {
710 bufferAllocator = new SimpleBufferAllocator();
711 }
712 if (configuration == null) {
713 configuration = XMemcachedClientBuilder.getDefaultConfiguration();
714 }
715 if (transcoder == null) {
716 transcoder = new SerializingTranscoder();
717 }
718 if (commandFactory == null) {
719 commandFactory = new TextCommandFactory();
720 }
721 if (this.name == null) {
722 this.name = "MemcachedClient-"
723 + Constants.MEMCACHED_CLIENT_COUNTER.getAndIncrement();
724 MemcachedClientNameHolder.setName(this.name);
725 }
726 this.commandFactory = commandFactory;
727 ByteUtils.setProtocol(this.commandFactory.getProtocol());
728 log.warn("XMemcachedClient is using "
729 + this.commandFactory.getProtocol().name() + " protocol");
730 this.commandFactory.setBufferAllocator(bufferAllocator);
731 this.shutdown = true;
732 this.transcoder = transcoder;
733 this.sessionLocator = locator;
734 this.connector = this.newConnector(bufferAllocator, configuration,
735 this.sessionLocator, this.commandFactory,
736 this.connectionPoolSize, this.maxQueuedNoReplyOperations);
737 this.memcachedHandler = new MemcachedHandler(this);
738 this.connector.setHandler(this.memcachedHandler);
739 this.connector.setCodecFactory(new MemcachedCodecFactory());
740 this.connector.setSessionTimeout(-1);
741 this.connector.setSocketOptions(socketOptions);
742 if (this.isFailureMode()) {
743 log.warn("XMemcachedClient in failure mode.");
744 }
745 this.connector.setFailureMode(this.failureMode);
746 this.sessionLocator.setFailureMode(this.failureMode);
747 }
748
749 protected MemcachedConnector newConnector(BufferAllocator bufferAllocator,
750 Configuration configuration,
751 MemcachedSessionLocator memcachedSessionLocator,
752 CommandFactory commandFactory, int poolSize,
753 int maxQueuedNoReplyOperations) {
754
755 configuration.setDispatchMessageThreadCount(0);
756 return new MemcachedConnector(configuration, memcachedSessionLocator,
757 bufferAllocator, commandFactory, poolSize,
758 maxQueuedNoReplyOperations);
759 }
760
761 private final void registerMBean() {
762 if (this.shutdown) {
763 XMemcachedMbeanServer.getInstance().registMBean(
764 this,
765 this.getClass().getPackage().getName() + ":type="
766 + this.getClass().getSimpleName() + "-"
767 + MemcachedClientNameHolder.getName());
768 }
769 }
770
771 public void setOptimizeGet(boolean optimizeGet) {
772 this.connector.setOptimizeGet(optimizeGet);
773 }
774
775
776
777
778
779
780
781
782 public final void setBufferAllocator(final BufferAllocator bufferAllocator) {
783 this.connector.setBufferAllocator(bufferAllocator);
784 }
785
786
787
788
789
790
791
792
793 public XMemcachedClient(final InetSocketAddress inetSocketAddress,
794 int weight) throws IOException {
795 super();
796 if (inetSocketAddress == null) {
797 throw new IllegalArgumentException("Null InetSocketAddress");
798
799 }
800 if (weight <= 0) {
801 throw new IllegalArgumentException("weight<=0");
802 }
803 this.buildConnector(new ArrayMemcachedSessionLocator(),
804 new SimpleBufferAllocator(),
805 XMemcachedClientBuilder.getDefaultConfiguration(),
806 XMemcachedClientBuilder.getDefaultSocketOptions(),
807 new TextCommandFactory(), new SerializingTranscoder());
808 this.start0();
809 this.connect(new InetSocketAddressWrapper(inetSocketAddress,
810 this.serverOrderCount.incrementAndGet(), weight, null));
811 }
812
/**
 * Creates a client connected to a single server with weight 1.
 *
 * @param inetSocketAddress address of the server
 * @throws IOException declared by the delegated constructor
 */
public XMemcachedClient(final InetSocketAddress inetSocketAddress)
        throws IOException {
    this(inetSocketAddress, 1);
}
817
/**
 * Creates and starts a client with default settings and no servers;
 * servers can be added afterwards through the addServer methods.
 *
 * @throws IOException declared by the start-up step
 */
public XMemcachedClient() throws IOException {
    this.buildConnector(new ArrayMemcachedSessionLocator(),
            new SimpleBufferAllocator(),
            XMemcachedClientBuilder.getDefaultConfiguration(),
            XMemcachedClientBuilder.getDefaultSocketOptions(),
            new TextCommandFactory(), new SerializingTranscoder());
    this.start0();
}
827
828
829
830
831
832
833
834
835
836
837
838
839
/**
 * Package-private constructor taking a full set of options; every server
 * is connected with weight 1.
 *
 * <p>{@code addressMap} maps each main node to its standby node (or null).
 * Null locator/allocator/configuration/factory/transcoder arguments are
 * replaced with defaults by buildConnector.
 *
 * @throws IOException declared by the start-up and connect steps
 */
@SuppressWarnings("unchecked")
XMemcachedClient(MemcachedSessionLocator locator,
        BufferAllocator allocator, Configuration conf,
        Map<SocketOption, Object> socketOptions,
        CommandFactory commandFactory, Transcoder transcoder,
        Map<InetSocketAddress, InetSocketAddress> addressMap,
        List<MemcachedClientStateListener> stateListeners,
        Map<InetSocketAddress, AuthInfo> map, int poolSize,
        long connectTimeout, String name, boolean failureMode)
        throws IOException {
    this.setConnectTimeout(connectTimeout);
    this.setFailureMode(failureMode);
    this.setName(name);

    final int addressCount = (addressMap != null) ? addressMap.size() : 0;
    this.optimiezeSetReadThreadCount(conf, addressCount);
    this.buildConnector(locator, allocator, conf, socketOptions,
            commandFactory, transcoder);

    if (stateListeners != null) {
        for (MemcachedClientStateListener listener : stateListeners) {
            this.addStateListener(listener);
        }
    }
    this.setAuthInfoMap(map);
    this.setConnectionPoolSize(poolSize);
    this.start0();

    if (addressMap == null) {
        return;
    }
    for (Map.Entry<InetSocketAddress, InetSocketAddress> pair : addressMap
            .entrySet()) {
        final InetSocketAddress primary = pair.getKey();
        final InetSocketAddress standby = pair.getValue();
        this.connect(new InetSocketAddressWrapper(primary,
                this.serverOrderCount.incrementAndGet(), 1, null));
        if (standby == null) {
            continue;
        }
        this.connect(new InetSocketAddressWrapper(standby,
                this.serverOrderCount.incrementAndGet(), 1, primary));
    }
}
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
/**
 * Package-private constructor taking a full set of options plus
 * per-server weights.
 *
 * <p>{@code weights[i]} applies to the i-th entry of {@code addressMap} in
 * iteration order, for both the main node and its standby node.
 * {@code weights} and {@code addressMap} must be both null or both
 * non-null, every weight must be positive, and there must be at least as
 * many weights as map entries.
 *
 * @throws IllegalArgumentException if the weights/address arguments are
 *                                  inconsistent or a weight is not positive
 * @throws IOException declared by the start-up and connect steps
 */
@SuppressWarnings("unchecked")
XMemcachedClient(MemcachedSessionLocator locator,
        BufferAllocator allocator, Configuration conf,
        Map<SocketOption, Object> socketOptions,
        CommandFactory commandFactory, Transcoder transcoder,
        Map<InetSocketAddress, InetSocketAddress> addressMap,
        int[] weights, List<MemcachedClientStateListener> stateListeners,
        Map<InetSocketAddress, AuthInfo> infoMap, int poolSize,
        long connectTimeout, final String name, boolean failureMode)
        throws IOException {
    this.setConnectTimeout(connectTimeout);
    this.setFailureMode(failureMode);
    this.setName(name);

    final boolean hasWeights = weights != null;
    final boolean hasAddresses = addressMap != null;
    if (!hasWeights && hasAddresses) {
        throw new IllegalArgumentException("Null weights");
    }
    if (hasWeights && !hasAddresses) {
        throw new IllegalArgumentException("Null addressList");
    }
    if (hasWeights) {
        for (int idx = 0; idx < weights.length; idx++) {
            if (weights[idx] <= 0) {
                throw new IllegalArgumentException("Some weights<=0");
            }
        }
    }
    if (hasWeights && hasAddresses
            && weights.length < addressMap.size()) {
        throw new IllegalArgumentException(
                "weights.length is less than addressList.size()");
    }

    final int addressCount = hasAddresses ? addressMap.size() : 0;
    this.optimiezeSetReadThreadCount(conf, addressCount);
    this.buildConnector(locator, allocator, conf, socketOptions,
            commandFactory, transcoder);
    if (stateListeners != null) {
        for (MemcachedClientStateListener listener : stateListeners) {
            this.addStateListener(listener);
        }
    }
    this.setAuthInfoMap(infoMap);
    this.setConnectionPoolSize(poolSize);
    this.start0();

    if (!hasAddresses || !hasWeights) {
        return;
    }
    int position = 0;
    for (Map.Entry<InetSocketAddress, InetSocketAddress> pair : addressMap
            .entrySet()) {
        final InetSocketAddress primary = pair.getKey();
        final InetSocketAddress standby = pair.getValue();
        this.connect(new InetSocketAddressWrapper(primary,
                this.serverOrderCount.incrementAndGet(),
                weights[position], null));
        if (standby != null) {
            this.connect(new InetSocketAddressWrapper(standby,
                    this.serverOrderCount.incrementAndGet(),
                    weights[position], primary));
        }
        position++;
    }
}
959
960 private final void optimiezeSetReadThreadCount(Configuration conf,
961 int addressCount) {
962 if (conf != null && addressCount > 1) {
963 if (!this.isWindowsPlatform()
964 && conf.getReadThreadCount() == DEFAULT_READ_THREAD_COUNT) {
965 int threadCount = 2 * SystemUtils.getSystemThreadCount();
966 conf.setReadThreadCount(addressCount > threadCount ? threadCount
967 : addressCount);
968 }
969 }
970 }
971
972 private final boolean isWindowsPlatform() {
973 String osName = System.getProperty("os.name");
974 if (osName != null && osName.toLowerCase().indexOf("windows") >= 0) {
975 return true;
976 } else {
977 return false;
978 }
979 }
980
981
982
983
984
985
986
987 public XMemcachedClient(List<InetSocketAddress> addressList)
988 throws IOException {
989 super();
990 if (addressList == null || addressList.isEmpty()) {
991 throw new IllegalArgumentException("Empty address list");
992 }
993 BufferAllocator simpleBufferAllocator = new SimpleBufferAllocator();
994 this.buildConnector(new ArrayMemcachedSessionLocator(),
995 simpleBufferAllocator,
996 XMemcachedClientBuilder.getDefaultConfiguration(),
997 XMemcachedClientBuilder.getDefaultSocketOptions(),
998 new TextCommandFactory(), new SerializingTranscoder());
999 this.start0();
1000 for (InetSocketAddress inetSocketAddress : addressList) {
1001 this.connect(new InetSocketAddressWrapper(inetSocketAddress,
1002 this.serverOrderCount.incrementAndGet(), 1, null));
1003
1004 }
1005 }
1006
1007
1008
1009
1010
1011
1012
1013 @SuppressWarnings("unchecked")
1014 public final <T> T get(final String key, final long timeout,
1015 final Transcoder<T> transcoder) throws TimeoutException,
1016 InterruptedException, MemcachedException {
1017 return (T) this.get0(key, timeout, CommandType.GET_ONE, transcoder);
1018 }
1019
1020
1021
1022
1023
1024
1025 @SuppressWarnings("unchecked")
1026 public final <T> T get(final String key, final long timeout)
1027 throws TimeoutException, InterruptedException, MemcachedException {
1028 return (T) this.get(key, timeout, this.transcoder);
1029 }
1030
1031
1032
1033
1034
1035
1036
1037 public final <T> T get(final String key, final Transcoder<T> transcoder)
1038 throws TimeoutException, InterruptedException, MemcachedException {
1039 return this.get(key, this.opTimeout, transcoder);
1040 }
1041
1042
1043
1044
1045
1046
1047 @SuppressWarnings("unchecked")
1048 public final <T> T get(final String key) throws TimeoutException,
1049 InterruptedException, MemcachedException {
1050 return (T) this.get(key, this.opTimeout);
1051 }
1052
1053 private <T> Object get0(String key, final long timeout,
1054 final CommandType cmdType, final Transcoder<T> transcoder)
1055 throws TimeoutException, InterruptedException, MemcachedException {
1056 key = this.preProcessKey(key);
1057 byte[] keyBytes = ByteUtils.getBytes(key);
1058 ByteUtils.checkKey(keyBytes);
1059 return this.fetch0(key, keyBytes, cmdType, timeout, transcoder);
1060 }
1061
1062
1063
1064
1065
1066
1067
1068 @SuppressWarnings("unchecked")
1069 public final <T> GetsResponse<T> gets(final String key, final long timeout,
1070 final Transcoder<T> transcoder) throws TimeoutException,
1071 InterruptedException, MemcachedException {
1072 return (GetsResponse<T>) this.get0(key, timeout, CommandType.GETS_ONE,
1073 transcoder);
1074 }
1075
1076
1077
1078
1079
1080
1081 public final <T> GetsResponse<T> gets(final String key)
1082 throws TimeoutException, InterruptedException, MemcachedException {
1083 return this.gets(key, this.opTimeout);
1084 }
1085
1086
1087
1088
1089
1090
1091 @SuppressWarnings("unchecked")
1092 public final <T> GetsResponse<T> gets(final String key, final long timeout)
1093 throws TimeoutException, InterruptedException, MemcachedException {
1094 return this.gets(key, timeout, this.transcoder);
1095 }
1096
1097
1098
1099
1100
1101
1102
1103 @SuppressWarnings("unchecked")
1104 public final <T> GetsResponse<T> gets(final String key,
1105 final Transcoder transcoder) throws TimeoutException,
1106 InterruptedException, MemcachedException {
1107 return this.gets(key, this.opTimeout, transcoder);
1108 }
1109
1110
1111
1112
1113
1114
1115
1116 public final <T> Map<String, T> get(
1117 final Collection<String> keyCollections, final long timeout,
1118 final Transcoder<T> transcoder) throws TimeoutException,
1119 InterruptedException, MemcachedException {
1120 return this.getMulti0(keyCollections, timeout, CommandType.GET_MANY,
1121 transcoder);
1122 }
1123
1124
1125
1126
1127
1128
1129
1130 public final <T> Map<String, T> get(
1131 final Collection<String> keyCollections,
1132 final Transcoder<T> transcoder) throws TimeoutException,
1133 InterruptedException, MemcachedException {
1134 return this.getMulti0(keyCollections, this.opTimeout,
1135 CommandType.GET_MANY, transcoder);
1136 }
1137
1138
1139
1140
1141
1142
1143 public final <T> Map<String, T> get(final Collection<String> keyCollections)
1144 throws TimeoutException, InterruptedException, MemcachedException {
1145 return this.get(keyCollections, this.opTimeout);
1146 }
1147
1148
1149
1150
1151
1152
1153
1154 @SuppressWarnings("unchecked")
1155 public final <T> Map<String, T> get(
1156 final Collection<String> keyCollections, final long timeout)
1157 throws TimeoutException, InterruptedException, MemcachedException {
1158 return this.get(keyCollections, timeout, this.transcoder);
1159 }
1160
1161
1162
1163
1164
1165
1166
1167 @SuppressWarnings("unchecked")
1168 public final <T> Map<String, GetsResponse<T>> gets(
1169 final Collection<String> keyCollections, final long timeout,
1170 final Transcoder<T> transcoder) throws TimeoutException,
1171 InterruptedException, MemcachedException {
1172 return (Map<String, GetsResponse<T>>) this.getMulti0(keyCollections,
1173 timeout, CommandType.GETS_MANY, transcoder);
1174 }
1175
1176
1177
1178
1179
1180
1181 public final <T> Map<String, GetsResponse<T>> gets(
1182 final Collection<String> keyCollections) throws TimeoutException,
1183 InterruptedException, MemcachedException {
1184 return this.gets(keyCollections, this.opTimeout);
1185 }
1186
1187
1188
1189
1190
1191
1192
1193 @SuppressWarnings("unchecked")
1194 public final <T> Map<String, GetsResponse<T>> gets(
1195 final Collection<String> keyCollections, final long timeout)
1196 throws TimeoutException, InterruptedException, MemcachedException {
1197 return this.gets(keyCollections, timeout, this.transcoder);
1198 }
1199
1200
1201
1202
1203
1204
1205
1206 public final <T> Map<String, GetsResponse<T>> gets(
1207 final Collection<String> keyCollections,
1208 final Transcoder<T> transcoder) throws TimeoutException,
1209 InterruptedException, MemcachedException {
1210 return this.gets(keyCollections, this.opTimeout, transcoder);
1211 }
1212
1213 private final <T> Map<String, T> getMulti0(final Collection<String> keys,
1214 final long timeout, final CommandType cmdType,
1215 final Transcoder<T> transcoder) throws TimeoutException,
1216 InterruptedException, MemcachedException {
1217 if (keys == null || keys.size() == 0) {
1218 return null;
1219 }
1220 Collection<String> keyCollections = new ArrayList<String>(keys.size());
1221 for (String key : keys) {
1222 keyCollections.add(this.preProcessKey(key));
1223 }
1224 final CountDownLatch latch;
1225 final List<Command> commands;
1226 if (this.connector.getSessionSet().size() <= 1) {
1227 commands = new ArrayList<Command>(1);
1228 latch = new CountDownLatch(1);
1229 commands.add(this.sendGetMultiCommand(keyCollections, latch,
1230 cmdType, transcoder));
1231
1232 } else {
1233 Collection<List<String>> catalogKeys = this
1234 .catalogKeys(keyCollections);
1235 commands = new ArrayList<Command>(catalogKeys.size());
1236 latch = new CountDownLatch(catalogKeys.size());
1237 for (List<String> catalogKeyCollection : catalogKeys) {
1238 commands.add(this.sendGetMultiCommand(catalogKeyCollection,
1239 latch, cmdType, transcoder));
1240 }
1241 }
1242 if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
1243 for (Command getCmd : commands) {
1244 getCmd.cancel();
1245 }
1246 throw new TimeoutException("Timed out waiting for operation");
1247 }
1248 return this.reduceResult(cmdType, transcoder, commands);
1249 }
1250
1251 @SuppressWarnings("unchecked")
1252 private <T> Map<String, T> reduceResult(final CommandType cmdType,
1253 final Transcoder<T> transcoder, final List<Command> commands)
1254 throws MemcachedException,InterruptedException,TimeoutException {
1255 final Map<String, T> result = new HashMap<String, T>(commands.size());
1256 for (Command getCmd : commands) {
1257 getCmd.getIoBuffer().free();
1258 this.checkException(getCmd);
1259 Map<String, CachedData> map = (Map<String, CachedData>) getCmd
1260 .getResult();
1261 if (cmdType == CommandType.GET_MANY) {
1262 Iterator<Map.Entry<String, CachedData>> it = map.entrySet()
1263 .iterator();
1264 while (it.hasNext()) {
1265 Map.Entry<String, CachedData> entry = it.next();
1266 String decodeKey = this.decodeKey(entry.getKey());
1267 if (decodeKey != null) {
1268 result.put(decodeKey,
1269 transcoder.decode(entry.getValue()));
1270 }
1271 }
1272
1273 } else {
1274 Iterator<Map.Entry<String, CachedData>> it = map.entrySet()
1275 .iterator();
1276 while (it.hasNext()) {
1277 Map.Entry<String, CachedData> entry = it.next();
1278 GetsResponse getsResponse = new GetsResponse(entry
1279 .getValue().getCas(), transcoder.decode(entry
1280 .getValue()));
1281 String decodeKey = this.decodeKey(entry.getKey());
1282 if (decodeKey != null) {
1283 result.put(decodeKey, (T) getsResponse);
1284 }
1285 }
1286
1287 }
1288
1289 }
1290 return result;
1291 }
1292
1293
1294
1295
1296
1297
1298
1299 private final Collection<List<String>> catalogKeys(
1300 final Collection<String> keyCollections) {
1301 final Map<Session, List<String>> catalogMap = new HashMap<Session, List<String>>();
1302
1303 for (String key : keyCollections) {
1304 Session index = this.sessionLocator.getSessionByKey(key);
1305 if (!catalogMap.containsKey(index)) {
1306 List<String> tmpKeys = new ArrayList<String>(100);
1307 tmpKeys.add(key);
1308 catalogMap.put(index, tmpKeys);
1309 } else {
1310 catalogMap.get(index).add(key);
1311 }
1312 }
1313
1314 Collection<List<String>> catalogKeys = catalogMap.values();
1315 return catalogKeys;
1316 }
1317
1318 private final <T> Command sendGetMultiCommand(
1319 final Collection<String> keys, final CountDownLatch latch,
1320 final CommandType cmdType, final Transcoder<T> transcoder)
1321 throws InterruptedException, TimeoutException, MemcachedException {
1322 final Command command = this.commandFactory.createGetMultiCommand(keys,
1323 latch, cmdType, transcoder);
1324 this.sendCommand(command);
1325 return command;
1326 }
1327
1328
1329
1330
1331
1332
1333
1334 public final <T> boolean set(String key, final int exp, final T value,
1335 final Transcoder<T> transcoder, final long timeout)
1336 throws TimeoutException, InterruptedException, MemcachedException {
1337 key = this.preProcessKey(key);
1338 byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1339 return this.sendStoreCommand(this.commandFactory.createSetCommand(key,
1340 keyBytes, exp, value, false, transcoder), timeout);
1341 }
1342
1343 @SuppressWarnings("unchecked")
1344 public void setWithNoReply(String key, int exp, Object value)
1345 throws InterruptedException, MemcachedException {
1346 this.setWithNoReply(key, exp, value, this.transcoder);
1347 }
1348
1349 public <T> void setWithNoReply(String key, int exp, T value,
1350 Transcoder<T> transcoder) throws InterruptedException,
1351 MemcachedException {
1352 key = this.preProcessKey(key);
1353 byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1354 try {
1355 this.sendStoreCommand(this.commandFactory.createSetCommand(key,
1356 keyBytes, exp, value, true, transcoder), this.opTimeout);
1357 } catch (TimeoutException e) {
1358 throw new MemcachedException(e);
1359 }
1360 }
1361
1362 private final <T> byte[] checkStoreArguments(final String key,
1363 final int exp, final T value) {
1364 byte[] keyBytes = ByteUtils.getBytes(key);
1365 ByteUtils.checkKey(keyBytes);
1366 if (value == null) {
1367 throw new IllegalArgumentException("value could not be null");
1368 }
1369 if (exp < 0) {
1370 throw new IllegalArgumentException(
1371 "Expire time must be greater than or equal to 0");
1372 }
1373 return keyBytes;
1374 }
1375
1376
1377
1378
1379
1380
1381
1382 public final boolean set(final String key, final int exp, final Object value)
1383 throws TimeoutException, InterruptedException, MemcachedException {
1384 return this.set(key, exp, value, this.opTimeout);
1385 }
1386
1387
1388
1389
1390
1391
1392
1393 @SuppressWarnings("unchecked")
1394 public final boolean set(final String key, final int exp,
1395 final Object value, final long timeout) throws TimeoutException,
1396 InterruptedException, MemcachedException {
1397 return this.set(key, exp, value, this.transcoder, timeout);
1398 }
1399
1400
1401
1402
1403
1404
1405
1406 public final <T> boolean set(final String key, final int exp,
1407 final T value, final Transcoder<T> transcoder)
1408 throws TimeoutException, InterruptedException, MemcachedException {
1409 return this.set(key, exp, value, transcoder, this.opTimeout);
1410 }
1411
1412
1413
1414
1415
1416
1417
1418 public final <T> boolean add(String key, final int exp, final T value,
1419 final Transcoder<T> transcoder, final long timeout)
1420 throws TimeoutException, InterruptedException, MemcachedException {
1421 key = this.preProcessKey(key);
1422 return this.add0(key, exp, value, transcoder, timeout);
1423 }
1424
1425 private <T> boolean add0(String key, int exp, T value,
1426 Transcoder<T> transcoder, long timeout)
1427 throws InterruptedException, TimeoutException, MemcachedException {
1428 byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1429 return this.sendStoreCommand(this.commandFactory.createAddCommand(key,
1430 keyBytes, exp, value, false, transcoder), timeout);
1431 }
1432
1433
1434
1435
1436
1437
1438
1439 public final boolean add(final String key, final int exp, final Object value)
1440 throws TimeoutException, InterruptedException, MemcachedException {
1441 return this.add(key, exp, value, this.opTimeout);
1442 }
1443
1444
1445
1446
1447
1448
1449
1450 @SuppressWarnings("unchecked")
1451 public final boolean add(final String key, final int exp,
1452 final Object value, final long timeout) throws TimeoutException,
1453 InterruptedException, MemcachedException {
1454 return this.add(key, exp, value, this.transcoder, timeout);
1455 }
1456
1457
1458
1459
1460
1461
1462
1463 public final <T> boolean add(final String key, final int exp,
1464 final T value, final Transcoder<T> transcoder)
1465 throws TimeoutException, InterruptedException, MemcachedException {
1466 return this.add(key, exp, value, transcoder, this.opTimeout);
1467 }
1468
1469 @SuppressWarnings("unchecked")
1470 public void addWithNoReply(String key, int exp, Object value)
1471 throws InterruptedException, MemcachedException {
1472 this.addWithNoReply(key, exp, value, this.transcoder);
1473
1474 }
1475
1476 public <T> void addWithNoReply(String key, int exp, T value,
1477 Transcoder<T> transcoder) throws InterruptedException,
1478 MemcachedException {
1479 key = this.preProcessKey(key);
1480 byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1481 try {
1482 this.sendStoreCommand(this.commandFactory.createAddCommand(key,
1483 keyBytes, exp, value, true, transcoder), this.opTimeout);
1484 } catch (TimeoutException e) {
1485 throw new MemcachedException(e);
1486 }
1487
1488 }
1489
1490 @SuppressWarnings("unchecked")
1491 public void replaceWithNoReply(String key, int exp, Object value)
1492 throws InterruptedException, MemcachedException {
1493 this.replaceWithNoReply(key, exp, value, this.transcoder);
1494
1495 }
1496
1497 public <T> void replaceWithNoReply(String key, int exp, T value,
1498 Transcoder<T> transcoder) throws InterruptedException,
1499 MemcachedException {
1500 key = this.preProcessKey(key);
1501 byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1502 try {
1503 this.sendStoreCommand(this.commandFactory.createReplaceCommand(key,
1504 keyBytes, exp, value, true, transcoder), this.opTimeout);
1505 } catch (TimeoutException e) {
1506 throw new MemcachedException(e);
1507 }
1508
1509 }
1510
1511
1512
1513
1514
1515
1516
1517 public final <T> boolean replace(String key, final int exp, final T value,
1518 final Transcoder<T> transcoder, final long timeout)
1519 throws TimeoutException, InterruptedException, MemcachedException {
1520 key = this.preProcessKey(key);
1521 byte[] keyBytes = this.checkStoreArguments(key, exp, value);
1522 return this.sendStoreCommand(this.commandFactory.createReplaceCommand(
1523 key, keyBytes, exp, value, false, transcoder), timeout);
1524 }
1525
1526
1527
1528
1529
1530
1531
1532 public final boolean replace(final String key, final int exp,
1533 final Object value) throws TimeoutException, InterruptedException,
1534 MemcachedException {
1535 return this.replace(key, exp, value, this.opTimeout);
1536 }
1537
1538
1539
1540
1541
1542
1543
1544 @SuppressWarnings("unchecked")
1545 public final boolean replace(final String key, final int exp,
1546 final Object value, final long timeout) throws TimeoutException,
1547 InterruptedException, MemcachedException {
1548 return this.replace(key, exp, value, this.transcoder, timeout);
1549 }
1550
1551
1552
1553
1554
1555
1556
1557 public final <T> boolean replace(final String key, final int exp,
1558 final T value, final Transcoder<T> transcoder)
1559 throws TimeoutException, InterruptedException, MemcachedException {
1560 return this.replace(key, exp, value, transcoder, this.opTimeout);
1561 }
1562
1563
1564
1565
1566
1567
1568
1569 public final boolean append(final String key, final Object value)
1570 throws TimeoutException, InterruptedException, MemcachedException {
1571 return this.append(key, value, this.opTimeout);
1572 }
1573
1574
1575
1576
1577
1578
1579
1580 public final boolean append(String key, final Object value,
1581 final long timeout) throws TimeoutException, InterruptedException,
1582 MemcachedException {
1583 key = this.preProcessKey(key);
1584 byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1585 return this.sendStoreCommand(this.commandFactory.createAppendCommand(
1586 key, keyBytes, value, false, this.transcoder), timeout);
1587 }
1588
1589 public void appendWithNoReply(String key, Object value)
1590 throws InterruptedException, MemcachedException {
1591 key = this.preProcessKey(key);
1592 byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1593 try {
1594 this.sendStoreCommand(this.commandFactory.createAppendCommand(key,
1595 keyBytes, value, true, this.transcoder), this.opTimeout);
1596 } catch (TimeoutException e) {
1597 throw new MemcachedException(e);
1598 }
1599
1600 }
1601
1602
1603
1604
1605
1606
1607
1608 public final boolean prepend(final String key, final Object value)
1609 throws TimeoutException, InterruptedException, MemcachedException {
1610 return this.prepend(key, value, this.opTimeout);
1611 }
1612
1613
1614
1615
1616
1617
1618
1619 public final boolean prepend(String key, final Object value,
1620 final long timeout) throws TimeoutException, InterruptedException,
1621 MemcachedException {
1622 key = this.preProcessKey(key);
1623 byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1624 return this.sendStoreCommand(this.commandFactory.createPrependCommand(
1625 key, keyBytes, value, false, this.transcoder), timeout);
1626 }
1627
1628 public void prependWithNoReply(String key, Object value)
1629 throws InterruptedException, MemcachedException {
1630 key = this.preProcessKey(key);
1631 byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1632 try {
1633 this.sendStoreCommand(this.commandFactory.createPrependCommand(key,
1634 keyBytes, value, true, this.transcoder), this.opTimeout);
1635 } catch (TimeoutException e) {
1636 throw new MemcachedException(e);
1637 }
1638 }
1639
1640
1641
1642
1643
1644
1645
1646 public final boolean cas(final String key, final int exp,
1647 final Object value, final long cas) throws TimeoutException,
1648 InterruptedException, MemcachedException {
1649 return this.cas(key, exp, value, this.opTimeout, cas);
1650 }
1651
1652
1653
1654
1655
1656
1657
1658 public final <T> boolean cas(String key, final int exp, final T value,
1659 final Transcoder<T> transcoder, final long timeout, final long cas)
1660 throws TimeoutException, InterruptedException, MemcachedException {
1661 key = this.preProcessKey(key);
1662 byte[] keyBytes = this.checkStoreArguments(key, 0, value);
1663 return this.sendStoreCommand(this.commandFactory.createCASCommand(key,
1664 keyBytes, exp, value, cas, false, transcoder), timeout);
1665 }
1666
1667
1668
1669
1670
1671
1672
1673 @SuppressWarnings("unchecked")
1674 public final boolean cas(final String key, final int exp,
1675 final Object value, final long timeout, final long cas)
1676 throws TimeoutException, InterruptedException, MemcachedException {
1677 return this.cas(key, exp, value, this.transcoder, timeout, cas);
1678 }
1679
1680
1681
1682
1683
1684
1685
1686 public final <T> boolean cas(final String key, final int exp,
1687 final T value, final Transcoder<T> transcoder, final long cas)
1688 throws TimeoutException, InterruptedException, MemcachedException {
1689 return this.cas(key, exp, value, transcoder, this.opTimeout, cas);
1690 }
1691
1692 private final <T> boolean cas0(final String key, final int exp,
1693 GetsResponse<T> getsResponse, final CASOperation<T> operation,
1694 final Transcoder<T> transcoder, byte[] keyBytes, boolean noreply)
1695 throws TimeoutException, InterruptedException, MemcachedException {
1696 if (operation == null) {
1697 throw new IllegalArgumentException("CASOperation could not be null");
1698 }
1699 if (operation.getMaxTries() < 0) {
1700 throw new IllegalArgumentException(
1701 "max tries must be greater than 0");
1702 }
1703 int tryCount = 0;
1704 GetsResponse<T> result = getsResponse;
1705 if (result == null) {
1706 throw new NoValueException("Null GetsResponse for key=" + key);
1707 }
1708 while (tryCount <= operation.getMaxTries()
1709 && result != null
1710 && !this.sendStoreCommand(this.commandFactory.createCASCommand(
1711 key,
1712 keyBytes,
1713 exp,
1714 operation.getNewValue(result.getCas(),
1715 result.getValue()), result.getCas(), noreply,
1716 transcoder), this.opTimeout) && !noreply) {
1717 tryCount++;
1718 result = this.gets0(key, keyBytes, transcoder);
1719 if (result == null) {
1720 throw new NoValueException(
1721 "could not gets the value for Key=" + key + " for cas");
1722 }
1723 if (tryCount > operation.getMaxTries()) {
1724 throw new TimeoutException("CAS try times is greater than max");
1725 }
1726 }
1727 return true;
1728 }
1729
1730
1731
1732
1733
1734
1735
1736
1737 public final <T> boolean cas(String key, final int exp,
1738 final CASOperation<T> operation, final Transcoder<T> transcoder)
1739 throws TimeoutException, InterruptedException, MemcachedException {
1740 key = this.preProcessKey(key);
1741 byte[] keyBytes = ByteUtils.getBytes(key);
1742 ByteUtils.checkKey(keyBytes);
1743 GetsResponse<T> result = this.gets0(key, keyBytes, transcoder);
1744 return this.cas0(key, exp, result, operation, transcoder, keyBytes,
1745 false);
1746 }
1747
1748
1749
1750
1751
1752
1753
1754
1755 public final <T> boolean cas(String key, final int exp,
1756 GetsResponse<T> getsReponse, final CASOperation<T> operation,
1757 final Transcoder<T> transcoder) throws TimeoutException,
1758 InterruptedException, MemcachedException {
1759 key = this.preProcessKey(key);
1760 byte[] keyBytes = ByteUtils.getBytes(key);
1761 ByteUtils.checkKey(keyBytes);
1762 return this.cas0(key, exp, getsReponse, operation, transcoder,
1763 keyBytes, false);
1764 }
1765
1766
1767
1768
1769
1770
1771
1772 @SuppressWarnings("unchecked")
1773 public final <T> boolean cas(final String key, final int exp,
1774 GetsResponse<T> getsReponse, final CASOperation<T> operation)
1775 throws TimeoutException, InterruptedException, MemcachedException {
1776
1777 return this.cas(key, exp, getsReponse, operation, this.transcoder);
1778 }
1779
1780 public <T> void casWithNoReply(String key, CASOperation<T> operation)
1781 throws TimeoutException, InterruptedException, MemcachedException {
1782 this.casWithNoReply(key, 0, operation);
1783 }
1784
1785 public <T> void casWithNoReply(String key, GetsResponse<T> getsResponse,
1786 CASOperation<T> operation) throws TimeoutException,
1787 InterruptedException, MemcachedException {
1788 this.casWithNoReply(key, 0, getsResponse, operation);
1789
1790 }
1791
1792 @SuppressWarnings("unchecked")
1793 public <T> void casWithNoReply(String key, int exp,
1794 CASOperation<T> operation) throws TimeoutException,
1795 InterruptedException, MemcachedException {
1796 key = this.preProcessKey(key);
1797 byte[] keyBytes = ByteUtils.getBytes(key);
1798 GetsResponse<T> result = this.gets0(key, keyBytes, this.transcoder);
1799 this.casWithNoReply(key, exp, result, operation);
1800
1801 }
1802
1803 @SuppressWarnings("unchecked")
1804 public <T> void casWithNoReply(String key, int exp,
1805 GetsResponse<T> getsReponse, CASOperation<T> operation)
1806 throws TimeoutException, InterruptedException, MemcachedException {
1807 key = this.preProcessKey(key);
1808 byte[] keyBytes = ByteUtils.getBytes(key);
1809 ByteUtils.checkKey(keyBytes);
1810 this.cas0(key, exp, getsReponse, operation, this.transcoder, keyBytes,
1811 true);
1812
1813 }
1814
1815
1816
1817
1818
1819
1820
1821 public final <T> boolean cas(final String key, GetsResponse<T> getsReponse,
1822 final CASOperation<T> operation) throws TimeoutException,
1823 InterruptedException, MemcachedException {
1824 return this.cas(key, 0, getsReponse, operation);
1825 }
1826
1827
1828
1829
1830
1831
1832
1833 @SuppressWarnings("unchecked")
1834 public final <T> boolean cas(final String key, final int exp,
1835 final CASOperation<T> operation) throws TimeoutException,
1836 InterruptedException, MemcachedException {
1837 return this.cas(key, exp, operation, this.transcoder);
1838 }
1839
1840
1841
1842
1843
1844
1845
1846 public final <T> boolean cas(final String key,
1847 final CASOperation<T> operation) throws TimeoutException,
1848 InterruptedException, MemcachedException {
1849 return this.cas(key, 0, operation);
1850 }
1851
1852
1853
1854
1855
1856
1857 public final boolean delete(final String key, final int time)
1858 throws TimeoutException, InterruptedException, MemcachedException {
1859 return this.delete0(key, time, 0, false, this.opTimeout);
1860 }
1861
1862 public boolean delete(String key, long opTimeout) throws TimeoutException,
1863 InterruptedException, MemcachedException {
1864 return this.delete0(key, 0, 0, false, opTimeout);
1865 }
1866
1867 public boolean delete(String key, long cas, long opTimeout)
1868 throws TimeoutException, InterruptedException, MemcachedException {
1869 return this.delete0(key, 0, cas, false, opTimeout);
1870 }
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880 public final void deleteWithNoReply(final String key, final int time)
1881 throws InterruptedException, MemcachedException {
1882 try {
1883 this.delete0(key, time, 0, true, this.opTimeout);
1884 } catch (TimeoutException e) {
1885 throw new MemcachedException(e);
1886 }
1887 }
1888
1889 public final void deleteWithNoReply(final String key)
1890 throws InterruptedException, MemcachedException {
1891 this.deleteWithNoReply(key, 0);
1892 }
1893
1894 private boolean delete0(String key, final int time, long cas,
1895 boolean noreply, long opTimeout) throws MemcachedException,
1896 InterruptedException, TimeoutException {
1897 key = this.preProcessKey(key);
1898 final byte[] keyBytes = ByteUtils.getBytes(key);
1899 ByteUtils.checkKey(keyBytes);
1900 final Command command = this.commandFactory.createDeleteCommand(key,
1901 keyBytes, time, cas, noreply);
1902 final Session session = this.sendCommand(command);
1903 if (!command.isNoreply()) {
1904 this.latchWait(command, opTimeout, session);
1905 command.getIoBuffer().free();
1906 this.checkException(command);
1907 if (command.getResult() == null) {
1908 throw new MemcachedException(
1909 "Operation fail,may be caused by networking or timeout");
1910 }
1911 } else {
1912 return false;
1913 }
1914 return (Boolean) command.getResult();
1915 }
1916
1917 void checkException(final Command command) throws MemcachedException {
1918 if (command.getException() != null) {
1919 if (command.getException() instanceof MemcachedException) {
1920 throw (MemcachedException) command.getException();
1921 } else {
1922 throw new MemcachedException(command.getException());
1923 }
1924 }
1925 }
1926
1927 public boolean touch(String key, int exp, long opTimeout)
1928 throws TimeoutException, InterruptedException, MemcachedException {
1929 key = this.preProcessKey(key);
1930 final byte[] keyBytes = ByteUtils.getBytes(key);
1931 ByteUtils.checkKey(keyBytes);
1932 CountDownLatch latch = new CountDownLatch(1);
1933 final Command command = this.commandFactory.createTouchCommand(key,
1934 keyBytes, latch, exp, false);
1935 this.latchWait(command, opTimeout, this.sendCommand(command));
1936 command.getIoBuffer().free();
1937 this.checkException(command);
1938 if (command.getResult() == null) {
1939 throw new MemcachedException(
1940 "Operation fail,may be caused by networking or timeout");
1941 }
1942 return (Boolean) command.getResult();
1943 }
1944
1945 public boolean touch(String key, int exp) throws TimeoutException,
1946 InterruptedException, MemcachedException {
1947 return this.touch(key, exp, this.opTimeout);
1948 }
1949
1950 @SuppressWarnings("unchecked")
1951 public <T> T getAndTouch(String key, int newExp, long opTimeout)
1952 throws TimeoutException, InterruptedException, MemcachedException {
1953 key = this.preProcessKey(key);
1954 final byte[] keyBytes = ByteUtils.getBytes(key);
1955 ByteUtils.checkKey(keyBytes);
1956 CountDownLatch latch = new CountDownLatch(1);
1957 final Command command = this.commandFactory.createGetAndTouchCommand(
1958 key, keyBytes, latch, newExp, false);
1959 this.latchWait(command, opTimeout, this.sendCommand(command));
1960 command.getIoBuffer().free();
1961 this.checkException(command);
1962 CachedData data = (CachedData) command.getResult();
1963 if (data == null) {
1964 return null;
1965 }
1966 return (T) this.transcoder.decode(data);
1967 }
1968
1969 @SuppressWarnings("unchecked")
1970 public <T> T getAndTouch(String key, int newExp) throws TimeoutException,
1971 InterruptedException, MemcachedException {
1972 return (T) this.getAndTouch(key, newExp, this.opTimeout);
1973 }
1974
1975
1976
1977
1978
1979
1980 public final long incr(String key, final long delta)
1981 throws TimeoutException, InterruptedException, MemcachedException {
1982 key = this.preProcessKey(key);
1983 return this.sendIncrOrDecrCommand(key, delta, 0, CommandType.INCR,
1984 false, this.opTimeout, 0);
1985 }
1986
1987 public long incr(String key, long delta, long initValue)
1988 throws TimeoutException, InterruptedException, MemcachedException {
1989 key = this.preProcessKey(key);
1990 return this.sendIncrOrDecrCommand(key, delta, initValue,
1991 CommandType.INCR, false, this.opTimeout, 0);
1992 }
1993
1994 public long incr(String key, long delta, long initValue, long timeout)
1995 throws TimeoutException, InterruptedException, MemcachedException {
1996 key = this.preProcessKey(key);
1997 return this.sendIncrOrDecrCommand(key, delta, initValue,
1998 CommandType.INCR, false, timeout, 0);
1999 }
2000
2001 public long incr(String key, long delta, long initValue, long timeout,
2002 int exp) throws TimeoutException, InterruptedException,
2003 MemcachedException {
2004 key = this.preProcessKey(key);
2005 return this.sendIncrOrDecrCommand(key, delta, initValue,
2006 CommandType.INCR, false, timeout, exp);
2007 }
2008
2009 public final void incrWithNoReply(String key, long delta)
2010 throws InterruptedException, MemcachedException {
2011 key = this.preProcessKey(key);
2012 try {
2013 this.sendIncrOrDecrCommand(key, delta, 0, CommandType.INCR, true,
2014 this.opTimeout, 0);
2015 } catch (TimeoutException e) {
2016 throw new MemcachedException(e);
2017 }
2018 }
2019
2020 public final void decrWithNoReply(String key, final long delta)
2021 throws InterruptedException, MemcachedException {
2022 key = this.preProcessKey(key);
2023 try {
2024 this.sendIncrOrDecrCommand(key, delta, 0, CommandType.DECR, true,
2025 this.opTimeout, 0);
2026 } catch (TimeoutException e) {
2027 throw new MemcachedException(e);
2028 }
2029 }
2030
2031
2032
2033
2034
2035
2036 public final long decr(String key, final long delta)
2037 throws TimeoutException, InterruptedException, MemcachedException {
2038 key = this.preProcessKey(key);
2039 return this.sendIncrOrDecrCommand(key, delta, 0, CommandType.DECR,
2040 false, this.opTimeout, 0);
2041 }
2042
2043 public long decr(String key, long delta, long initValue)
2044 throws TimeoutException, InterruptedException, MemcachedException {
2045 key = this.preProcessKey(key);
2046 return this.sendIncrOrDecrCommand(key, delta, initValue,
2047 CommandType.DECR, false, this.opTimeout, 0);
2048 }
2049
2050 public long decr(String key, long delta, long initValue, long timeout)
2051 throws TimeoutException, InterruptedException, MemcachedException {
2052 key = this.preProcessKey(key);
2053 return this.sendIncrOrDecrCommand(key, delta, initValue,
2054 CommandType.DECR, false, timeout, 0);
2055 }
2056
2057 public long decr(String key, long delta, long initValue, long timeout,
2058 int exp) throws TimeoutException, InterruptedException,
2059 MemcachedException {
2060 key = this.preProcessKey(key);
2061 return this.sendIncrOrDecrCommand(key, delta, initValue,
2062 CommandType.DECR, false, timeout, exp);
2063 }
2064
2065
2066
2067
2068
2069
2070 public final void flushAll() throws TimeoutException, InterruptedException,
2071 MemcachedException {
2072 this.flushAll(this.opTimeout);
2073 }
2074
2075 public void flushAllWithNoReply() throws InterruptedException,
2076 MemcachedException {
2077 try {
2078 this.flushAllMemcachedServers(this.opTimeout, true, 0);
2079 } catch (TimeoutException e) {
2080 throw new MemcachedException(e);
2081 }
2082 }
2083
2084 public void flushAllWithNoReply(int exptime) throws InterruptedException,
2085 MemcachedException {
2086 try {
2087 this.flushAllMemcachedServers(this.opTimeout, true, exptime);
2088 } catch (TimeoutException e) {
2089 throw new MemcachedException(e);
2090 }
2091 }
2092
2093 public void flushAllWithNoReply(InetSocketAddress address)
2094 throws MemcachedException, InterruptedException {
2095 try {
2096 this.flushSpecialMemcachedServer(address, this.opTimeout, true, 0);
2097 } catch (TimeoutException e) {
2098 throw new MemcachedException(e);
2099 }
2100 }
2101
2102 public void flushAllWithNoReply(InetSocketAddress address, int exptime)
2103 throws MemcachedException, InterruptedException {
2104 try {
2105 this.flushSpecialMemcachedServer(address, this.opTimeout, true,
2106 exptime);
2107 } catch (TimeoutException e) {
2108 throw new MemcachedException(e);
2109 }
2110 }
2111
2112 public final void flushAll(int exptime, long timeout)
2113 throws TimeoutException, InterruptedException, MemcachedException {
2114 this.flushAllMemcachedServers(timeout, false, exptime);
2115 }
2116
2117
2118
2119
2120
2121
2122 public final void flushAll(long timeout) throws TimeoutException,
2123 InterruptedException, MemcachedException {
2124 this.flushAllMemcachedServers(timeout, false, 0);
2125 }
2126
2127 private void flushAllMemcachedServers(long timeout, boolean noreply,
2128 int exptime) throws MemcachedException, InterruptedException,
2129 TimeoutException {
2130 final Collection<Session> sessions = this.connector.getSessionSet();
2131 CountDownLatch latch = new CountDownLatch(sessions.size());
2132 List<Command> commands = new ArrayList<Command>(sessions.size());
2133 for (Session session : sessions) {
2134 if (session != null && !session.isClosed()) {
2135 Command command = this.commandFactory.createFlushAllCommand(
2136 latch, exptime, noreply);
2137
2138 session.write(command);
2139 } else {
2140 latch.countDown();
2141 }
2142 }
2143 if (!noreply) {
2144 if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
2145 for (Command cmd : commands) {
2146 cmd.cancel();
2147 }
2148 throw new TimeoutException("Timed out waiting for operation");
2149 }
2150 }
2151 }
2152
2153 public void setLoggingLevelVerbosity(InetSocketAddress address, int level)
2154 throws TimeoutException, InterruptedException, MemcachedException {
2155 this.setMemcachedLoggingLevel(address, level, false);
2156
2157 }
2158
2159 private void setMemcachedLoggingLevel(InetSocketAddress address, int level,
2160 boolean noreply) throws MemcachedException, InterruptedException,
2161 TimeoutException {
2162 if (address == null) {
2163 throw new IllegalArgumentException("Null adderss");
2164 }
2165 CountDownLatch latch = new CountDownLatch(1);
2166
2167 Queue<Session> sessionQueue = this.connector
2168 .getSessionByAddress(address);
2169 if (sessionQueue == null || sessionQueue.peek() == null) {
2170 throw new MemcachedException("could not find session for "
2171 + SystemUtils.getRawAddress(address) + ":"
2172 + address.getPort() + ",maybe it have not been connected");
2173 }
2174
2175 Command command = this.commandFactory.createVerbosityCommand(latch,
2176 level, noreply);
2177 final Session session = sessionQueue.peek();
2178 session.write(command);
2179 if (!noreply) {
2180 this.latchWait(command, this.opTimeout, session);
2181 }
2182 }
2183
2184 public void setLoggingLevelVerbosityWithNoReply(InetSocketAddress address,
2185 int level) throws InterruptedException, MemcachedException {
2186 try {
2187 this.setMemcachedLoggingLevel(address, level, true);
2188 } catch (TimeoutException e) {
2189 throw new MemcachedException(e);
2190 }
2191
2192 }
2193
2194
2195
2196
2197
2198
2199
2200
2201 public final void flushAll(InetSocketAddress address)
2202 throws MemcachedException, InterruptedException, TimeoutException {
2203 this.flushAll(address, this.opTimeout);
2204 }
2205
2206
2207
2208
2209
2210
2211
2212
2213 public final void flushAll(InetSocketAddress address, long timeout)
2214 throws MemcachedException, InterruptedException, TimeoutException {
2215 this.flushSpecialMemcachedServer(address, timeout, false, 0);
2216 }
2217
2218 public final void flushAll(InetSocketAddress address, long timeout,
2219 int exptime) throws MemcachedException, InterruptedException,
2220 TimeoutException {
2221 this.flushSpecialMemcachedServer(address, timeout, false, exptime);
2222 }
2223
2224 private void flushSpecialMemcachedServer(InetSocketAddress address,
2225 long timeout, boolean noreply, int exptime)
2226 throws MemcachedException, InterruptedException, TimeoutException {
2227 if (address == null) {
2228 throw new IllegalArgumentException("Null adderss");
2229 }
2230 CountDownLatch latch = new CountDownLatch(1);
2231
2232 Queue<Session> sessionQueue = this.connector
2233 .getSessionByAddress(address);
2234 if (sessionQueue == null || sessionQueue.peek() == null) {
2235 throw new MemcachedException("could not find session for "
2236 + SystemUtils.getRawAddress(address) + ":"
2237 + address.getPort() + ",maybe it have not been connected");
2238 }
2239 Command command = this.commandFactory.createFlushAllCommand(latch,
2240 exptime, noreply);
2241 final Session session = sessionQueue.peek();
2242 session.write(command);
2243 if (!noreply) {
2244 this.latchWait(command, timeout, session);
2245 }
2246 }
2247
2248
2249
2250
2251
2252
2253 public final void flushAll(String host) throws TimeoutException,
2254 InterruptedException, MemcachedException {
2255 this.flushAll(AddrUtil.getOneAddress(host), this.opTimeout);
2256 }
2257
2258
2259
2260
2261
2262
2263
2264 public final Map<String, String> stats(InetSocketAddress address)
2265 throws MemcachedException, InterruptedException, TimeoutException {
2266 return this.stats(address, this.opTimeout);
2267 }
2268
2269
2270
2271
2272
2273
2274
2275
2276 @SuppressWarnings("unchecked")
2277 public final Map<String, String> stats(InetSocketAddress address,
2278 long timeout) throws MemcachedException, InterruptedException,
2279 TimeoutException {
2280 if (address == null) {
2281 throw new IllegalArgumentException("Null inetSocketAddress");
2282 }
2283 CountDownLatch latch = new CountDownLatch(1);
2284
2285 Queue<Session> sessionQueue = this.connector
2286 .getSessionByAddress(address);
2287 if (sessionQueue == null || sessionQueue.peek() == null) {
2288 throw new MemcachedException("could not find session for "
2289 + SystemUtils.getRawAddress(address) + ":"
2290 + address.getPort() + ",maybe it have not been connected");
2291 }
2292 Command command = this.commandFactory.createStatsCommand(address,
2293 latch, null);
2294 final Session session = sessionQueue.peek();
2295 session.write(command);
2296 this.latchWait(command, timeout, session);
2297 return (Map<String, String>) command.getResult();
2298 }
2299
2300 public final Map<InetSocketAddress, Map<String, String>> getStats()
2301 throws MemcachedException, InterruptedException, TimeoutException {
2302 return this.getStats(this.opTimeout);
2303 }
2304
2305 public final Map<InetSocketAddress, Map<String, String>> getStatsByItem(
2306 String itemName) throws MemcachedException, InterruptedException,
2307 TimeoutException {
2308 return this.getStatsByItem(itemName, this.opTimeout);
2309 }
2310
2311 @SuppressWarnings("unchecked")
2312 public final Map<InetSocketAddress, Map<String, String>> getStatsByItem(
2313 String itemName, long timeout) throws MemcachedException,
2314 InterruptedException, TimeoutException {
2315 final Set<Session> sessionSet = this.connector.getSessionSet();
2316 final Map<InetSocketAddress, Map<String, String>> collectResult = new HashMap<InetSocketAddress, Map<String, String>>();
2317 if (sessionSet.size() == 0) {
2318 return collectResult;
2319 }
2320 final CountDownLatch latch = new CountDownLatch(sessionSet.size());
2321 List<Command> commands = new ArrayList<Command>(sessionSet.size());
2322 for (Session session : sessionSet) {
2323 Command command = this.commandFactory.createStatsCommand(
2324 session.getRemoteSocketAddress(), latch, itemName);
2325
2326 session.write(command);
2327 commands.add(command);
2328
2329 }
2330 if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
2331 for (Command command : commands) {
2332 command.cancel();
2333 }
2334 throw new TimeoutException("Timed out waiting for operation");
2335 }
2336 for (Command command : commands) {
2337 this.checkException(command);
2338 collectResult.put(((ServerAddressAware) command).getServer(),
2339 (Map<String, String>) command.getResult());
2340 }
2341 return collectResult;
2342 }
2343
2344 public final Map<InetSocketAddress, String> getVersions()
2345 throws TimeoutException, InterruptedException, MemcachedException {
2346 return this.getVersions(this.opTimeout);
2347 }
2348
2349 public final Map<InetSocketAddress, String> getVersions(long timeout)
2350 throws TimeoutException, InterruptedException, MemcachedException {
2351 final Set<Session> sessionSet = this.connector.getSessionSet();
2352 Map<InetSocketAddress, String> collectResult = new HashMap<InetSocketAddress, String>();
2353 if (sessionSet.size() == 0) {
2354 return collectResult;
2355 }
2356 final CountDownLatch latch = new CountDownLatch(sessionSet.size());
2357 List<Command> commands = new ArrayList<Command>(sessionSet.size());
2358 for (Session session : sessionSet) {
2359 Command command = this.commandFactory.createVersionCommand(latch,
2360 session.getRemoteSocketAddress());
2361 session.write(command);
2362 commands.add(command);
2363
2364 }
2365
2366 if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
2367 for (Command command : commands) {
2368 command.cancel();
2369 }
2370 throw new TimeoutException("Timed out waiting for operation");
2371 }
2372 for (Command command : commands) {
2373 this.checkException(command);
2374 collectResult.put(((ServerAddressAware) command).getServer(),
2375 (String) command.getResult());
2376 }
2377 return collectResult;
2378 }
2379
2380 public Map<InetSocketAddress, Map<String, String>> getStats(long timeout)
2381 throws MemcachedException, InterruptedException, TimeoutException {
2382 return this.getStatsByItem(null, timeout);
2383 }
2384
2385
2386
2387
2388
2389
2390 public final void shutdown() throws IOException {
2391 if (this.shutdown) {
2392 return;
2393 }
2394 this.shutdown = true;
2395 this.connector.quitAllSessions();
2396 this.connector.stop();
2397 this.memcachedHandler.stop();
2398 XMemcachedMbeanServer.getInstance().shutdown();
2399 if (!this.isHutdownHookCalled) {
2400 try {
2401 Runtime.getRuntime()
2402 .removeShutdownHook(this.shutdownHookThread);
2403 } catch (Exception e) {
2404
2405 }
2406 }
2407 }
2408
2409 private long sendIncrOrDecrCommand(final String key, final long delta,
2410 long initValue, final CommandType cmdType, boolean noreply,
2411 long operationTimeout, int exp) throws InterruptedException,
2412 TimeoutException, MemcachedException {
2413 final byte[] keyBytes = ByteUtils.getBytes(key);
2414 ByteUtils.checkKey(keyBytes);
2415 final Command command = this.commandFactory.createIncrDecrCommand(key,
2416 keyBytes, delta, initValue, exp, cmdType, noreply);
2417 final Session session = this.sendCommand(command);
2418 if (!command.isNoreply()) {
2419 this.latchWait(command, operationTimeout, session);
2420 command.getIoBuffer().free();
2421 this.checkException(command);
2422 if (command.getResult() == null) {
2423 throw new MemcachedException(
2424 "Operation fail,may be caused by networking or timeout");
2425 }
2426 final Object result = command.getResult();
2427 if (result instanceof String) {
2428 if (((String) result).equals("NOT_FOUND")) {
2429 if (this.add0(key, exp, String.valueOf(initValue),
2430 this.transcoder, this.opTimeout)) {
2431 return initValue;
2432 } else {
2433 return this.sendIncrOrDecrCommand(key, delta,
2434 initValue, cmdType, noreply, operationTimeout,
2435 exp);
2436 }
2437 } else {
2438 throw new MemcachedException(
2439 "Unknown result type for incr/decr:"
2440 + result.getClass() + ",result=" + result);
2441 }
2442 } else {
2443 return (Long) command.getResult();
2444 }
2445 } else {
2446 return -1;
2447 }
2448 }
2449
2450 public void setConnectionPoolSize(int poolSize) {
2451 if (!this.shutdown && this.getAvaliableServers().size() > 0) {
2452 throw new IllegalStateException(
2453 "Xmemcached client has been started");
2454 }
2455 if (poolSize <= 0) {
2456 throw new IllegalArgumentException("poolSize<=0");
2457 }
2458 this.connectionPoolSize = poolSize;
2459 this.connector.setConnectionPoolSize(poolSize);
2460 }
2461
2462
2463
2464
2465
2466
2467 public final boolean delete(final String key) throws TimeoutException,
2468 InterruptedException, MemcachedException {
2469 return this.delete(key, 0);
2470 }
2471
2472
2473
2474
2475
2476
2477 @SuppressWarnings("unchecked")
2478 public final Transcoder getTranscoder() {
2479 return this.transcoder;
2480 }
2481
2482
2483
2484
2485
2486
2487
2488
2489 @SuppressWarnings("unchecked")
2490 public final void setTranscoder(final Transcoder transcoder) {
2491 this.transcoder = transcoder;
2492 }
2493
2494 private final <T> boolean sendStoreCommand(Command command, long timeout)
2495 throws InterruptedException, TimeoutException, MemcachedException {
2496
2497 final Session session = this.sendCommand(command);
2498 if (!command.isNoreply()) {
2499 this.latchWait(command, timeout, session);
2500 command.getIoBuffer().free();
2501 this.checkException(command);
2502 if (command.getResult() == null) {
2503 throw new MemcachedException(
2504 "Operation fail,may be caused by networking or timeout");
2505 }
2506 } else {
2507 return false;
2508 }
2509 return (Boolean) command.getResult();
2510 }
2511
/**
 * Session attribute name under which the per-session counter of consecutive
 * timeouts is stored.
 */
private static final String CONTINUOUS_TIMEOUT_COUNTER = "ContinuousTimeouts";
2513
2514 private void latchWait(final Command cmd, final long timeout,
2515 final Session session) throws InterruptedException,
2516 TimeoutException {
2517 if (cmd.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
2518 AtomicInteger counter = this.getContinuousTimeoutCounter(session);
2519
2520 if (counter.get() > 0) {
2521 counter.set(0);
2522 }
2523 } else {
2524 cmd.cancel();
2525 AtomicInteger counter = this.getContinuousTimeoutCounter(session);
2526 if (counter.incrementAndGet() > this.timeoutExceptionThreshold) {
2527 log.warn(session
2528 + " exceeded continuous timeout threshold,we will close it.");
2529 try {
2530
2531 counter.set(0);
2532 session.close();
2533 } catch (Exception e) {
2534
2535 }
2536 }
2537 throw new TimeoutException(
2538 "Timed out("
2539 + timeout
2540 + " milliseconds) waiting for operation while connected to "
2541 + session);
2542 }
2543 }
2544
2545 private AtomicInteger getContinuousTimeoutCounter(final Session session) {
2546 AtomicInteger counter = (AtomicInteger) session
2547 .getAttribute(CONTINUOUS_TIMEOUT_COUNTER);
2548 if (counter == null) {
2549 counter = new AtomicInteger(0);
2550 AtomicInteger oldCounter = (AtomicInteger) session
2551 .setAttributeIfAbsent(CONTINUOUS_TIMEOUT_COUNTER, counter);
2552 if (oldCounter != null) {
2553 counter = oldCounter;
2554 }
2555 }
2556 return counter;
2557 }
2558
2559
2560
2561
2562
2563
2564
2565 @Deprecated
2566 public final Collection<InetSocketAddress> getAvaliableServers() {
2567 return this.getAvailableServers();
2568 }
2569
2570 public Collection<InetSocketAddress> getAvailableServers() {
2571 Set<Session> sessionSet = this.connector.getSessionSet();
2572 Set<InetSocketAddress> result = new HashSet<InetSocketAddress>();
2573 for (Session session : sessionSet) {
2574 result.add(session.getRemoteSocketAddress());
2575 }
2576 return Collections.unmodifiableSet(result);
2577 }
2578
2579 public final int getConnectionSizeBySocketAddress(InetSocketAddress address) {
2580 Queue<Session> sessionList = this.connector
2581 .getSessionByAddress(address);
2582 return sessionList == null ? 0 : sessionList.size();
2583 }
2584
2585 public void addStateListener(MemcachedClientStateListener listener) {
2586 MemcachedClientStateListenerAdapter adapter = new MemcachedClientStateListenerAdapter(
2587 listener, this);
2588 this.stateListenerAdapters.add(adapter);
2589 this.connector.addStateListener(adapter);
2590 }
2591
2592 public Collection<MemcachedClientStateListener> getStateListeners() {
2593 final List<MemcachedClientStateListener> result = new ArrayList<MemcachedClientStateListener>(
2594 this.stateListenerAdapters.size());
2595 for (MemcachedClientStateListenerAdapter adapter : this.stateListenerAdapters) {
2596 result.add(adapter.getMemcachedClientStateListener());
2597 }
2598 return result;
2599 }
2600
2601 public void setPrimitiveAsString(boolean primitiveAsString) {
2602 this.transcoder.setPrimitiveAsString(primitiveAsString);
2603 }
2604
2605 public void removeStateListener(MemcachedClientStateListener listener) {
2606 for (MemcachedClientStateListenerAdapter adapter : this.stateListenerAdapters) {
2607 if (adapter.getMemcachedClientStateListener().equals(listener)) {
2608 this.stateListenerAdapters.remove(adapter);
2609 this.connector.removeStateListener(adapter);
2610 }
2611 }
2612 }
2613
2614 public Protocol getProtocol() {
2615 return this.commandFactory.getProtocol();
2616 }
2617
2618 public boolean isSanitizeKeys() {
2619 return this.sanitizeKeys;
2620 }
2621
2622 public void setSanitizeKeys(boolean sanitizeKeys) {
2623 this.sanitizeKeys = sanitizeKeys;
2624 }
2625
2626 private String decodeKey(String key) throws MemcachedException,
2627 InterruptedException, TimeoutException {
2628 try {
2629 key = this.sanitizeKeys ? URLDecoder.decode(key, "UTF-8") : key;
2630 } catch (UnsupportedEncodingException e) {
2631 throw new MemcachedException(
2632 "Unsupport encoding utf-8 when decodeKey", e);
2633 }
2634 String ns = NAMESPACE_LOCAL.get();
2635 if (ns != null && ns.trim().length() > 0) {
2636 String nsValue = this.getNamespace(ns);
2637 try {
2638 if (nsValue != null && key.startsWith(nsValue)) {
2639
2640 key = key.substring(nsValue.length() + 1);
2641 } else {
2642 return null;
2643 }
2644 } catch (Exception e) {
2645 throw new MemcachedException(
2646 "Exception occured when decode key.", e);
2647 }
2648 }
2649 return key;
2650 }
2651
2652 private String preProcessKey(String key) throws MemcachedException,
2653 InterruptedException {
2654 key = this.keyProvider.process(key);
2655 try {
2656 key = this.sanitizeKeys ? URLEncoder.encode(key, "UTF-8") : key;
2657 } catch (UnsupportedEncodingException e) {
2658 throw new MemcachedException(
2659 "Unsupport encoding utf-8 when sanitize key", e);
2660 }
2661 String ns = NAMESPACE_LOCAL.get();
2662 if (ns != null && ns.trim().length() > 0) {
2663 try {
2664 key = this.getNamespace(ns) + ":" + key;
2665 } catch (TimeoutException e) {
2666 throw new MemcachedException(
2667 "Timeout occured when gettting namespace value.", e);
2668 }
2669 }
2670 return key;
2671 }
2672
2673 public void invalidateNamespace(String ns, long opTimeout)
2674 throws MemcachedException, InterruptedException, TimeoutException {
2675 String key = this.getNSKey(ns);
2676 this.incr(key, 1, System.currentTimeMillis(), opTimeout);
2677 }
2678
2679 public void invalidateNamespace(String ns) throws MemcachedException,
2680 InterruptedException, TimeoutException {
2681 this.invalidateNamespace(ns, this.opTimeout);
2682 }
2683
2684
2685
2686
2687
2688
2689
2690
2691
2692
2693 public String getNamespace(String ns) throws TimeoutException,
2694 InterruptedException, MemcachedException {
2695 String key = this.keyProvider.process(this.getNSKey(ns));
2696 byte[] keyBytes = ByteUtils.getBytes(key);
2697 ByteUtils.checkKey(keyBytes);
2698 Object item = this.fetch0(key, keyBytes, CommandType.GET_ONE,
2699 this.opTimeout, this.transcoder);
2700 while (item == null) {
2701 item = String.valueOf(System.nanoTime());
2702 boolean added = this.add0(key, 0, item, this.transcoder,
2703 this.opTimeout);
2704 if (!added) {
2705 item = this.fetch0(key, keyBytes, CommandType.GET_ONE,
2706 this.opTimeout, this.transcoder);
2707 }
2708 }
2709 String namespace = item.toString();
2710 if (!ByteUtils.isNumber(namespace)) {
2711 throw new IllegalStateException(
2712 "Namespace key already has value.The key is:" + key
2713 + ",and the value is:" + namespace);
2714 }
2715 return namespace;
2716 }
2717
2718 private String getNSKey(String ns) {
2719 String key = "namespace:" + ns;
2720 return key;
2721 }
2722
2723 public Counter getCounter(String key, long initialValue) {
2724 return new Counter(this, key, initialValue);
2725 }
2726
2727 public Counter getCounter(String key) {
2728 return new Counter(this, key, 0);
2729 }
2730
2731
2732
2733
2734
2735 @Deprecated
2736 @SuppressWarnings("unchecked")
2737 public KeyIterator getKeyIterator(InetSocketAddress address)
2738 throws MemcachedException, TimeoutException, InterruptedException {
2739 if (address == null) {
2740 throw new IllegalArgumentException("null address");
2741 }
2742 Queue<Session> sessions = this.connector.getSessionByAddress(address);
2743 if (sessions == null || sessions.size() == 0) {
2744 throw new MemcachedException(
2745 "The special memcached server has not been connected,"
2746 + address);
2747 }
2748 Session session = sessions.peek();
2749 CountDownLatch latch = new CountDownLatch(1);
2750 Command command = this.commandFactory.createStatsCommand(
2751 session.getRemoteSocketAddress(), latch, "items");
2752 session.write(command);
2753 if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
2754 throw new TimeoutException("Operation timeout");
2755 }
2756 if (command.getException() != null) {
2757 if (command.getException() instanceof MemcachedException) {
2758 throw (MemcachedException) command.getException();
2759 } else {
2760 throw new MemcachedException("stats items failed",
2761 command.getException());
2762 }
2763 }
2764 Map<String, String> result = (Map<String, String>) command.getResult();
2765 LinkedList<Integer> itemNumberList = new LinkedList<Integer>();
2766 for (Map.Entry<String, String> entry : result.entrySet()) {
2767 final String key = entry.getKey();
2768 final String[] keys = key.split(":");
2769 if (keys.length == 3 && keys[2].equals("number")
2770 && keys[0].equals("items")) {
2771
2772 if (Integer.parseInt(entry.getValue()) > 0) {
2773 itemNumberList.add(Integer.parseInt(keys[1]));
2774 }
2775 }
2776 }
2777 return new KeyIteratorImpl(itemNumberList, this, address);
2778 }
2779
2780 public void setEnableHealSession(boolean enableHealSession) {
2781 if (this.connector != null) {
2782 this.connector.setEnableHealSession(enableHealSession);
2783 } else {
2784 throw new IllegalStateException("The client has not been started.");
2785 }
2786 }
2787
2788 public void setFailureMode(boolean failureMode) {
2789 this.failureMode = failureMode;
2790 if (this.sessionLocator != null) {
2791 this.sessionLocator.setFailureMode(failureMode);
2792 }
2793 if (this.connector != null) {
2794 this.connector.setFailureMode(failureMode);
2795 }
2796 }
2797
2798 public boolean isFailureMode() {
2799 return this.failureMode;
2800 }
2801
2802 public Queue<ReconnectRequest> getReconnectRequestQueue() {
2803 return this.connector != null ? this.connector
2804 .getReconnectRequestQueue() : null;
2805 }
2806
2807 }