问题

在生产环境中,突然发现RabbitMQ中的某条队列的数据没有被消费掉,而是一直停留在那里,并到好几个小时了,消息一直在Unacknowledged中.查看Tomcat的日志,也没有发现报异常,只是日志的log一直停留在那里,没有任何滚动。

查找原因

然后到生产环境,执行jstack -l TOMCAT_PID > /tmp/TOMCAT_PID.stack.log想找出此时的进程内的线程栈结构。发现又报如下错误:

jstack error attaching to core file can't attach to the core file

这个问题,可以参考我的另一篇blog: 使用Java监控工具出现 Can’t attach to the process

然后找到对应的队列,所在的listener调用的方法,然后在/tmp/TOMCAT_PID.stack.log文件里搜索到相关的栈内容.比如我的:

"SimpleAsyncTaskExecutor-1" prio=10 tid=0x00007f5984cca000 nid=0x399 runnable [0x00007f59c1cae000]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:150)
        at java.net.SocketInputStream.read(SocketInputStream.java:121)
        at sun.security.ssl.InputRecord.readFully(InputRecord.java:312)
        at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:424)
        at sun.security.ssl.InputRecord.read(InputRecord.java:379)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
        - locked <0x0000000784e3e858> (a java.lang.Object)
        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
        at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
        - locked <0x0000000784e3d9f0> (a sun.security.ssl.AppInputStream)
        at okio.Okio$2.read(Okio.java:136)
        at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
        at okio.RealBufferedSource.read(RealBufferedSource.java:50)
        at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:457)
        at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
        at okio.InflaterSource.refill(InflaterSource.java:96)
        at okio.InflaterSource.read(InflaterSource.java:62)
        at okio.GzipSource.read(GzipSource.java:80)
        at okio.Buffer.writeAll(Buffer.java:574)
        at okio.RealBufferedSource.readByteArray(RealBufferedSource.java:87)
        at com.squareup.okhttp.ResponseBody.bytes(ResponseBody.java:56)
        at com.squareup.okhttp.ResponseBody.string(ResponseBody.java:82)

这条线程就是执行该listener的方法的,发现线程的状态一直是RUNNABLE,但是消息又一直没有进行确认,好几个小时了,一直是这样子,日志也没有见抛出什么异常.

然后,我到 OkHttp Github 提交了个issue,不过作者说那里只是一个bug以及feature的提交点,并不是作为question,建议我到stackoverflow.com提问下.

然后我就没有然后了。不过作者回复的挺快.

研究源码

查看com.squareup.okhttp.OkHttpClient的源码. 可以看到有6个(包括get,set)与timeout相关的设置.他们的单位都是毫秒.

setConnectTimeout
getConnectTimeout

setReadTimeout
getReadTimeout

setWriteTimeout
getWriteTimeout

然后,打印其默认值:

    private static final OkHttpClient ok = new OkHttpClient();

    @Test
    public void okinof() {
        System.out.println("connect timeout ->" + ok.getConnectTimeout());
        System.out.println("read timeout ->" + ok.getReadTimeout());
        System.out.println("write timeout ->" + ok.getWriteTimeout());
    }

结果如下:

connect timeout ->0
read timeout ->0
write timeout ->0

再看其源码的Java Doc说明:

  /**
   * Sets the default write timeout for new connections. A value of 0 means no timeout.
   */

如果值为0,就意味着没有超时时间,就一直等待.而默认值就是0.

因为在看到jstack时,它的线程状态为RUNABLE,而不是WAITING,所以排除死锁的情况.而且它一直是停留在

 private static Source source(final InputStream in, final Timeout timeout) {
    if (in == null) throw new IllegalArgumentException("in == null");
    if (timeout == null) throw new IllegalArgumentException("timeout == null");

    return new Source() {
      @Override public long read(Buffer sink, long byteCount) throws IOException {
        if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
        timeout.throwIfReached();
        Segment tail = sink.writableSegment(1);
        int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit);
        int bytesRead = in.read(tail.data, tail.limit, maxToCopy); //停留在这里
        if (bytesRead == -1) return -1;
        tail.limit += bytesRead;
        sink.size += bytesRead;
        return bytesRead;
      }

      @Override public void close() throws IOException {
        in.close();
      }

      @Override public Timeout timeout() {
        return timeout;
      }

      @Override public String toString() {
        return "source(" + in + ")";
      }
    };
  }

所以,它应该是在读取数据流的时候,一直在等待,又因为没有设置超时时间,所以是无限等待.

解决

    private final static OkHttpClient client = new OkHttpClient();

    static {
        //设置超时时时间,因为默认情况下是无限.
        client.setReadTimeout(5, TimeUnit.MINUTES);
        client.setWriteTimeout(5, TimeUnit.MINUTES);
        client.setConnectTimeout(1, TimeUnit.MINUTES);
    }

超时时间,要根据具体的逻辑情况来决定。

参考资料

  1. Okhttp Set timeout

  2. Okhttp connection

  3. OkhttpClient

  4. set connection timeout with okhttp